Commit b0fd6670 authored by Cristian Aguirre

Update action-exclude-records-v1

parent 06da121b
......@@ -11,3 +11,4 @@ class CodeResponseEnum(Enum):
OUTPUT_ERROR = 606
EMPTY_DATASET = 607
ERROR = 609
+TIMEOUT = 610
......@@ -4,3 +4,4 @@ from enum import Enum
class StatusEnum(Enum):
OK = 200
ERROR = 609
+TIMEOUT = 610
......@@ -57,6 +57,9 @@ class Process:
save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name:
raise InterruptedError(save["message"])
+except TimeoutError as e:
+self.app.logger.error(f"Timeout error. Error: {e}")
+status, status_description = CodeResponseEnum.TIMEOUT, str(e)
except IndexError as e:
self.app.logger.error(f"Error extrayendo insumos. Vacío. Error: {e}")
status, status_description = CodeResponseEnum.EMPTY_DATASET, str(e)
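For context, the new branch pairs with the @timeout(...) decorator introduced in the action file below, which raises a standard TimeoutError when the time budget is exceeded. A minimal, self-contained sketch of the dispatch pattern above (the run_step helper and the logging wiring are illustrative, not part of the codebase):

import logging
from enum import Enum

class CodeResponseEnum(Enum):
    OK = 200
    EMPTY_DATASET = 607
    ERROR = 609
    TIMEOUT = 610

def run_step(step):
    # Mirrors the try/except ladder in Process: each failure mode maps to one code.
    status, status_description = CodeResponseEnum.OK, ""
    try:
        step()
    except TimeoutError as e:
        logging.error(f"Timeout error. Error: {e}")
        status, status_description = CodeResponseEnum.TIMEOUT, str(e)
    except IndexError as e:
        logging.error(f"Error extracting inputs. Empty dataset. Error: {e}")
        status, status_description = CodeResponseEnum.EMPTY_DATASET, str(e)
    return status, status_description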
......
......@@ -5,6 +5,8 @@ import shutil
from enum import Enum
# from pyspark.sql import SparkSession
import json
+from app.main.engine.enum.CodeResponseEnum import CodeResponseEnum
+from app.main.engine.util.Timezone import Timezone
# from config import Config as cfg
......@@ -52,8 +54,11 @@ class Utils:
if codeEnum.value == StatusEnum.OK.value:
response.update({'status': StatusEnum.OK.name, 'detail': detail})
else:
+error = StatusEnum.ERROR.name
+if codeEnum.value == CodeResponseEnum.TIMEOUT.value:
+error = StatusEnum.TIMEOUT.name
description = DescResponseEnum[codeEnum.name].value
-response.update({'status': StatusEnum.ERROR.name, 'message': description,
+response.update({'status': error, 'message': description,
'detail': detail})
return response
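To see what the change above does end to end, here is a compact sketch of the new mapping, with simplified stand-ins for the enums (DescResponseEnum is replaced by the plain status logic for brevity):

from enum import Enum

class StatusEnum(Enum):
    OK = 200
    ERROR = 609
    TIMEOUT = 610

class CodeResponseEnum(Enum):
    OK = 200
    EMPTY_DATASET = 607
    ERROR = 609
    TIMEOUT = 610

def status_for(code_enum):
    # TIMEOUT is the only error code surfaced under its own status name;
    # every other non-OK code is flattened to ERROR.
    if code_enum.value == StatusEnum.OK.value:
        return StatusEnum.OK.name
    if code_enum.value == CodeResponseEnum.TIMEOUT.value:
        return StatusEnum.TIMEOUT.name
    return StatusEnum.ERROR.name

assert status_for(CodeResponseEnum.TIMEOUT) == "TIMEOUT"
assert status_for(CodeResponseEnum.EMPTY_DATASET) == "ERROR"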
......@@ -65,6 +70,14 @@ class Utils:
pivot_params = descriptor["params-input"]["pivot-config"]
ctp_params = descriptor["params-input"]["counterpart-config"]
for key_p, key_c in zip(pivot_params.keys(), ctp_params.keys()):
+if isinstance(pivot_params[key_p], str):
+pivot_params[key_p] = "PIVOT_" + pivot_params[key_p]
+ctp_params[key_c] = "COUNTERPART_" + ctp_params[key_c]
+else:
+pivot_params[key_p] = ["PIVOT_" + column for column in pivot_params[key_p]]
+ctp_params[key_c] = ["COUNTERPART_" + column for column in ctp_params[key_c]]
group_pivot_match = pivot_params["columns-group"]
transaction_pivot_match = pivot_params["columns-transaction"]
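A small worked example of the prefixing loop added above (the column names are made up for illustration):

pivot_params = {"amount-column": "amt", "columns-group": ["acct", "day"]}
ctp_params = {"amount-column": "importe", "columns-group": ["cta", "dia"]}

for key_p, key_c in zip(pivot_params.keys(), ctp_params.keys()):
    if isinstance(pivot_params[key_p], str):
        pivot_params[key_p] = "PIVOT_" + pivot_params[key_p]
        ctp_params[key_c] = "COUNTERPART_" + ctp_params[key_c]
    else:
        pivot_params[key_p] = ["PIVOT_" + c for c in pivot_params[key_p]]
        ctp_params[key_c] = ["COUNTERPART_" + c for c in ctp_params[key_c]]

# pivot_params -> {'amount-column': 'PIVOT_amt',
#                  'columns-group': ['PIVOT_acct', 'PIVOT_day']}

Note that the loop pairs the two dicts positionally via zip, so it relies on the pivot and counterpart descriptors listing the same keys in the same order.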
......@@ -73,7 +86,7 @@ class Utils:
used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match
-if data.empty:
+if data is None or data.empty:
self.app.logger.info(f"El dataframe resultado esta vacio")
else:
for idx, i in data.iterrows():
......
-from typing import Any, Dict, List
+from typing import Any, Dict
import importlib.util
from itertools import combinations
import multiprocessing as mp
import numpy as np
import pandas as pd
from numba import njit
from parallel_pandas import ParallelPandas
from concurrent.futures import ThreadPoolExecutor
+from wrapt_timeout_decorator import timeout
from app.main.engine.action.ActionInterface import ActionInterface
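The newly imported wrapt_timeout_decorator.timeout raises a standard TimeoutError once the wrapped call exceeds its budget, which is what the new except TimeoutError branches in this commit rely on. A minimal sketch (the one-second budget is arbitrary):

import time
from wrapt_timeout_decorator import timeout

@timeout(1)  # seconds; raises TimeoutError when exceeded
def slow():
    time.sleep(5)

try:
    slow()
except TimeoutError as e:
    print(f"Time limit exceeded. {e}")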
......@@ -35,12 +33,12 @@ class MatchAndExcludeRecordsAction(ActionInterface):
def __init__(self, app) -> None:
super().__init__(app)
self.max_combinations = None
-self.comb_per_group = None
+self.timeout = None
self.exclude_pivot = None
self.pivot_params = None
self.ctp_params = None
self.output = None
self.config_params = ["max-records-per-combinations", "max-combinations-per-group", "exclude-entity-pivot"]
self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]
def parser(self, descriptor: Dict[str, Any]):
# Validate that pyspark and the required version are installed
......@@ -76,146 +74,152 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte")
self.max_combinations = configs["max-records-per-combinations"]
-self.comb_per_group = configs["max-combinations-per-group"]
+self.timeout = configs["max-timeout-per-combinations"]
self.exclude_pivot = configs["exclude-entity-pivot"]
self.pivot_params = pivot_params
self.ctp_params = ctp_params
def process(self, source_obj):
# Initialize the Spark session
session = self.createSession()
# Fetch both pivot and counterpart data from the DB
pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"]
jdbc_conn = source_obj.create_spark_connection()
jdbc_url = jdbc_conn["url"]
jdbc_properties = jdbc_conn["properties"]
pivot_df = session.read.jdbc(url=jdbc_url, table=pivot_table, properties=jdbc_properties)
ctp_df = session.read.jdbc(url=jdbc_url, table=ctp_table, properties=jdbc_properties)
# Add a prefix to every column of both pivot and counterpart, and update the input fields
# pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
for column in pivot_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
pivot_df = pivot_df.withColumnRenamed(column, "PIVOT_"+column)
for column in ctp_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
ctp_df = ctp_df.withColumnRenamed(column, "COUNTERPART_"+column)
for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
if isinstance(self.pivot_params[key_p], str):
self.pivot_params[key_p] = "PIVOT_"+self.pivot_params[key_p]
self.ctp_params[key_c] = "COUNTERPART_"+self.ctp_params[key_c]
else:
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
from pyspark.sql.functions import sum, collect_list, round, when, col, lit
pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols:
pivot_cols.remove(self.pivot_params["amount-column"])
ctp_cols = self.ctp_params["columns-transaction"].copy()
if self.ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(self.ctp_params["amount-column"])
comb_per_group = self.comb_per_group
max_combinations = self.max_combinations
# Run the record-exclusion logic
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
# Case: 1 - Many
elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
pivot_df2 = pivot_df
# Case: Many - 1
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).\
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df2 = ctp_df
# Case: Many - Many
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]). \
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(self.pivot_params["columns-transaction"],
self.ctp_params["columns-transaction"])]
total_merged = pivot_df2.join(ctp_df2, condition, 'left')
total_merged = total_merged.withColumn("DIFF", when(col(self.ctp_params["columns-transaction"][0]).isNotNull(),
lit(0)).otherwise(lit(None)))
total_merged = total_merged.select(*pivot_df2.columns, "DIFF")
condition = [total_merged[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = total_merged.join(ctp_df2, condition)
merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
total_merged[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]]).otherwise(col("DIFF")))
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
if self.exclude_pivot:
df = pivot_df
group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"]
else:
df = ctp_df
group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"]
id_col = self.ctp_params["id-column"]
total_tmp_cols = group_cols + ["DIFF"]
df3 = df.join(merged_df.select(*total_tmp_cols), group_cols)
df3 = df3.toPandas()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
# ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
df3 = df3.sort_values(group_cols + [amount_col])
resultado = df3[total_cols].groupby(group_cols).apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
resultado = resultado.reset_index()
if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
else:
resultado.columns = group_cols + ["LISTA_DIFF"]
# print(resultado["LISTA_DIFF"].apply(lambda x: x if pd.notna(x) and x[0]!=-1 else x))
meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols)
print(meged2)
meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array)
meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))]
if meged2.empty:
pass
elif self.exclude_pivot:
meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
if meged2['INTER_CTP_ID'].dtype == 'int64':
meged2['INTER_CTP_ID'] = meged2['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')  # wrap scalar ids into single-element lists
else:
meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
if meged2['INTER_PIVOT_ID'].dtype == 'int64':
meged2['INTER_PIVOT_ID'] = meged2['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')  # wrap scalar ids into single-element lists
self.output = meged2
def process(self, source_obj):
try:
@timeout(self.timeout)
def __process(source_obj):
# Initialize the Spark session
session = self.createSession()
# Fetch both pivot and counterpart data from the DB
pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"]
jdbc_conn = source_obj.create_spark_connection()
jdbc_url = jdbc_conn["url"]
jdbc_properties = jdbc_conn["properties"]
pivot_df = session.read.jdbc(url=jdbc_url, table=pivot_table, properties=jdbc_properties)
ctp_df = session.read.jdbc(url=jdbc_url, table=ctp_table, properties=jdbc_properties)
# Add a prefix to every column of both pivot and counterpart, and update the input fields
# pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
for column in pivot_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
pivot_df = pivot_df.withColumnRenamed(column, "PIVOT_"+column)
for column in ctp_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
ctp_df = ctp_df.withColumnRenamed(column, "COUNTERPART_"+column)
for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
if isinstance(self.pivot_params[key_p], str):
self.pivot_params[key_p] = "PIVOT_"+self.pivot_params[key_p]
self.ctp_params[key_c] = "COUNTERPART_"+self.ctp_params[key_c]
else:
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
from pyspark.sql.functions import sum, collect_list, round, when, col, lit
pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols:
pivot_cols.remove(self.pivot_params["amount-column"])
ctp_cols = self.ctp_params["columns-transaction"].copy()
if self.ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(self.ctp_params["amount-column"])
max_combinations = self.max_combinations
# Run the record-exclusion logic
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
# Case: 1 - Many
elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
pivot_df2 = pivot_df
# Case: Many - 1
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).\
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df2 = ctp_df
# Case: Many - Many
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]). \
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(self.pivot_params["columns-transaction"],
self.ctp_params["columns-transaction"])]
total_merged = pivot_df2.join(ctp_df2, condition, 'left')
total_merged = total_merged.withColumn("DIFF", when(col(self.ctp_params["columns-transaction"][0]).isNotNull(),
lit(0)).otherwise(lit(None)))
total_merged = total_merged.select(*pivot_df2.columns, "DIFF")
condition = [total_merged[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = total_merged.join(ctp_df2, condition)
merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
total_merged[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]]).otherwise(col("DIFF")))
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
if self.exclude_pivot:
df = pivot_df
group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"]
else:
df = ctp_df
group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"]
id_col = self.ctp_params["id-column"]
total_tmp_cols = group_cols + ["DIFF"]
df3 = df.join(merged_df.select(*total_tmp_cols), group_cols)
df3 = df3.toPandas()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
df3 = df3.sort_values(group_cols + [amount_col])
resultado = df3[total_cols].groupby(group_cols).p_apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
resultado = resultado.reset_index()
if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
else:
resultado.columns = group_cols + ["LISTA_DIFF"]
meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols)
meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array)
meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))]
if meged2.empty:
pass
elif self.exclude_pivot:
meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
if meged2['INTER_CTP_ID'].dtype == 'int64':
meged2['INTER_CTP_ID'] = meged2['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')  # wrap scalar ids into single-element lists
else:
meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
if meged2['INTER_PIVOT_ID'].dtype == 'int64':
meged2['INTER_PIVOT_ID'] = meged2['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')  # wrap scalar ids into single-element lists
return meged2
except TimeoutError as e:
raise TimeoutError(f"Tiempo límite superado. {e}")
self.output = __process(source_obj)
def response(self):
return self.output
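The decorator is applied inside process rather than at class level because its argument comes from instance state: self.timeout is only known after parser has read the descriptor. Reduced to its essentials, the pattern looks like this (a sketch, not the actual class):

from wrapt_timeout_decorator import timeout

class Action:
    def __init__(self, seconds):
        self.timeout = seconds

    def process(self, source_obj):
        @timeout(self.timeout)  # budget resolved per instance, at call time
        def __process(source_obj):
            ...  # long-running work goes here
            return "done"
        return __process(source_obj)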
......@@ -251,91 +255,57 @@ class MatchAndExcludeRecordsAction(ActionInterface):
def custom_func(group, amount_field, id_field, max_combinations):
diff = group["DIFF"].values[0]
diff = int(group["DIFF"].values[0]*(10**ROUND_DECIMAL))
if pd.isna(diff) or diff == 0:
return None
group = group[group[EXCLUDE_ROWS_FIELD] == 'S']
group[amount_field] = group[amount_field].astype(float)
group = group.reset_index(drop=True)
values = group[amount_field].values
# Round before casting to int64: bare truncation can be off by one due to float representation
values = np.round(values * (10**ROUND_DECIMAL)).astype(np.int64)
ids = group[id_field].values
n = len(values)
valores1 = encontrar_comb_1(values, diff)
if valores1[0] != -1:
indices = ids[valores1]
return indices
valores2 = encontrar_comb_2(values, diff, n)
if valores2[0] != -1:
indices = ids[valores2]
return indices
# Iterate over all possible index combinations
# valores4 = encontrar_comb_4(values, diff, n)
# if valores4[0] != -1:
# indices = ids[valores4]
# return indices
valores5 = encontrar_comb_5(values, diff, n)
if valores5[0] != -1:
indices = ids[valores5]
return indices
@njit
def encontrar_comb_1(valores, target):
indice = [-1]
for idx, value in enumerate(valores):
suma = value
if round(suma, ROUND_DECIMAL) == target:
indice = [idx for idx, val in enumerate(valores) if val in [value]]
return indice
return indice
@njit
def encontrar_comb_2(valores, target, n):
indice = [-1]
for i in range(n):
array_except = np.delete(valores, i)
for idx, value in enumerate(array_except):
suma = value + valores[i]
if round(suma, ROUND_DECIMAL) == target:
indice = [idx for idx, val in enumerate(valores) if val in [value, valores[i]]]
return indice
return indice
@njit
def encontrar_comb_4(valores, target, n):
indice = [-1]
for i in range(n):
a1 = np.delete(valores, i)
for j in range(len(a1)):
a2 = np.delete(a1, j)
for k in range(len(a2)):
array_except = np.delete(a2, k)
for idx, value in enumerate(array_except):
suma = value + valores[i] + a1[j] + a2[k]
if round(suma, ROUND_DECIMAL) == target:
indice = [idx for idx, val in enumerate(valores) if val in [value, valores[i], a1[j], a2[k]]]
return indice
return indice
@njit
def encontrar_comb_5(valores, target, n):
indice = [-1]
for i in range(n):
a1 = np.delete(valores, i)
for j in range(len(a1)):
a2 = np.delete(a1, j)
for k in range(len(a2)):
a3 = np.delete(a2, k)
for l in range(len(a3)):
array_except = np.delete(a3, l)
for idx, value in enumerate(array_except):
suma = value + valores[i] + a1[j] + a2[k] + a3[l]
if round(suma, ROUND_DECIMAL) == target:
indice = [idx for idx, val in enumerate(valores) if val in [value, valores[i], a1[j], a2[k], a3[l]]]
return indice
return indice
# Cap the combination size at the configured maximum
tam = min(len(values), max_combinations)
result = subset_sum_iter(values, diff, tam)
indices = ids[np.isin(values, result)]
return indices
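A worked example of the integer scaling used above, assuming ROUND_DECIMAL = 2 (the constant's actual value is defined elsewhere in this module):

import numpy as np

ROUND_DECIMAL = 2  # assumed for illustration
amounts = np.array([12.34, 0.01, 7.65])
values = np.round(amounts * (10**ROUND_DECIMAL)).astype(np.int64)
# values -> array([1234, 1, 765]); exact integer sums replace fragile float comparisons
target = int(round(20.0 * (10**ROUND_DECIMAL)))  # 20.00 -> 2000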
def subset_sum_iter(numbers, target, num_elements):
# Initialize solutions list
solutions = []
for step in range(1, num_elements+1):
# Build first index by taking the first num_elements from the numbers
indices = list(range(step))
solution = [numbers[i] for i in indices]
if sum(solution) == target:
solutions.append(solution)
# We iterate over the rest of the indices until we have tried all combinations
while True:
for i in range(step):
if indices[i] != i + len(numbers) - step:
break
else:
# No combinations left
break
# Increase current index and all its following ones
indices[i] += 1
for j in range(i + 1, step):
indices[j] = indices[j - 1] + 1
# Check current solution
solution = [numbers[i] for i in indices]
if round(sum(solution), ROUND_DECIMAL) == target:
solutions.append(solution)
break
if len(solutions) > 0:
solutions = solutions[0]
break
return solutions
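For reference, a tiny run of subset_sum_iter as defined above, with values already scaled to integers the way custom_func prepares them:

values = [1234, 1, 765, 2000, 766]
print(subset_sum_iter(values, 2000, 3))
# -> [2000]: the size-1 pass already matches, so larger subsets are never tried
print(subset_sum_iter(values, 1531, 3))
# -> [765, 766]: found during the size-2 pass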