Commit fe0e2748 authored by Cristian Aguirre's avatar Cristian Aguirre

Update action-exclude-records-v1

parent dd21c192
from typing import Any, Dict, List from typing import Any, Dict, List
import importlib.util import importlib.util
from itertools import combinations from itertools import combinations
import time import multiprocessing as mp
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import dask.dataframe as dd from parallel_pandas import ParallelPandas
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
# import multiprocessing as mp
from functools import partial
# import asyncio
from app.main.engine.action.ActionInterface import ActionInterface from app.main.engine.action.ActionInterface import ActionInterface
...@@ -38,12 +34,12 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -38,12 +34,12 @@ class MatchAndExcludeRecordsAction(ActionInterface):
def __init__(self, app) -> None:
    """Set up the action with empty configuration slots.

    Args:
        app: Engine application handle, forwarded to ActionInterface.
    """
    super().__init__(app)
    # Configuration slots; all of them are filled in later by parser().
    for slot in ("max_combinations", "comb_per_group", "exclude_pivot",
                 "pivot_params", "ctp_params", "output"):
        setattr(self, slot, None)
    # Keys this action expects to find in its config descriptor.
    self.config_params = [
        "max-records-per-combinations",
        "max-combinations-per-group",
        "exclude-entity-pivot",
    ]
