Commit 8ac5263c authored by Cristian Aguirre's avatar Cristian Aguirre

Update action-exclude-records-v1-dask

parent 03328797
...@@ -45,17 +45,20 @@ class Process: ...@@ -45,17 +45,20 @@ class Process:
obj_script.parser(self.descriptor) obj_script.parser(self.descriptor)
# Iniciando process # Iniciando process
self.app.logger.info(f"Iniciando procesamiento de script") self.app.logger.info(f"Iniciando procesamiento de script - {self.descriptor['idProcess']}")
obj_script.process(source) obj_script.process(source)
# Guardando resultado # Guardando resultado
self.app.logger.info(f"Generado y guardando resultado") self.app.logger.info(f"Generado y guardando resultado - {self.descriptor['idProcess']}")
response = obj_script.response() response = obj_script.response()
result = self.utils.create_result(response, self.descriptor) result = self.utils.create_result(response, self.descriptor)
del response
save = self.utils.save_result(result, self.descriptor, db_session) save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name: if save["status"] == StatusEnum.ERROR.name:
raise InterruptedError(save["message"]) raise InterruptedError(save["message"])
del result
except TimeoutError as e: except TimeoutError as e:
self.app.logger.error(f"Error de Timeout. Error: {e}") self.app.logger.error(f"Error de Timeout. Error: {e}")
status, status_description = CodeResponseEnum.TIMEOUT, str(e) status, status_description = CodeResponseEnum.TIMEOUT, str(e)
...@@ -79,7 +82,7 @@ class Process: ...@@ -79,7 +82,7 @@ class Process:
status, status_description = CodeResponseEnum.PARAMETERS_ERROR, str(e) status, status_description = CodeResponseEnum.PARAMETERS_ERROR, str(e)
except Exception as e: except Exception as e:
traceback_lib.print_exc() traceback_lib.print_exc()
self.app.logger.error(f"Error procesando engine. {e}") self.app.logger.error(f"Error procesando engine - {self.descriptor['idProcess']}. {e}")
status, status_description = StatusEnum.ERROR, str(e) status, status_description = StatusEnum.ERROR, str(e)
finally: finally:
return self.utils.create_response(status, status_description) return self.utils.create_response(status, status_description)
...@@ -7,7 +7,7 @@ services: ...@@ -7,7 +7,7 @@ services:
resources: resources:
limits: limits:
cpus: '8' cpus: '8'
memory: 16G memory: 24G
reservations: reservations:
cpus: '4' cpus: '4'
memory: 8G memory: 8G
......
...@@ -10,17 +10,6 @@ import multiprocessing as mp ...@@ -10,17 +10,6 @@ import multiprocessing as mp
from app.main.engine.action.ActionInterface import ActionInterface from app.main.engine.action.ActionInterface import ActionInterface
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Crea un controlador para imprimir en la consola
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# RELACION DE IDENTIFICADOR DE ACCION Y NOMBRE DE CLASE # RELACION DE IDENTIFICADOR DE ACCION Y NOMBRE DE CLASE
relation_classname_identifier = { relation_classname_identifier = {
"match-and-exclude-records-actions": "MatchAndExcludeRecordsAction" "match-and-exclude-records-actions": "MatchAndExcludeRecordsAction"
...@@ -36,6 +25,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -36,6 +25,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
def __init__(self, app) -> None: def __init__(self, app) -> None:
super().__init__(app) super().__init__(app)
self.identifier = None
self.max_combinations = None self.max_combinations = None
self.timeout = None self.timeout = None
self.exclude_pivot = None self.exclude_pivot = None
...@@ -69,6 +59,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -69,6 +59,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
if param not in pivot_params.keys() or param not in ctp_params.keys(): if param not in pivot_params.keys() or param not in ctp_params.keys():
raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte") raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte")
self.identifier = descriptor["idProcess"]
self.max_combinations = configs["max-records-per-combinations"] self.max_combinations = configs["max-records-per-combinations"]
self.timeout = configs["max-timeout-per-combinations"] self.timeout = configs["max-timeout-per-combinations"]
self.exclude_pivot = configs["exclude-entity-pivot"] self.exclude_pivot = configs["exclude-entity-pivot"]
...@@ -82,14 +73,20 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -82,14 +73,20 @@ class MatchAndExcludeRecordsAction(ActionInterface):
# Traer la data desde BD tanto pivot como contraparte # Traer la data desde BD tanto pivot como contraparte
pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"] pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"]
dialect = source_obj.get_dialect() dialect = source_obj.get_dialect()
# Seleccionamos las columnas a traer de la BD
total_pivot_cols = self.pivot_params["columns-transaction"] + self.pivot_params["columns-group"] + \
[self.pivot_params["amount-column"], self.pivot_params["id-column"], EXCLUDE_ROWS_FIELD]
total_pivot_cols = list(set(total_pivot_cols))
total_ctp_cols = self.ctp_params["columns-transaction"] + self.ctp_params["columns-group"] + \
[self.ctp_params["amount-column"], self.ctp_params["id-column"], EXCLUDE_ROWS_FIELD]
total_ctp_cols = list(set(total_ctp_cols))
pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"], pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"],
npartitions=mp.cpu_count()) npartitions=mp.cpu_count(), columns=total_pivot_cols)
pivot_df = pivot_df.reset_index() pivot_df = pivot_df.reset_index()
ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"], ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"],
npartitions=mp.cpu_count()) npartitions=mp.cpu_count(), columns=total_ctp_cols)
ctp_df = ctp_df.reset_index() ctp_df = ctp_df.reset_index()
logger.debug(f"Insumos cargados") self.app.logger.debug(f"Insumos cargados - {self.identifier}")
# Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input # Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input
# pivot: 'PIVOT_', contraparte: 'COUNTERPART_' # pivot: 'PIVOT_', contraparte: 'COUNTERPART_'
# Iterar sobre las columnas del DataFrame # Iterar sobre las columnas del DataFrame
...@@ -129,7 +126,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -129,7 +126,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
# Caso: 1 - Muchos # Caso: 1 - Muchos
elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0: elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
logger.info("Ejecutando Caso 1 - Muchos") self.app.logger.info(f"Ejecutando Caso 1 - Muchos - {self.identifier}")
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({ ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
self.ctp_params["amount-column"]: 'sum', # Sumar la columna de cantidades self.ctp_params["amount-column"]: 'sum', # Sumar la columna de cantidades
self.ctp_params["id-column"]: list self.ctp_params["id-column"]: list
...@@ -140,7 +137,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -140,7 +137,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
# Caso: Muchos - 1 # Caso: Muchos - 1
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0: elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
logger.info("Ejecutando Caso Muchos - 1") self.app.logger.info(f"Ejecutando Caso Muchos - 1 - {self.identifier}")
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({ pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
self.pivot_params["amount-column"]: 'sum', self.pivot_params["amount-column"]: 'sum',
self.pivot_params["id-column"]: list self.pivot_params["id-column"]: list
...@@ -151,7 +148,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -151,7 +148,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
# Caso: Muchos - Muchos # Caso: Muchos - Muchos
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0: elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
logger.info("Ejecutando Caso Muchos - Muchos") self.app.logger.info(f"Ejecutando Caso Muchos - Muchos - {self.identifier}")
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({ pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
self.pivot_params["amount-column"]: 'sum', self.pivot_params["amount-column"]: 'sum',
self.pivot_params["id-column"]: list self.pivot_params["id-column"]: list
...@@ -196,13 +193,13 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -196,13 +193,13 @@ class MatchAndExcludeRecordsAction(ActionInterface):
merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL)) merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL))
if self.exclude_pivot: if self.exclude_pivot:
logger.info("Se excluiran registros del pivot") self.app.logger.info(f"Se excluiran registros del pivot - {self.identifier}")
df = pivot_df df = pivot_df
group_cols = self.pivot_params["columns-group"] group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"] amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"] id_col = self.pivot_params["id-column"]
else: else:
logger.info("Se excluiran registros de la contraparte") self.app.logger.info(f"Se excluiran registros de la contraparte - {self.identifier}")
df = ctp_df df = ctp_df
group_cols = self.ctp_params["columns-group"] group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"] amount_col = self.ctp_params["amount-column"]
...@@ -211,9 +208,12 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -211,9 +208,12 @@ class MatchAndExcludeRecordsAction(ActionInterface):
total_tmp_cols = group_cols + ["DIFF"] total_tmp_cols = group_cols + ["DIFF"]
df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols) df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols)
# Filtro de exclusión de registros con validación
df3 = df3[df3[EXCLUDE_ROWS_FIELD] == 'S']
df3 = df3.compute() df3 = df3.compute()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"] total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
logger.info(f"Aplicando algoritmo de exclusión. Máximo de {str(max_combinations)} combinaciones") self.app.logger.info(f"Aplicando algoritmo de exclusión. Máximo de {str(max_combinations)} combinaciones - {self.identifier}")
resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations)) resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
resultado = resultado.reset_index() resultado = resultado.reset_index()
...@@ -228,10 +228,8 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -228,10 +228,8 @@ class MatchAndExcludeRecordsAction(ActionInterface):
meged2 = meged2.map_partitions(lambda partition: partition.assign( meged2 = meged2.map_partitions(lambda partition: partition.assign(
LISTA_DIFF=partition['LISTA_DIFF'].apply(lambda x: [] if pd.isna(x) else x)), meta=meged2.dtypes.to_dict()) LISTA_DIFF=partition['LISTA_DIFF'].apply(lambda x: [] if pd.isna(x) else x)), meta=meged2.dtypes.to_dict())
meged2 = meged2[ meged2 = meged2[(meged2['DIFF'] != 0) & meged2['LISTA_DIFF'].apply(
(meged2['DIFF'] == 0) | lambda x: True if not pd.isna(x) and ((isinstance(x, List) and len(x) > 0) or (isinstance(x, str) and len(x) > 2)) else False)
((meged2['DIFF'] != 0) & meged2['LISTA_DIFF'].apply(
lambda x: True if not pd.isna(x) and ((isinstance(x, List) and len(x) > 0) or (isinstance(x, str) and len(x) > 2)) else False))
] ]
meged2 = meged2.compute() meged2 = meged2.compute()
...@@ -254,7 +252,8 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -254,7 +252,8 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise TimeoutError(f"Tiempo límite superado. {e}") raise TimeoutError(f"Tiempo límite superado. {e}")
start_time = time.time() start_time = time.time()
self.output = __process(source_obs) self.output = __process(source_obs)
logger.info(f"Ejecución de script terminado en {time.time() - start_time} segundos") self.app.logger.debug(f"{self.output} - {self.identifier}")
self.app.logger.info(f"Ejecución de script terminado en {time.time() - start_time} segundos - {self.identifier}")
def response(self): def response(self):
return self.output return self.output
...@@ -287,7 +286,6 @@ def custom_func(group, amount_field, id_field, max_combinations): ...@@ -287,7 +286,6 @@ def custom_func(group, amount_field, id_field, max_combinations):
diff = int(diff_value*(10**ROUND_DECIMAL)) diff = int(diff_value*(10**ROUND_DECIMAL))
if pd.isna(diff) or diff == 0: if pd.isna(diff) or diff == 0:
return None return None
group = group[group[EXCLUDE_ROWS_FIELD] == 'S']
group[amount_field] = group[amount_field].astype(float) group[amount_field] = group[amount_field].astype(float)
group = group.reset_index(drop=True) group = group.reset_index(drop=True)
values = group[amount_field].values values = group[amount_field].values
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment