Commit 8d0caf60 authored by Cristian Aguirre

Update match-and-exclude-records-actions_v1.py

parent a0f0a123
......@@ -12,11 +12,13 @@ from app.main.engine.action.ActionInterface import ActionInterface
# RELACION DE IDENTIFICADOR DE ACCION Y NOMBRE DE CLASE
relation_classname_identifier = {
"match-and-exclude-records-actions": "MatchAndExcludeRecordsAction"
"match-and-exclude-record-actions": "MatchAndExcludeRecordsAction"
}
# Exclude-validation field: name of the column that is loaded alongside the
# amount and id columns and later used to filter rows before the exclusion
# algorithm runs (the column is dropped right after filtering).
EXCLUDE_ROWS_FIELD = "EXCLUDE_VALID"
# Value that EXCLUDE_ROWS_FIELD must hold for a row to be kept by that filter.
TAKE_ROWS_WITH = "Y"
# Decimal rounding: number of decimal places applied when rounding the
# "DIFF" column of the merged frame.
ROUND_DECIMAL = 2
......@@ -32,10 +34,10 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.pivot_params = None
self.ctp_params = None
self.output = None
self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]
self.config_params = ["max-record-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]
def parser(self, descriptor: Dict[str, Any]):
self.app.logger.info(f"Descriptor recogido: {descriptor}")
# Validación de parámetros de entrada
entity_config_params = ["tablename", "id-column", "amount-column", "columns-group", "columns-transaction"]
......@@ -60,7 +62,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte")
self.identifier = descriptor["idProcess"]
self.max_combinations = configs["max-records-per-combinations"]
self.max_combinations = configs["max-record-per-combinations"]
self.timeout = int(configs["max-timeout-per-combinations"]) // 1000 # Miliseconds
self.exclude_pivot = configs["exclude-entity-pivot"]
self.pivot_params = pivot_params
......@@ -81,10 +83,10 @@ class MatchAndExcludeRecordsAction(ActionInterface):
[self.ctp_params["amount-column"], self.ctp_params["id-column"], EXCLUDE_ROWS_FIELD]
total_ctp_cols = list(set(total_ctp_cols))
pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"],
npartitions=mp.cpu_count(), columns=total_pivot_cols)
npartitions=mp.cpu_count()*2, columns=total_pivot_cols)
pivot_df = pivot_df.reset_index()
ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"],
npartitions=mp.cpu_count(), columns=total_ctp_cols)
npartitions=mp.cpu_count()*2, columns=total_ctp_cols)
ctp_df = ctp_df.reset_index()
self.app.logger.debug(f"Insumos cargados - {self.identifier}")
# Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input
......@@ -119,7 +121,6 @@ class MatchAndExcludeRecordsAction(ActionInterface):
ctp_cols.remove(self.ctp_params["amount-column"])
max_combinations = self.max_combinations
# Ejecutamos lógica de excluir registros
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
......@@ -186,15 +187,11 @@ class MatchAndExcludeRecordsAction(ActionInterface):
if merged.known_divisions:
return pd.DataFrame([])
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
merged = merged.set_index(self.pivot_params["id-column"])
merged = merged.map_partitions(lambda df_: df_.sort_values([self.pivot_params["id-column"]]))
merged = merged.sort_values(self.pivot_params["id-column"])
merged = merged.drop_duplicates(subset=pivot_cols)
merged = merged.reset_index()
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
merged = merged.set_index(self.ctp_params["id-column"])
merged = merged.map_partitions(lambda df_: df_.sort_values([self.ctp_params["id-column"]]))
merged = merged.sort_values(self.ctp_params["id-column"])
merged = merged.drop_duplicates(subset=ctp_cols)
merged = merged.reset_index()
merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL))
if self.exclude_pivot:
......@@ -211,13 +208,12 @@ class MatchAndExcludeRecordsAction(ActionInterface):
id_col = self.ctp_params["id-column"]
total_tmp_cols = group_cols + ["DIFF"]
df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols)
# Filtro de exclusión de registros con validación
df3 = df3[df3[EXCLUDE_ROWS_FIELD] == 'S']
df3 = df3[df3[EXCLUDE_ROWS_FIELD] == TAKE_ROWS_WITH]
df3 = df3.drop(EXCLUDE_ROWS_FIELD, axis=1)
df3 = df3.compute()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
total_cols = group_cols + [amount_col, id_col, "DIFF"]
self.app.logger.info(f"Aplicando algoritmo de exclusión. Máximo de {str(max_combinations)} combinaciones - {self.identifier}")
resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment