Commit acead9c2 authored by Erly Villaroel

Merge remote-tracking branch 'origin/developer_ca' into developer_ev

parents 8917d564 fe0e2748
from typing import Any, Dict, List
import importlib.util
from itertools import combinations
import time
import multiprocessing as mp
import numpy as np
import pandas as pd
import dask.dataframe as dd
from parallel_pandas import ParallelPandas
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
# import multiprocessing as mp
from functools import partial
# import asyncio
from app.main.engine.action.ActionInterface import ActionInterface
@@ -38,12 +34,12 @@ class MatchAndExcludeRecordsAction(ActionInterface):
def __init__(self, app) -> None:
super().__init__(app)
self.max_combinations = None
self.timeout = None
self.comb_per_group = None
self.exclude_pivot = None
self.pivot_params = None
self.ctp_params = None
self.output = None
self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]
self.config_params = ["max-records-per-combinations", "max-combinations-per-group", "exclude-entity-pivot"]
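For context, the three keys now read from configs could appear in an action descriptor roughly as follows (a hypothetical fragment; the real descriptor layout is not shown in this diff):

configs = {
    "max-records-per-combinations": 10,    # upper bound on the subset size tried per group
    "max-combinations-per-group": 100000,  # replaces the old timeout: cap on combinations enumerated per size
    "exclude-entity-pivot": True,          # exclude matched rows from the pivot rather than the counterpart
}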
def parser(self, descriptor: Dict[str, Any]):
# Validate that pyspark and the required version are installed
@@ -79,7 +75,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise ReferenceError(f"Parameter *{param}* not found in pivot or counterpart")
self.max_combinations = configs["max-records-per-combinations"]
self.timeout = configs["max-timeout-per-combinations"]
self.comb_per_group = configs["max-combinations-per-group"]
self.exclude_pivot = configs["exclude-entity-pivot"]
self.pivot_params = pivot_params
self.ctp_params = ctp_params
@@ -117,7 +113,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
from pyspark.sql.functions import sum, collect_list, round
from pyspark.sql.functions import sum, collect_list, round, when, col, lit
pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols:
@@ -127,7 +123,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
if self.ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(self.ctp_params["amount-column"])
timeout = self.timeout
comb_per_group = self.comb_per_group
max_combinations = self.max_combinations
# Run the record-exclusion logic
@@ -140,7 +136,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
pivot_df2 = pivot_df.dropDuplicates(subset=pivot_cols)
pivot_df2 = pivot_df
# Case: Many - 1
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
@@ -148,7 +144,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df2 = ctp_df.dropDuplicates(subset=ctp_cols)
ctp_df2 = ctp_df
# Case: Many - Many
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
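All of the grouped cases build on the same aggregation shape: group by the transaction columns, round the summed amount, and collect the matching row ids into a list. A minimal runnable sketch of that pattern, with hypothetical column names and ROUND_DECIMAL assumed to be 2:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, collect_list, round

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("A", 10.5, 1), ("A", 20.25, 2), ("B", 5.0, 3)],
    ["tx_key", "amount", "row_id"],  # hypothetical stand-ins for the transaction/amount/id columns
)
agg_df = (df.groupBy("tx_key")
            .agg(round(sum("amount"), 2).alias("amount"),
                 collect_list("row_id").alias("row_id")))
agg_df.show()  # A -> 30.75, [1, 2]; B -> 5.0, [3]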
@@ -159,10 +155,19 @@ class MatchAndExcludeRecordsAction(ActionInterface):
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = pivot_df2.join(ctp_df2, condition)
condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(self.pivot_params["columns-transaction"],
self.ctp_params["columns-transaction"])]
total_merged = pivot_df2.join(ctp_df2, condition, 'left')
merged = merged.withColumn("DIFF", pivot_df2[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]])
total_merged = total_merged.withColumn("DIFF", when(col(self.ctp_params["columns-transaction"][0]).isNotNull(),
lit(0)).otherwise(lit(None)))
total_merged = total_merged.select(*pivot_df2.columns, "DIFF")
condition = [total_merged[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = total_merged.join(ctp_df2, condition)
merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
total_merged[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]]).otherwise(col("DIFF")))
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
if self.exclude_pivot:
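The reworked merge computes DIFF in two passes: a left join over the full transaction-column condition tags exact matches with DIFF = 0, then a second join on the reduced key columns (amount excluded) fills the remaining rows with the amount gap. A condensed, runnable sketch of that pattern on toy single-key frames (column names are hypothetical stand-ins for the prefixed PIVOT_/COUNTERPART_ columns):

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, lit

spark = SparkSession.builder.getOrCreate()
pivot = spark.createDataFrame([("t1", 100.0), ("t2", 80.0)], ["PIVOT_TX", "PIVOT_AMT"])
ctp = spark.createDataFrame([("t1", 100.0), ("t2", 75.0)], ["COUNTERPART_TX", "COUNTERPART_AMT"])

# Pass 1: exact match on key AND amount -> DIFF = 0, otherwise NULL.
exact = pivot.join(ctp, [pivot["PIVOT_TX"] == ctp["COUNTERPART_TX"],
                         pivot["PIVOT_AMT"] == ctp["COUNTERPART_AMT"]], "left")
exact = exact.withColumn("DIFF", when(col("COUNTERPART_TX").isNotNull(), lit(0)).otherwise(lit(None)))
exact = exact.select("PIVOT_TX", "PIVOT_AMT", "DIFF")

# Pass 2: rejoin on the key alone; fill the NULL DIFFs with the amount difference.
merged = exact.join(ctp, exact["PIVOT_TX"] == ctp["COUNTERPART_TX"])
merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
                                        col("PIVOT_AMT") - col("COUNTERPART_AMT")).otherwise(col("DIFF")))
merged.show()  # t1 -> DIFF 0.0, t2 -> DIFF 5.0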
@@ -181,9 +186,9 @@ class MatchAndExcludeRecordsAction(ActionInterface):
df3 = df3.toPandas()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
num_chunks = 4
resultado = df3[total_cols].groupby(group_cols).apply(lambda x: custom_func(x, amount_col, id_col, timeout, max_combinations))
ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
resultado = df3[total_cols].groupby(group_cols).p_apply(lambda x: custom_func(x, amount_col, id_col, comb_per_group, max_combinations))
resultado = resultado.reset_index()
if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
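The plain groupby().apply is replaced with parallel-pandas, which patches pandas with parallel variants once initialized. A minimal usage sketch on a toy frame (split_factor and the progress-bar flag mirror the values above):

import multiprocessing as mp
import pandas as pd
from parallel_pandas import ParallelPandas

# One-time initialization; after this, p_apply is available on groupby objects.
ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)

df = pd.DataFrame({"g": ["a", "a", "b"], "v": [1.0, 2.0, 3.0]})
out = df.groupby("g").p_apply(lambda x: x["v"].sum())  # runs group chunks across workers
print(out)  # a -> 3.0, b -> 3.0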
@@ -240,14 +245,16 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise Exception(f"Error creando sesion Spark. {e}")
def custom_func(group, amount_field, id_field, timeout, max_combinations):
def custom_func(group, amount_field, id_field, max_comb_per_group, max_combinations):
diff = group["DIFF"].values[0]
if diff == 0:
if pd.isna(diff) or diff == 0:
return None
group = group[group[EXCLUDE_ROWS_FIELD] == 'S']
group[amount_field] = group[amount_field].astype(float)
group = group.reset_index(drop=True)
values = group[amount_field].values
indexs = group.index.values
np.random.shuffle(indexs)
tam = len(values)
rang = range(1, tam + 1) if tam <= max_combinations else range(1, max_combinations + 1)
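Read together, these two lines cap the subset sizes that will be searched: max_combinations bounds how many rows a candidate combination may contain. A worked example with assumed values:

tam = 10              # rows available in the group
max_combinations = 5  # config "max-records-per-combinations"
rang = range(1, tam + 1) if tam <= max_combinations else range(1, max_combinations + 1)
print(list(rang))     # [1, 2, 3, 4, 5] -> subset sizes tried, never more than 5 rows at once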
@@ -256,23 +263,17 @@ def custom_func(group, amount_field, id_field, timeout, max_combinations):
def buscar_combinacion(i):
nonlocal final, stop_event
if not stop_event: # Only continue if no combination has been found yet
for comb in combinations(indexs, i):
if stop_event: # Check whether the stop signal has been set
if not stop_event:
for index, comb in enumerate(combinations(indexs, i)):
if stop_event or index > max_comb_per_group:
break
elif np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff:
final = group.loc[list(comb), id_field].tolist()
stop_event = True
break
if np.isclose(np.sum(values[list(comb)]), diff):
# Check that the combination consists of validated 'EXCLUDE_VALID' records
result = group[group.index.isin(comb)]
if (result[EXCLUDE_ROWS_FIELD] == 'S').all():
final = result[id_field].tolist()
stop_event = True # Set the stop signal
break
return None
start_time = time.time()
with ThreadPoolExecutor() as executor:
futures = [executor.submit(buscar_combinacion, i) for i in rang]
......@@ -280,14 +281,12 @@ def custom_func(group, amount_field, id_field, timeout, max_combinations):
try:
future.result()
except TimeoutError:
stop_event = True # Set the stop signal if the time limit is reached
stop_event = True
break
if stop_event or final is not None:
break
if time.time() - start_time >= timeout:
return None # Return None if the time limit was reached
return final
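Taken together, the new search shuffles the candidate indices and scans each subset size in its own thread, abandoning a size once max_comb_per_group candidates have been enumerated. A self-contained condensation of that logic (the helper name and toy data are hypothetical, ROUND_DECIMAL is assumed to be 2, and the upstream EXCLUDE_ROWS_FIELD pre-filter is elided for brevity):

import numpy as np
import pandas as pd
from itertools import combinations
from concurrent.futures import ThreadPoolExecutor

ROUND_DECIMAL = 2

def find_matching_subset(group, amount_field, id_field, max_comb_per_group, max_combinations):
    diff = group["DIFF"].values[0]
    if pd.isna(diff) or diff == 0:
        return None
    group = group.reset_index(drop=True)
    values = group[amount_field].astype(float).values
    indexs = group.index.values
    np.random.shuffle(indexs)  # randomize so the per-size cap samples the space
    sizes = range(1, min(len(values), max_combinations) + 1)
    final, stop = None, False

    def search(i):
        nonlocal final, stop
        for n, comb in enumerate(combinations(indexs, i)):
            if stop or n > max_comb_per_group:  # stop signal or per-size cap reached
                break
            if np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff:
                final = group.loc[list(comb), id_field].tolist()
                stop = True
                break

    with ThreadPoolExecutor() as ex:  # one task per subset size
        list(ex.map(search, sizes))
    return final

toy = pd.DataFrame({"AMT": [5.0, 7.5, 2.5], "ID": [101, 102, 103], "DIFF": [10.0] * 3})
print(find_matching_subset(toy, "AMT", "ID", 1000, 3))  # [102, 103]: 7.5 + 2.5 == 10.0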