Commit efc2cb1b authored by Cristian Aguirre's avatar Cristian Aguirre

Update action-exclude-records-v1

parent 0be01079
......@@ -18,6 +18,11 @@ DRIVER_MEMORY = "4g"
EXECUTOR_MEMORY = "4g"
MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar"
# EXCLUDE VALIDATION FIELD
EXCLUDE_ROWS_FIELD = "EXCLUDE_VALID"
# REDONDEO DE DECIMALES
ROUND_DECIMAL = 2
class MatchAndExcludeRecordsAction(ActionInterface):
......@@ -31,7 +36,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.exclude_pivot = None
self.pivot_params = None
self.ctp_params = None
self.tmp_df = None
self.output = None
self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]
def parser(self, descriptor: Dict[str, Any]):
......@@ -89,9 +94,13 @@ class MatchAndExcludeRecordsAction(ActionInterface):
# Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input
# pivot: 'PIVOT_', contraparte: 'COUNTERPART_'
for column in pivot_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
pivot_df = pivot_df.withColumnRenamed(column, "PIVOT_"+column)
for column in ctp_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
ctp_df = ctp_df.withColumnRenamed(column, "COUNTERPART_"+column)
for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
......@@ -102,8 +111,19 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
from pyspark.sql.functions import sum, collect_list, round, udf, array_except
from pyspark.sql.types import StringType, ArrayType, IntegerType
from pyspark.sql.functions import sum, collect_list, round, udf, array_except, size, col, array
from pyspark.sql.types import ArrayType, IntegerType, LongType
pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols:
pivot_cols.remove(self.pivot_params["amount-column"])
ctp_cols = self.ctp_params["columns-transaction"].copy()
if self.ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(self.ctp_params["amount-column"])
timeout = self.timeout
max_combinations = self.max_combinations
# Ejecutamos lógica de excluir registros
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
......@@ -111,63 +131,68 @@ class MatchAndExcludeRecordsAction(ActionInterface):
# Caso: 1 - Muchos
elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
ctp_df_2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), 2).alias(self.ctp_params["amount-column"]),
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
ctp_df_2.show()
pivot_df2 = pivot_df.dropDuplicates(subset=pivot_cols)
# Caso: Muchos - 1
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).\
agg(round(sum(self.pivot_params["amount-column"]), 2).alias(self.pivot_params["amount-column"]),
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
pivot_df2.show()
ctp_df2 = ctp_df.dropDuplicates(subset=ctp_cols)
# Caso: Muchos - Muchos
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]). \
agg(round(sum(self.pivot_params["amount-column"]), 2).alias(self.pivot_params["amount-column"]),
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df_2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), 2).alias(self.ctp_params["amount-column"]),
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols:
pivot_cols.remove(self.pivot_params["amount-column"])
ctp_cols = self.ctp_params["columns-transaction"].copy()
if self.ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(self.ctp_params["amount-column"])
condition = [pivot_df2[col1] == ctp_df_2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = pivot_df2.join(ctp_df_2, condition)
merged = merged.withColumn("DIFF", pivot_df2[self.pivot_params["amount-column"]] - ctp_df_2[self.ctp_params["amount-column"]])
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], 2))
if self.exclude_pivot:
df_pandas = pivot_df.toPandas()
group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"]
else:
df_pandas = ctp_df.toPandas()
group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"]
id_col = self.ctp_params["id-column"]
timeout = self.timeout
max_combinations = self.max_combinations
custom_udf = udf(lambda diff: custom_apply(diff, df_pandas, group_cols, amount_col, id_col, timeout,
max_combinations), ArrayType(IntegerType()))
merged_df = merged_df.withColumn("LISTA_DIFF", custom_udf(merged_df["DIFF"]))
merged_df = merged_df.withColumn("LIST_PIVOT_ID", array_except(merged_df[self.pivot_params["id-column"]],
merged_df["LISTA_DIFF"]))
merged_df = merged_df.withColumn("LIST_COUNTERPART_ID", array_except(merged_df[self.ctp_params["id-column"]],
merged_df["LISTA_DIFF"]))
merged_df.show()
condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = pivot_df2.join(ctp_df2, condition)
merged = merged.withColumn("DIFF", pivot_df2[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]])
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
if self.exclude_pivot:
df_pandas = pivot_df.toPandas()
group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"]
else:
df_pandas = ctp_df.toPandas()
group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"]
id_col = self.ctp_params["id-column"]
custom_udf = udf(lambda diff: custom_apply(diff, df_pandas, group_cols, amount_col, id_col, timeout,
max_combinations), ArrayType(IntegerType()))
merged_df = merged_df.withColumn("LISTA_DIFF", custom_udf(merged_df["DIFF"]))
merged_df = merged_df.filter((col("DIFF") == 0) | ((col("DIFF") != 0) & (size(col("LISTA_DIFF")) > 0)))
if self.exclude_pivot:
merged_df = merged_df.withColumn("INTER_PIVOT_ID", array_except(merged_df[self.pivot_params["id-column"]],
merged_df["LISTA_DIFF"]))
merged_df = merged_df.withColumnRenamed(self.ctp_params["id-column"], "INTER_CTP_ID")
if merged_df.schema["INTER_CTP_ID"].dataType == LongType():
merged_df = merged_df.withColumn("INTER_CTP_ID", array(col("INTER_CTP_ID")).cast(ArrayType(LongType())))
else:
merged_df = merged_df.withColumn("INTER_CTP_ID", array_except(merged_df[self.ctp_params["id-column"]],
merged_df["LISTA_DIFF"]))
merged_df = merged_df.withColumnRenamed(self.pivot_params["id-column"], "INTER_PIVOT_ID")
if merged_df.schema["INTER_PIVOT_ID"].dataType == LongType():
merged_df = merged_df.withColumn("INTER_PIVOT_ID", array(col("INTER_PIVOT_ID")).cast(ArrayType(LongType())))
self.output = merged_df
def response(self):
print("Llegue al response3")
self.output.show()
return self.output
def createSession(self, name: str = "app_engine_spark"):
try:
......@@ -187,17 +212,21 @@ class MatchAndExcludeRecordsAction(ActionInterface):
def custom_apply(raw, df_pandas, group_cols, amount_col, id_col, timeout, max_combinations):
total_cols = group_cols.copy()
total_cols.append(amount_col)
total_cols.append(id_col)
result = df_pandas[total_cols].groupby(group_cols).apply(custom_func, diff=raw, amount_field=amount_col, id_field=id_col,
timeout=timeout, max_combinations=max_combinations).values
resp = ",".join(map(str, result)) if len(result) > 0 else ""
print(resp)
return [int(elem) for elem in resp[1:-1].split(",")]
if raw == 0:
return []
else:
total_cols = group_cols.copy()
total_cols.append(amount_col)
total_cols.append(id_col)
total_cols.append(EXCLUDE_ROWS_FIELD)
result = df_pandas[total_cols].groupby(group_cols).apply(custom_func, diff=raw, amount_field=amount_col, id_field=id_col,
timeout=timeout, max_combinations=max_combinations).values
resp = ",".join(map(str, result)) if len(result) > 0 else ""
return [int(elem) for elem in resp[1:-1].split(",")] if resp != "" else []
def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
group[amount_field] = group[amount_field].astype(float)
group = group.reset_index(drop=True)
values = group[amount_field].values
indexs = group.index.values
......@@ -214,11 +243,13 @@ def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
for comb in combinations(indexs, i):
if stop_event: # Verificar si se ha establecido la señal de detención
break
if np.sum(values[list(comb)]).round(2) == diff:
final = group[group.index.isin(comb)][id_field].tolist()
stop_event = True # Establecer la señal de detención
break
if np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff:
# Validamos si la combinación son de registros validados 'EXCLUDE_VALID'
final = group[group.index.isin(comb)]
if (final[EXCLUDE_ROWS_FIELD] == 'S').all():
final = final[id_field].tolist()
stop_event = True # Establecer la señal de detención
break
return None
......@@ -229,7 +260,7 @@ def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
for future in futures:
try:
future.result(timeout=timeout) # Esperar hasta 60 segundos por cada tarea
future.result(timeout=timeout)
except TimeoutError:
stop_event = True # Establecer la señal de detención si se alcanza el tiempo límite
break
......@@ -239,7 +270,6 @@ def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
if time.time() - start_time >= timeout:
return None # Devolver None si se alcanzó el tiempo límite
return final
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment