Commit 0be01079 authored by Cristian Aguirre

Update many-to-many ("muchos a muchos") matches

parent 6af4da44
 from typing import Any, Dict
 import importlib.util
+from itertools import combinations
+import time
+import numpy as np
+from concurrent.futures import ThreadPoolExecutor, TimeoutError  # futures' TimeoutError, raised by future.result()
 from app.main.engine.action.ActionInterface import ActionInterface
@@ -27,6 +31,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
         self.exclude_pivot = None
         self.pivot_params = None
         self.ctp_params = None
+        self.tmp_df = None
         self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]
     def parser(self, descriptor: Dict[str, Any]):
@@ -81,7 +86,24 @@ class MatchAndExcludeRecordsAction(ActionInterface):
         pivot_df = session.read.jdbc(url=jdbc_url, table=pivot_table, properties=jdbc_properties)
         ctp_df = session.read.jdbc(url=jdbc_url, table=ctp_table, properties=jdbc_properties)
-        from pyspark.sql.functions import col, sum, collect_list, round
+        # Add a prefix to every column of both pivot and counterpart, and update the input fields to match.
+        # pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
+        for column in pivot_df.columns:
+            pivot_df = pivot_df.withColumnRenamed(column, "PIVOT_" + column)
+        for column in ctp_df.columns:
+            ctp_df = ctp_df.withColumnRenamed(column, "COUNTERPART_" + column)
+        for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
+            if isinstance(self.pivot_params[key_p], str):
+                self.pivot_params[key_p] = "PIVOT_" + self.pivot_params[key_p]
+                self.ctp_params[key_c] = "COUNTERPART_" + self.ctp_params[key_c]
+            else:
+                self.pivot_params[key_p] = ["PIVOT_" + column for column in self.pivot_params[key_p]]
+                self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
+        from pyspark.sql.functions import sum, collect_list, round, udf, array_except
+        from pyspark.sql.types import StringType, ArrayType, IntegerType
         # Run the record-exclusion logic
         if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
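A minimal sketch of the prefixing pattern used above, on a toy DataFrame (assumes an existing SparkSession named `spark`; the column names are illustrative, not from the commit):

    df = spark.createDataFrame([(1, 100.0)], ["id", "amount"])
    # Namespace every column so the two sides cannot collide after the join
    for column in df.columns:
        df = df.withColumnRenamed(column, "PIVOT_" + column)
    print(df.columns)  # ['PIVOT_id', 'PIVOT_amount']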
@@ -89,23 +111,23 @@ class MatchAndExcludeRecordsAction(ActionInterface):
         # Case: 1 - Many
         elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
-            ctp_df = ctp_df.groupby(self.ctp_params["columns-group"]). \
+            ctp_df_2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
                 agg(round(sum(self.ctp_params["amount-column"]), 2).alias(self.ctp_params["amount-column"]),
                     collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
-            ctp_df.show()
+            ctp_df_2.show()
         # Case: Many - 1
         elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
-            pivot_df = pivot_df.groupby(self.pivot_params["columns-group"]).\
+            pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).\
                 agg(round(sum(self.pivot_params["amount-column"]), 2).alias(self.pivot_params["amount-column"]),
                     collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
-            pivot_df.show()
+            pivot_df2.show()
         # Case: Many - Many
         elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
-            pivot_df = pivot_df.groupby(self.pivot_params["columns-group"]). \
+            pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]). \
                 agg(round(sum(self.pivot_params["amount-column"]), 2).alias(self.pivot_params["amount-column"]),
                     collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
-            ctp_df = ctp_df.groupby(self.ctp_params["columns-group"]). \
+            ctp_df_2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
                 agg(round(sum(self.ctp_params["amount-column"]), 2).alias(self.ctp_params["amount-column"]),
                     collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
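All three many-side cases above use the same aggregation: one row per group, carrying the rounded total amount and the list of member ids. A self-contained sketch of that pattern (toy data and names; assumes the same `spark` session):

    from pyspark.sql.functions import sum, collect_list, round

    df = spark.createDataFrame(
        [("A", 10.25, 1), ("A", 5.0, 2), ("B", 7.5, 3)],
        ["group_col", "amount", "id"])
    # Mirrors round(sum(...)) / collect_list(...) in the cases above
    df.groupby("group_col").agg(
        round(sum("amount"), 2).alias("amount"),
        collect_list("id").alias("id")).show()
    # A -> amount 15.25, id [1, 2];  B -> amount 7.5, id [3]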
@@ -116,14 +138,33 @@ class MatchAndExcludeRecordsAction(ActionInterface):
         ctp_cols = self.ctp_params["columns-transaction"].copy()
         if self.ctp_params["amount-column"] in ctp_cols:
             ctp_cols.remove(self.ctp_params["amount-column"])
-        pivot_df.show()
-        ctp_df.show()
-        condition = [pivot_df[col1] == ctp_df[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
-        merged = pivot_df.join(ctp_df, condition)
-        merged = merged.withColumn("DIFF", pivot_df[self.pivot_params["amount-column"]] - ctp_df[self.ctp_params["amount-column"]])
-        merged.show()
+        condition = [pivot_df2[col1] == ctp_df_2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
+        merged = pivot_df2.join(ctp_df_2, condition)
+        merged = merged.withColumn("DIFF", pivot_df2[self.pivot_params["amount-column"]] - ctp_df_2[self.ctp_params["amount-column"]])
+        merged_df = merged.withColumn("DIFF", round(merged["DIFF"], 2))
+        # Pick the side whose records may be excluded; the combination search runs over it in pandas
+        if self.exclude_pivot:
+            df_pandas = pivot_df.toPandas()
+            group_cols = self.pivot_params["columns-group"]
+            amount_col = self.pivot_params["amount-column"]
+            id_col = self.pivot_params["id-column"]
+        else:
+            df_pandas = ctp_df.toPandas()
+            group_cols = self.ctp_params["columns-group"]
+            amount_col = self.ctp_params["amount-column"]
+            id_col = self.ctp_params["id-column"]
+        timeout = self.timeout
+        max_combinations = self.max_combinations
+        custom_udf = udf(lambda diff: custom_apply(diff, df_pandas, group_cols, amount_col, id_col, timeout,
+                                                   max_combinations), ArrayType(IntegerType()))
+        merged_df = merged_df.withColumn("LISTA_DIFF", custom_udf(merged_df["DIFF"]))
+        merged_df = merged_df.withColumn("LIST_PIVOT_ID", array_except(merged_df[self.pivot_params["id-column"]],
+                                                                       merged_df["LISTA_DIFF"]))
+        merged_df = merged_df.withColumn("LIST_COUNTERPART_ID", array_except(merged_df[self.ctp_params["id-column"]],
+                                                                             merged_df["LISTA_DIFF"]))
+        merged_df.show()
     def response(self):
         print("Llegue al response3")
@@ -145,10 +186,61 @@ class MatchAndExcludeRecordsAction(ActionInterface):
         raise Exception(f"Error creando sesion Spark. {e}")
+def custom_apply(raw, df_pandas, group_cols, amount_col, id_col, timeout, max_combinations):
+    total_cols = group_cols.copy()
+    total_cols.append(amount_col)
+    total_cols.append(id_col)
+    # Each group returns a list of matching ids (or None); .values yields one entry per group
+    result = df_pandas[total_cols].groupby(group_cols).apply(custom_func, diff=raw, amount_field=amount_col,
+                                                             id_field=id_col, timeout=timeout,
+                                                             max_combinations=max_combinations).values
+    # str() renders each group's list as "[id1, id2, ...]"; strip the brackets and re-parse the ids
+    resp = ",".join(map(str, result)) if len(result) > 0 else ""
+    print(resp)
+    return [int(elem) for elem in resp[1:-1].split(",")] if resp else []
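What the string round-trip in custom_apply does, in isolation: the group result is rendered with str(), the surrounding brackets are stripped, and the ids are re-parsed. Note it only parses cleanly when a single group produced a list. A minimal, pandas-free illustration (values are made up):

    result = [[101, 102]]                             # one matching group
    resp = ",".join(map(str, result))                 # -> "[101, 102]"
    ids = [int(e) for e in resp[1:-1].split(",")]     # -> [101, 102]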
+def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
+    group = group.reset_index(drop=True)
+    values = group[amount_field].values
+    indexs = group.index.values
+    tam = len(values)
+    rang = range(1, tam + 1) if tam <= max_combinations else range(1, max_combinations + 1)
+    final = None
+    stop_event = False
+    def buscar_combinacion(i):
+        nonlocal final, stop_event
+        if not stop_event:  # Continue only while no combination has been found yet
+            for comb in combinations(indexs, i):
+                if stop_event:  # Check whether the stop signal has been set
+                    break
+                if np.sum(values[list(comb)]).round(2) == diff:
+                    final = group[group.index.isin(comb)][id_field].tolist()
+                    stop_event = True  # Set the stop signal
+                    break
+        return None
+    start_time = time.time()
+    with ThreadPoolExecutor() as executor:
+        futures = [executor.submit(buscar_combinacion, i) for i in rang]
+        for future in futures:
+            try:
+                future.result(timeout=timeout)  # Wait up to `timeout` seconds for each task
+            except TimeoutError:
+                stop_event = True  # Set the stop signal when the time limit is hit
+                break
+            if stop_event or final is not None:
+                break
+    if time.time() - start_time >= timeout:
+        return None  # Time limit reached: report no combination
+    return final
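A quick way to exercise custom_func on its own (toy frame; the keyword names are the function's own parameters, the data is illustrative):

    import pandas as pd

    group = pd.DataFrame({"amount": [10.0, 2.5, 7.5], "id": [101, 102, 103]})
    # Search subsets of size 1..2 whose amounts sum to the target diff
    print(custom_func(group, diff=10.0, amount_field="amount", id_field="id",
                      timeout=5, max_combinations=2))
    # -> [101] (or [102, 103]; the threaded search is not deterministic)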