Commit 287ace11 authored by Cristian Aguirre

Merge branch 'developer_ca' into 'developer'

Update action-exclude-records-v1

See merge request !4
parents 6e759b58 fe0e2748
@@ -2,7 +2,7 @@ from enum import Enum
class CodeResponseEnum(Enum):
SUCCESS = 200
OK = 200
MAX_EXECUTION_ERROR = 601
BD_INPUT_ERROR = 602
SCRIPT_ERROR = 603
......
@@ -2,5 +2,5 @@ from enum import Enum
class StatusEnum(Enum):
SUCCESS = 200
OK = 200
ERROR = 609
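Both enums rename their 200 member from SUCCESS to OK; the numeric value stays the same, so only code that reads the member *name* is affected. A minimal illustration (not the project's full enums):

```python
from enum import Enum

# Illustration only: a StatusEnum shaped like the one in this diff.
class StatusEnum(Enum):
    OK = 200
    ERROR = 609

print(StatusEnum.OK.value)  # 200, the same value the old SUCCESS member carried
print(StatusEnum.OK.name)   # "OK"
```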
@@ -20,7 +20,7 @@ class EngineService:
self.app.logger.info(f"Ejecuciones simultáneas actualmente: {self.executions}")
if self.executions > self.max_executions:
self.app.logger.info(f"Máxima de ejecuciones en paralelo alcanzado. Máximo: {self.max_executions}")
response = {'status': StatusEnum.ERROR.name.lower(),
response = {'status': StatusEnum.ERROR.name,
'message': DescResponseEnum.MAX_EXECUTION_ERROR.value}
else:
process = Process(self.app, self.descriptor)
......
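This hunk also stops lower-casing the status, so the error response now carries "ERROR" instead of "error". That keeps it consistent with checks later in this merge request that compare against StatusEnum.ERROR.name (for example save["status"] == StatusEnum.ERROR.name in Process.run), as this small example shows:

```python
from enum import Enum

class StatusEnum(Enum):
    OK = 200
    ERROR = 609

old_style = {"status": StatusEnum.ERROR.name.lower()}  # {'status': 'error'}
new_style = {"status": StatusEnum.ERROR.name}          # {'status': 'ERROR'}

# Only the new form matches a comparison against the member name.
print(old_style["status"] == StatusEnum.ERROR.name)  # False
print(new_style["status"] == StatusEnum.ERROR.name)  # True
```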
@@ -20,7 +20,7 @@ class Process:
self.utils = Utils(app)
def run(self) -> Dict[str, Any]:
status, status_description = StatusEnum.SUCCESS, ""
status, status_description = StatusEnum.OK, ""
try:
# Getting the DB connection
db_params = cfg.db_params
@@ -45,10 +45,14 @@ class Process:
obj_script.parser(self.descriptor)
# Starting the process
self.app.logger.info(f"Starting script processing")
obj_script.process(source)
# Saving the result
self.app.logger.info(f"Generating and saving the result")
response = obj_script.response()
# response.show()
result = self.utils.create_result(response, self.descriptor)
save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name:
......
@@ -49,11 +49,11 @@ class Utils:
def create_response(self, codeEnum: Enum, detail: str = "") -> Dict[str, Any]:
response = {"statusCode": codeEnum.value}
if codeEnum.value == StatusEnum.SUCCESS.value:
response.update({'status': StatusEnum.SUCCESS.name.lower(), 'path': detail})
if codeEnum.value == StatusEnum.OK.value:
response.update({'status': StatusEnum.OK.name, 'detail': detail})
else:
description = DescResponseEnum[codeEnum.name].value
response.update({'status': StatusEnum.ERROR.name.lower(), 'message': description,
response.update({'status': StatusEnum.ERROR.name, 'message': description,
'detail': detail})
return response
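With the rename, create_response now branches on StatusEnum.OK and returns the member name plus a detail field (the success branch previously used a lower-cased status and a path key). A self-contained sketch of the same shape, with DescResponseEnum reduced to a single member for illustration:

```python
from enum import Enum
from typing import Any, Dict

class StatusEnum(Enum):
    OK = 200
    ERROR = 609

class CodeResponseEnum(Enum):
    OK = 200
    MAX_EXECUTION_ERROR = 601

# Simplified stand-in for the project's DescResponseEnum.
class DescResponseEnum(Enum):
    MAX_EXECUTION_ERROR = "Maximum number of parallel executions reached"

def create_response(code: Enum, detail: str = "") -> Dict[str, Any]:
    response = {"statusCode": code.value}
    if code.value == StatusEnum.OK.value:
        response.update({"status": StatusEnum.OK.name, "detail": detail})
    else:
        response.update({"status": StatusEnum.ERROR.name,
                         "message": DescResponseEnum[code.name].value,
                         "detail": detail})
    return response

print(create_response(CodeResponseEnum.OK, "done"))
print(create_response(CodeResponseEnum.MAX_EXECUTION_ERROR))
```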
@@ -73,10 +73,10 @@ class Utils:
used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match
if data.count() == 0:
if data.empty:
self.app.logger.info(f"El dataframe resultado esta vacio")
else:
for i in data.collect():
for idx, i in data.iterrows():
input_data = {}
key_transaction = None
key_group_pivot = None
@@ -104,7 +104,7 @@ class Utils:
input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value])
input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value])
result.append(input_data)
response['status'] = StatusEnum.SUCCESS.value
response['status'] = StatusEnum.OK.value
response["detail"] = result
except Exception as e:
self.app.logger.error(f"Error al crear el diccionario de resultados. {e}")
@@ -130,7 +130,7 @@ class Utils:
)
session.add(result_obj)
session.commit()
response['status'] = StatusEnum.SUCCESS.name
response['status'] = StatusEnum.OK.name
except Exception as e:
session.rollback()
response["status"] = StatusEnum.ERROR.name
......
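save_result keeps the commit / rollback pattern and now reports StatusEnum.OK.name on success. The project's models and session setup are not part of this diff, so the sketch below uses a throwaway SQLAlchemy model and an in-memory SQLite database purely to illustrate the pattern:

```python
# Standalone illustration of the commit / rollback pattern used in save_result.
# The model and engine here are placeholders, not the project's real schema.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class Result(Base):
    __tablename__ = "results"
    id = Column(Integer, primary_key=True)
    detail = Column(String)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

response = {}
with Session(engine) as session:
    try:
        session.add(Result(detail="exclude-ids: [1, 2]"))
        session.commit()
        response["status"] = "OK"      # StatusEnum.OK.name in the real code
    except Exception:
        session.rollback()             # undo the partial write on failure
        response["status"] = "ERROR"
print(response)
```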
from typing import Any, Dict
from typing import Any, Dict, List
import importlib.util
from itertools import combinations
import time
import multiprocessing as mp
import numpy as np
import pandas as pd
from parallel_pandas import ParallelPandas
from concurrent.futures import ThreadPoolExecutor
from app.main.engine.action.ActionInterface import ActionInterface
@@ -14,8 +16,8 @@ relation_classname_identifier = {
# SPARK SESSION CONFIGURATION
MASTER = "local[*]"
DRIVER_MEMORY = "4g"
EXECUTOR_MEMORY = "4g"
DRIVER_MEMORY = "8g"
EXECUTOR_MEMORY = "8g"
MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar"
# EXCLUDE VALIDATION FIELD
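The session constants double driver and executor memory to 8g and point at the MySQL connector jar. createSession itself is collapsed in this diff, so the builder below is only a plausible reconstruction using the standard Spark configuration keys (the app name matches the default used later in createSession):

```python
# Hypothetical use of the constants above; the project's createSession body is
# not shown in this diff, so treat this as an assumption.
from pyspark.sql import SparkSession

MASTER = "local[*]"
DRIVER_MEMORY = "8g"
EXECUTOR_MEMORY = "8g"
MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar"

spark = (SparkSession.builder
         .appName("app_engine_spark")
         .master(MASTER)
         .config("spark.driver.memory", DRIVER_MEMORY)
         .config("spark.executor.memory", EXECUTOR_MEMORY)
         .config("spark.jars", MYSQL_JAR_PATH)
         .getOrCreate())
```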
@@ -32,12 +34,12 @@ class MatchAndExcludeRecordsAction(ActionInterface):
def __init__(self, app) -> None:
super().__init__(app)
self.max_combinations = None
self.timeout = None
self.comb_per_group = None
self.exclude_pivot = None
self.pivot_params = None
self.ctp_params = None
self.output = None
self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]
self.config_params = ["max-records-per-combinations", "max-combinations-per-group", "exclude-entity-pivot"]
def parser(self, descriptor: Dict[str, Any]):
# Validate that pyspark and the required version are installed
@@ -73,7 +75,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise ReferenceError(f"Parameter *{param}* not found in pivot or counterpart")
self.max_combinations = configs["max-records-per-combinations"]
self.timeout = configs["max-timeout-per-combinations"]
self.comb_per_group = configs["max-combinations-per-group"]
self.exclude_pivot = configs["exclude-entity-pivot"]
self.pivot_params = pivot_params
self.ctp_params = ctp_params
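The parser now validates max-combinations-per-group instead of max-timeout-per-combinations, i.e. the per-group search is bounded by a combination count rather than a wall-clock timeout. A hypothetical descriptor fragment with the keys this parser reads (the three config keys and the column fields come from the code; the surrounding structure is assumed):

```python
# Hypothetical descriptor fragment; key names are taken from the code above,
# the surrounding structure ("config", "pivot", "counterpart") is assumed.
descriptor = {
    "config": {
        "max-records-per-combinations": 10,
        "max-combinations-per-group": 1_000_000,  # replaces max-timeout-per-combinations
        "exclude-entity-pivot": True,
    },
    "pivot": {
        "id-column": "ID",
        "amount-column": "AMOUNT",
        "columns-transaction": ["ACCOUNT", "DATE"],
        "columns-group": ["ACCOUNT"],
    },
    "counterpart": {
        "id-column": "ID",
        "amount-column": "AMOUNT",
        "columns-transaction": ["ACCOUNT", "DATE"],
        "columns-group": [],
    },
}
```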
@@ -111,8 +113,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
from pyspark.sql.functions import sum, collect_list, round, udf, array_except, size, col, array
from pyspark.sql.types import ArrayType, IntegerType, LongType
from pyspark.sql.functions import sum, collect_list, round, when, col, lit
pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols:
@@ -122,7 +123,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
if self.ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(self.ctp_params["amount-column"])
timeout = self.timeout
comb_per_group = self.comb_per_group
max_combinations = self.max_combinations
# Run the record-exclusion logic
@@ -135,7 +136,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
pivot_df2 = pivot_df.dropDuplicates(subset=pivot_cols)
pivot_df2 = pivot_df
# Case: Many-to-one
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
@@ -143,7 +144,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df2 = ctp_df.dropDuplicates(subset=ctp_cols)
ctp_df2 = ctp_df
# Case: Many-to-many
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
@@ -154,45 +155,79 @@ class MatchAndExcludeRecordsAction(ActionInterface):
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = pivot_df2.join(ctp_df2, condition)
condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(self.pivot_params["columns-transaction"],
self.ctp_params["columns-transaction"])]
total_merged = pivot_df2.join(ctp_df2, condition, 'left')
merged = merged.withColumn("DIFF", pivot_df2[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]])
total_merged = total_merged.withColumn("DIFF", when(col(self.ctp_params["columns-transaction"][0]).isNotNull(),
lit(0)).otherwise(lit(None)))
total_merged = total_merged.select(*pivot_df2.columns, "DIFF")
condition = [total_merged[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = total_merged.join(ctp_df2, condition)
merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
total_merged[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]]).otherwise(col("DIFF")))
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
if self.exclude_pivot:
df_pandas = pivot_df.toPandas()
df = pivot_df
group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"]
else:
df_pandas = ctp_df.toPandas()
df = ctp_df
group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"]
id_col = self.ctp_params["id-column"]
custom_udf = udf(lambda diff: custom_apply(diff, df_pandas, group_cols, amount_col, id_col, timeout,
max_combinations), ArrayType(IntegerType()))
merged_df = merged_df.withColumn("LISTA_DIFF", custom_udf(merged_df["DIFF"]))
merged_df = merged_df.filter((col("DIFF") == 0) | ((col("DIFF") != 0) & (size(col("LISTA_DIFF")) > 0)))
total_tmp_cols = group_cols + ["DIFF"]
df3 = df.join(merged_df.select(*total_tmp_cols), group_cols)
if self.exclude_pivot:
merged_df = merged_df.withColumn("INTER_PIVOT_ID", array_except(merged_df[self.pivot_params["id-column"]],
merged_df["LISTA_DIFF"]))
merged_df = merged_df.withColumnRenamed(self.ctp_params["id-column"], "INTER_CTP_ID")
if merged_df.schema["INTER_CTP_ID"].dataType == LongType():
merged_df = merged_df.withColumn("INTER_CTP_ID", array(col("INTER_CTP_ID")).cast(ArrayType(LongType())))
df3 = df3.toPandas()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
resultado = df3[total_cols].groupby(group_cols).p_apply(lambda x: custom_func(x, amount_col, id_col, comb_per_group, max_combinations))
resultado = resultado.reset_index()
if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
else:
merged_df = merged_df.withColumn("INTER_CTP_ID", array_except(merged_df[self.ctp_params["id-column"]],
merged_df["LISTA_DIFF"]))
merged_df = merged_df.withColumnRenamed(self.pivot_params["id-column"], "INTER_PIVOT_ID")
if merged_df.schema["INTER_PIVOT_ID"].dataType == LongType():
merged_df = merged_df.withColumn("INTER_PIVOT_ID", array(col("INTER_PIVOT_ID")).cast(ArrayType(LongType())))
self.output = merged_df
resultado.columns = group_cols + ["LISTA_DIFF"]
meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols)
meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array)
meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))]
if meged2.empty:
pass
elif self.exclude_pivot:
meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
if meged2['INTER_CTP_ID'].dtype == 'int64':
merged_df['INTER_CTP_ID'] = merged_df['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
else:
meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
if meged2['INTER_PIVOT_ID'].dtype == 'int64':
merged_df['INTER_PIVOT_ID'] = merged_df['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
self.output = meged2
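The Spark UDF path (custom_apply wrapped in a udf plus array_except on Spark columns) is replaced by a pandas pipeline: the earlier left join marks exact transaction matches with DIFF = 0, the grouped frame is brought down with toPandas(), ParallelPandas runs custom_func per group via p_apply to find the ids whose amounts explain a non-zero DIFF, and the result is merged back and filtered. A condensed illustration of the per-group search step using plain .apply (p_apply is the parallel drop-in) and toy column names:

```python
import pandas as pd

# Toy frame standing in for df3 (in the real code it comes from Spark via toPandas()).
df3 = pd.DataFrame({
    "ACCOUNT": ["A", "A", "B"],
    "AMOUNT":  [10.0, 25.5, 7.0],
    "ID":      [1, 2, 3],
    "DIFF":    [35.5, 35.5, 0.0],
})

def find_ids(group):
    # Simplified stand-in for custom_func: return the ids whose amounts sum to DIFF.
    diff = group["DIFF"].iloc[0]
    if diff == 0:
        return None
    if round(group["AMOUNT"].sum(), 2) == diff:
        return group["ID"].tolist()
    return None

# In the merge request this is .p_apply(...) after ParallelPandas.initialize();
# plain .apply keeps the sketch dependency-free.
resultado = (df3.groupby("ACCOUNT")[["AMOUNT", "ID", "DIFF"]]
             .apply(find_ids)
             .reset_index())
resultado.columns = ["ACCOUNT", "LISTA_DIFF"]
print(resultado)
```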
def response(self):
return self.output
def handle_array(self, x):
if isinstance(x, List):
return x
else:
return []
def array_except(self, arr1, arr2):
if arr2 is None:
return arr1
else:
return [item for item in arr1 if item not in arr2]
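handle_array and array_except are small pandas-side replacements for the Spark array handling that was removed (array_except on columns and the LongType-to-array cast). Standalone equivalents and a row-wise usage example, mirroring how they are applied above:

```python
import pandas as pd

def handle_array(x):
    # Anything that is not a list (e.g. NaN from an unmatched left merge) becomes [].
    return x if isinstance(x, list) else []

def array_except(arr1, arr2):
    # Drop the excluded ids from the original id list; None means nothing to drop.
    return arr1 if arr2 is None else [item for item in arr1 if item not in arr2]

print(handle_array(float("nan")))    # []
print(array_except([1, 2, 3], [2]))  # [1, 3]

# Row-wise application, as done for INTER_PIVOT_ID / INTER_CTP_ID above.
frame = pd.DataFrame({"ID": [[1, 2, 3]], "LISTA_DIFF": [[2]]})
frame["INTER_PIVOT_ID"] = frame.apply(
    lambda row: array_except(row["ID"], row["LISTA_DIFF"]), axis=1)
print(frame["INTER_PIVOT_ID"].tolist())  # [[1, 3]]
```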
def createSession(self, name: str = "app_engine_spark"):
try:
from pyspark.sql import SparkSession
@@ -210,25 +245,16 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise Exception(f"Error creating Spark session. {e}")
def custom_apply(raw, df_pandas, group_cols, amount_col, id_col, timeout, max_combinations):
if raw == 0:
return []
else:
total_cols = group_cols.copy()
total_cols.append(amount_col)
total_cols.append(id_col)
total_cols.append(EXCLUDE_ROWS_FIELD)
result = df_pandas[total_cols].groupby(group_cols).apply(custom_func, diff=raw, amount_field=amount_col, id_field=id_col,
timeout=timeout, max_combinations=max_combinations).values
resp = ",".join(map(str, result)) if len(result) > 0 else ""
return [int(elem) for elem in resp[1:-1].split(",")] if resp != "" else []
def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
def custom_func(group, amount_field, id_field, max_comb_per_group, max_combinations):
diff = group["DIFF"].values[0]
if pd.isna(diff) or diff == 0:
return None
group = group[group[EXCLUDE_ROWS_FIELD] == 'S']
group[amount_field] = group[amount_field].astype(float)
group = group.reset_index(drop=True)
values = group[amount_field].values
indexs = group.index.values
np.random.shuffle(indexs)
tam = len(values)
rang = range(1, tam + 1) if tam <= max_combinations else range(1, max_combinations + 1)
@@ -237,38 +263,30 @@ def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
def buscar_combinacion(i):
nonlocal final, stop_event
if not stop_event: # Only continue if a combination has not been found yet
for comb in combinations(indexs, i):
if stop_event: # Check whether the stop signal has been set
if not stop_event:
for index, comb in enumerate(combinations(indexs, i)):
if stop_event or index > max_comb_per_group:
break
elif np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff:
final = group.loc[list(comb), id_field].tolist()
stop_event = True
break
if np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff:
# Check that the combination consists only of validated 'EXCLUDE_VALID' records
final = group[group.index.isin(comb)]
if (final[EXCLUDE_ROWS_FIELD] == 'S').all():
final = final[id_field].tolist()
stop_event = True # Set the stop signal
break
return None
start_time = time.time()
with ThreadPoolExecutor() as executor:
futures = [executor.submit(buscar_combinacion, i) for i in rang]
for future in futures:
try:
future.result(timeout=timeout)
future.result()
except TimeoutError:
stop_event = True # Set the stop signal when the time limit is reached
stop_event = True
break
if stop_event or final is not None:
break
if time.time() - start_time >= timeout:
return None # Return None if the time limit was reached
return final
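custom_func now takes the target difference from the group's own DIFF column, keeps only rows flagged 'S' in the exclude field, and searches itertools.combinations of increasing size for a subset of amounts that sums to the difference, capped by max_combinations sizes and max_comb_per_group candidates per size (replacing the old wall-clock timeout); the sizes are still fanned out over a ThreadPoolExecutor with a shared stop flag. A self-contained sketch of the bounded subset-sum search at its core:

```python
from itertools import combinations
import numpy as np

def find_subset(amounts, ids, diff, max_combinations=3, max_comb_per_group=10_000):
    # Try combination sizes 1..max_combinations, at most max_comb_per_group
    # candidates per size; return the ids whose amounts sum to diff.
    values = np.asarray(amounts, dtype=float)
    indexes = np.arange(len(values))
    for size in range(1, min(len(values), max_combinations) + 1):
        for count, comb in enumerate(combinations(indexes, size)):
            if count > max_comb_per_group:
                break
            if round(values[list(comb)].sum(), 2) == diff:
                return [ids[i] for i in comb]
    return None

print(find_subset([10.0, 20.0, 5.5], ids=[101, 102, 103], diff=25.5))  # [102, 103]
```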
......