def parser(self, descriptor: Dict[str, Any]): def parser(self, descriptor: Dict[str, Any]):
# Validar si pyspark y su versión está instalada # Validar si pyspark y su versión está instalada
...@@ -79,7 +75,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -79,7 +75,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte") raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte")
self.max_combinations = configs["max-records-per-combinations"] self.max_combinations = configs["max-records-per-combinations"]
self.timeout = configs["max-timeout-per-combinations"] self.comb_per_group = configs["max-combinations-per-group"]
self.exclude_pivot = configs["exclude-entity-pivot"] self.exclude_pivot = configs["exclude-entity-pivot"]
self.pivot_params = pivot_params self.pivot_params = pivot_params
self.ctp_params = ctp_params self.ctp_params = ctp_params
...@@ -117,7 +113,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -117,7 +113,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]] self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]] self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
from pyspark.sql.functions import sum, collect_list, round from pyspark.sql.functions import sum, collect_list, round, when, col, lit
pivot_cols = self.pivot_params["columns-transaction"].copy() pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols: if self.pivot_params["amount-column"] in pivot_cols:
...@@ -127,7 +123,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -127,7 +123,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
if self.ctp_params["amount-column"] in ctp_cols: if self.ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(self.ctp_params["amount-column"]) ctp_cols.remove(self.ctp_params["amount-column"])
timeout = self.timeout comb_per_group = self.comb_per_group
max_combinations = self.max_combinations max_combinations = self.max_combinations
# Ejecutamos lógica de excluir registros # Ejecutamos lógica de excluir registros
...@@ -140,7 +136,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -140,7 +136,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]), agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"])) collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
pivot_df2 = pivot_df.dropDuplicates(subset=pivot_cols) pivot_df2 = pivot_df
# Caso: Muchos - 1 # Caso: Muchos - 1
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0: elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
...@@ -148,7 +144,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -148,7 +144,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]), agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"])) collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df2 = ctp_df.dropDuplicates(subset=ctp_cols) ctp_df2 = ctp_df
# Caso: Muchos - Muchos # Caso: Muchos - Muchos
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0: elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
...@@ -159,10 +155,19 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -159,10 +155,19 @@ class MatchAndExcludeRecordsAction(ActionInterface):
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]), agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"])) collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)] condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(self.pivot_params["columns-transaction"],
merged = pivot_df2.join(ctp_df2, condition) self.ctp_params["columns-transaction"])]
total_merged = pivot_df2.join(ctp_df2, condition, 'left')
merged = merged.withColumn("DIFF", pivot_df2[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]]) total_merged = total_merged.withColumn("DIFF", when(col(self.ctp_params["columns-transaction"][0]).isNotNull(),
lit(0)).otherwise(lit(None)))
total_merged = total_merged.select(*pivot_df2.columns, "DIFF")
condition = [total_merged[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = total_merged.join(ctp_df2, condition)
merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
total_merged[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]]).otherwise(col("DIFF")))
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL)) merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
if self.exclude_pivot: if self.exclude_pivot:
...@@ -181,9 +186,9 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -181,9 +186,9 @@ class MatchAndExcludeRecordsAction(ActionInterface):
df3 = df3.toPandas() df3 = df3.toPandas()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"] total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
num_chunks = 4
resultado = df3[total_cols].groupby(group_cols).apply(lambda x: custom_func(x, amount_col, id_col, timeout, max_combinations)) ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
resultado = df3[total_cols].groupby(group_cols).p_apply(lambda x: custom_func(x, amount_col, id_col, comb_per_group, max_combinations))
resultado = resultado.reset_index() resultado = resultado.reset_index()
if len(resultado.columns) == 1: if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"]) resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
...@@ -240,14 +245,16 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -240,14 +245,16 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise Exception(f"Error creando sesion Spark. {e}") raise Exception(f"Error creando sesion Spark. {e}")
def custom_func(group, amount_field, id_field, max_comb_per_group, max_combinations):
    """Search one group for a subset of rows whose amounts sum to its DIFF.

    Only rows flagged with EXCLUDE_ROWS_FIELD == 'S' participate. Subset
    sizes from 1 up to ``max_combinations`` are scanned concurrently, and at
    most ``max_comb_per_group`` combinations are tried per size.

    Args:
        group: pandas DataFrame for one group; must contain a "DIFF" column,
            the amount/id columns and EXCLUDE_ROWS_FIELD.
        amount_field: Name of the amount column.
        id_field: Name of the id column.
        max_comb_per_group: Cap on combinations tried per subset size.
        max_combinations: Cap on the subset size to try.

    Returns:
        List of ``id_field`` values forming a matching subset, or None when
        DIFF is 0/NaN or no subset is found within the caps.
    """
    diff = group["DIFF"].values[0]
    if pd.isna(diff) or diff == 0:
        return None

    # Restrict the search to rows explicitly marked as excludable.
    group = group[group[EXCLUDE_ROWS_FIELD] == 'S']
    group[amount_field] = group[amount_field].astype(float)
    group = group.reset_index(drop=True)
    values = group[amount_field].values
    indexs = group.index.values
    # Randomize the scan order so the capped search samples different
    # combinations on different runs instead of always the same prefix.
    np.random.shuffle(indexs)

    tam = len(values)
    rang = range(1, tam + 1) if tam <= max_combinations else range(1, max_combinations + 1)

    final = None
    stop_event = False  # shared flag telling all workers to stop

    def buscar_combinacion(i):
        # Try at most max_comb_per_group combinations of size i; quit as soon
        # as any worker has already found a match.
        nonlocal final, stop_event
        if not stop_event:
            for index, comb in enumerate(combinations(indexs, i)):
                # >= (not >): try exactly max_comb_per_group combinations,
                # fixing the former off-by-one that scanned one extra.
                if stop_event or index >= max_comb_per_group:
                    break
                # np.isclose on the rounded sum: exact == on float sums can
                # miss matches due to representation noise.
                if np.isclose(np.sum(values[list(comb)]).round(ROUND_DECIMAL), diff):
                    final = group.loc[list(comb), id_field].tolist()
                    stop_event = True
                    break
        return None

    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(buscar_combinacion, i) for i in rang]
        for future in concurrent.futures.as_completed(futures):
            # Propagate worker exceptions. The former `except TimeoutError`
            # was dead code: result() without a timeout never raises it.
            future.result()
            if stop_event or final is not None:
                break
    return final
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment