Commit dd21c192 authored by Cristian Aguirre

Update action-exclude-records-v1

parent 85fe0d9e
......@@ -2,7 +2,7 @@ from enum import Enum
class CodeResponseEnum(Enum):
SUCCESS = 200
OK = 200
MAX_EXECUTION_ERROR = 601
BD_INPUT_ERROR = 602
SCRIPT_ERROR = 603
......
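As a reference for how these code enums are consumed, here is a minimal runnable sketch of the by-name lookup that create_response performs further down (DescResponseEnum[codeEnum.name].value); the description strings are illustrative assumptions, not the project's actual values:

from enum import Enum

class CodeResponseEnum(Enum):
    OK = 200
    MAX_EXECUTION_ERROR = 601

class DescResponseEnum(Enum):
    OK = "OK"
    MAX_EXECUTION_ERROR = "Maximum number of parallel executions reached"  # assumed text

def describe(code: CodeResponseEnum) -> str:
    # Both enums share member names, so the description is found by name, not value
    return DescResponseEnum[code.name].value

print(describe(CodeResponseEnum.MAX_EXECUTION_ERROR))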
......@@ -2,5 +2,5 @@ from enum import Enum
class StatusEnum(Enum):
SUCCESS = 200
OK = 200
ERROR = 609
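One reason the rename replaces SUCCESS outright rather than keeping both members: Python's Enum treats a second member with an equal value as an alias of the first, so SUCCESS = 200 alongside OK = 200 would make OK.name still report "SUCCESS". A quick check:

from enum import Enum

class StatusWithAlias(Enum):
    SUCCESS = 200
    OK = 200  # alias of SUCCESS, not a distinct member

assert StatusWithAlias.OK is StatusWithAlias.SUCCESS
print(StatusWithAlias.OK.name)  # prints "SUCCESS", not "OK"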
......@@ -20,7 +20,7 @@ class EngineService:
self.app.logger.info(f"Ejecuciones simultáneas actualmente: {self.executions}")
if self.executions > self.max_executions:
self.app.logger.info(f"Máxima de ejecuciones en paralelo alcanzado. Máximo: {self.max_executions}")
response = {'status': StatusEnum.ERROR.name.lower(),
response = {'status': StatusEnum.ERROR.name,
'message': DescResponseEnum.MAX_EXECUTION_ERROR.value}
else:
process = Process(self.app, self.descriptor)
......
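A minimal self-contained sketch of the guard this hunk touches, with an in-memory counter standing in for EngineService's real state; the decrement on rejection and the message text are assumptions:

from enum import Enum

class StatusEnum(Enum):
    OK = 200
    ERROR = 609

class EngineService:
    def __init__(self, max_executions: int):
        self.max_executions = max_executions
        self.executions = 0

    def submit(self) -> dict:
        self.executions += 1
        if self.executions > self.max_executions:
            self.executions -= 1  # assumed: release the slot we just claimed
            # status now carries the upper-case enum name, matching the new line above
            return {"status": StatusEnum.ERROR.name, "message": "max parallel executions reached"}
        return {"status": StatusEnum.OK.name}

svc = EngineService(max_executions=1)
print(svc.submit())  # {'status': 'OK'}
print(svc.submit())  # {'status': 'ERROR', 'message': ...}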
......@@ -20,7 +20,7 @@ class Process:
self.utils = Utils(app)
def run(self) -> Dict[str, Any]:
status, status_description = StatusEnum.SUCCESS, ""
status, status_description = StatusEnum.OK, ""
try:
# Getting the DB connection
db_params = cfg.db_params
......@@ -45,10 +45,14 @@ class Process:
obj_script.parser(self.descriptor)
# Starting processing
self.app.logger.info(f"Starting script processing")
obj_script.process(source)
# Saving result
self.app.logger.info(f"Generating and saving result")
response = obj_script.response()
# response.show()
result = self.utils.create_result(response, self.descriptor)
save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name:
......
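A condensed runnable sketch of the run() flow this hunk completes; the stub stands in for Utils.save_result and the dict shapes are assumptions read off the lines above:

from enum import Enum

class StatusEnum(Enum):
    OK = 200
    ERROR = 609

def save_result(result: dict) -> dict:
    # Stub for Utils.save_result; the real method persists via SQLAlchemy
    return {"status": StatusEnum.OK.name}

def run() -> tuple:
    status, status_description = StatusEnum.OK, ""
    try:
        result = {"detail": []}  # stands in for utils.create_result(response, descriptor)
        save = save_result(result)
        if save["status"] == StatusEnum.ERROR.name:
            status, status_description = StatusEnum.ERROR, save.get("message", "")
    except Exception as e:
        status, status_description = StatusEnum.ERROR, str(e)
    return status, status_description

print(run())  # (<StatusEnum.OK: 200>, '')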
......@@ -49,11 +49,11 @@ class Utils:
def create_response(self, codeEnum: Enum, detail: str = "") -> Dict[str, Any]:
response = {"statusCode": codeEnum.value}
if codeEnum.value == StatusEnum.SUCCESS.value:
response.update({'status': StatusEnum.SUCCESS.name.lower(), 'path': detail})
if codeEnum.value == StatusEnum.OK.value:
response.update({'status': StatusEnum.OK.name, 'detail': detail})
else:
description = DescResponseEnum[codeEnum.name].value
response.update({'status': StatusEnum.ERROR.name.lower(), 'message': description,
response.update({'status': StatusEnum.ERROR.name, 'message': description,
'detail': detail})
return response
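For clarity, the new create_response branching in a self-contained runnable form; the enum members are reconstructed from the hunks above and the error description is an illustrative assumption:

from enum import Enum
from typing import Any, Dict

class StatusEnum(Enum):
    OK = 200
    ERROR = 609

class CodeResponseEnum(Enum):
    OK = 200
    BD_INPUT_ERROR = 602

class DescResponseEnum(Enum):
    OK = "OK"
    BD_INPUT_ERROR = "Invalid database input"  # assumed description

def create_response(codeEnum: Enum, detail: str = "") -> Dict[str, Any]:
    response = {"statusCode": codeEnum.value}
    if codeEnum.value == StatusEnum.OK.value:
        response.update({"status": StatusEnum.OK.name, "detail": detail})
    else:
        description = DescResponseEnum[codeEnum.name].value
        response.update({"status": StatusEnum.ERROR.name, "message": description, "detail": detail})
    return response

print(create_response(CodeResponseEnum.OK, "result ready"))
print(create_response(CodeResponseEnum.BD_INPUT_ERROR, "missing table"))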
......@@ -73,10 +73,10 @@ class Utils:
used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match
if data.count() == 0:
if data.empty:
self.app.logger.info(f"El dataframe resultado esta vacio")
else:
for i in data.collect():
for idx, i in data.iterrows():
input_data = {}
key_transaction = None
key_group_pivot = None
......@@ -104,7 +104,7 @@ class Utils:
input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value])
input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value])
result.append(input_data)
response['status'] = StatusEnum.SUCCESS.value
response['status'] = StatusEnum.OK.value
response["detail"] = result
except Exception as e:
self.app.logger.error(f"Error al crear el diccionario de resultados. {e}")
......@@ -130,7 +130,7 @@ class Utils:
)
session.add(result_obj)
session.commit()
response['status'] = StatusEnum.SUCCESS.name
response['status'] = StatusEnum.OK.name
except Exception as e:
session.rollback()
response["status"] = StatusEnum.ERROR.name
......
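The save path above is the standard SQLAlchemy commit-or-rollback pattern; a minimal runnable sketch against an in-memory SQLite engine (the Result model is illustrative, since the real mapped class is not shown in this diff):

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class Result(Base):  # assumed model for illustration only
    __tablename__ = "results"
    id = Column(Integer, primary_key=True)
    detail = Column(String(255))

engine = create_engine("sqlite://")  # stand-in for the project's MySQL target
Base.metadata.create_all(engine)

response = {}
with Session(engine) as session:
    try:
        session.add(Result(detail="ok"))
        session.commit()
        response["status"] = "OK"  # StatusEnum.OK.name
    except Exception:
        session.rollback()  # undo the pending transaction on failure
        response["status"] = "ERROR"  # StatusEnum.ERROR.name
print(response)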
from typing import Any, Dict
from typing import Any, Dict, List
import importlib.util
from itertools import combinations
import time
import numpy as np
import pandas as pd
import dask.dataframe as dd
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
# import multiprocessing as mp
from functools import partial
# import asyncio
from app.main.engine.action.ActionInterface import ActionInterface
......@@ -14,8 +20,8 @@ relation_classname_identifier = {
# SPARK SESSION CONFIGURATION
MASTER = "local[*]"
DRIVER_MEMORY = "4g"
EXECUTOR_MEMORY = "4g"
DRIVER_MEMORY = "8g"
EXECUTOR_MEMORY = "8g"
MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar"
# EXCLUDE VALIDATION FIELD
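For reference, how createSession further down presumably wires these constants into a SparkSession; the builder keys are standard Spark settings, assumed rather than taken from the project's code:

from pyspark.sql import SparkSession

MASTER = "local[*]"
DRIVER_MEMORY = "8g"
EXECUTOR_MEMORY = "8g"
MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar"

spark = (SparkSession.builder
         .appName("app_engine_spark")
         .master(MASTER)
         .config("spark.driver.memory", DRIVER_MEMORY)
         .config("spark.executor.memory", EXECUTOR_MEMORY)
         .config("spark.jars", MYSQL_JAR_PATH)  # MySQL connector for JDBC reads
         .getOrCreate())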
......@@ -111,8 +117,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
from pyspark.sql.functions import sum, collect_list, round, udf, array_except, size, col, array
from pyspark.sql.types import ArrayType, IntegerType, LongType
from pyspark.sql.functions import sum, collect_list, round
pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols:
......@@ -161,38 +166,63 @@ class MatchAndExcludeRecordsAction(ActionInterface):
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
if self.exclude_pivot:
df_pandas = pivot_df.toPandas()
df = pivot_df
group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"]
else:
df_pandas = ctp_df.toPandas()
df = ctp_df
group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"]
id_col = self.ctp_params["id-column"]
custom_udf = udf(lambda diff: custom_apply(diff, df_pandas, group_cols, amount_col, id_col, timeout,
max_combinations), ArrayType(IntegerType()))
merged_df = merged_df.withColumn("LISTA_DIFF", custom_udf(merged_df["DIFF"]))
merged_df = merged_df.filter((col("DIFF") == 0) | ((col("DIFF") != 0) & (size(col("LISTA_DIFF")) > 0)))
total_tmp_cols = group_cols + ["DIFF"]
df3 = df.join(merged_df.select(*total_tmp_cols), group_cols)
if self.exclude_pivot:
merged_df = merged_df.withColumn("INTER_PIVOT_ID", array_except(merged_df[self.pivot_params["id-column"]],
merged_df["LISTA_DIFF"]))
merged_df = merged_df.withColumnRenamed(self.ctp_params["id-column"], "INTER_CTP_ID")
if merged_df.schema["INTER_CTP_ID"].dataType == LongType():
merged_df = merged_df.withColumn("INTER_CTP_ID", array(col("INTER_CTP_ID")).cast(ArrayType(LongType())))
df3 = df3.toPandas()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
num_chunks = 4
resultado = df3[total_cols].groupby(group_cols).apply(lambda x: custom_func(x, amount_col, id_col, timeout, max_combinations))
resultado = resultado.reset_index()
if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
else:
resultado.columns = group_cols + ["LISTA_DIFF"]
meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols)
meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array)
meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))]
if meged2.empty:
pass
elif self.exclude_pivot:
meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
if meged2['INTER_CTP_ID'].dtype == 'int64':
meged2['INTER_CTP_ID'] = meged2['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
else:
merged_df = merged_df.withColumn("INTER_CTP_ID", array_except(merged_df[self.ctp_params["id-column"]],
merged_df["LISTA_DIFF"]))
merged_df = merged_df.withColumnRenamed(self.pivot_params["id-column"], "INTER_PIVOT_ID")
if merged_df.schema["INTER_PIVOT_ID"].dataType == LongType():
merged_df = merged_df.withColumn("INTER_PIVOT_ID", array(col("INTER_PIVOT_ID")).cast(ArrayType(LongType())))
self.output = merged_df
meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
if meged2['INTER_PIVOT_ID'].dtype == 'int64':
meged2['INTER_PIVOT_ID'] = meged2['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
self.output = meged2
def response(self):
return self.output
def handle_array(self, x):
if isinstance(x, List):
return x
else:
return []
def array_except(self, arr1, arr2):
if arr2 is None:
return arr1
else:
return [item for item in arr1 if item not in arr2]
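# For reference (not in the original commit): this helper approximates
# pyspark.sql.functions.array_except for this use case, but the semantics
# differ at the edges:
#   array_except([1, 2, 3], [2])  -> [1, 3] in both
#   array_except([1, 1, 3], [])   -> [1, 1, 3] here; Spark deduplicates to [1, 3]
#   a None second argument returns arr1 here, while Spark returns null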
def createSession(self, name: str = "app_engine_spark"):
try:
from pyspark.sql import SparkSession
......@@ -210,21 +240,10 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise Exception(f"Error creando sesion Spark. {e}")
def custom_apply(raw, df_pandas, group_cols, amount_col, id_col, timeout, max_combinations):
if raw == 0:
return []
else:
total_cols = group_cols.copy()
total_cols.append(amount_col)
total_cols.append(id_col)
total_cols.append(EXCLUDE_ROWS_FIELD)
result = df_pandas[total_cols].groupby(group_cols).apply(custom_func, diff=raw, amount_field=amount_col, id_field=id_col,
timeout=timeout, max_combinations=max_combinations).values
resp = ",".join(map(str, result)) if len(result) > 0 else ""
return [int(elem) for elem in resp[1:-1].split(",")] if resp != "" else []
def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
def custom_func(group, amount_field, id_field, timeout, max_combinations):
diff = group["DIFF"].values[0]
if diff == 0:
return None
group[amount_field] = group[amount_field].astype(float)
group = group.reset_index(drop=True)
values = group[amount_field].values
......@@ -242,11 +261,11 @@ def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
for comb in combinations(indexs, i):
if stop_event: # Check whether the stop signal has been set
break
if np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff:
if np.isclose(np.sum(values[list(comb)]), diff):
# Check whether the combination consists of records validated as 'EXCLUDE_VALID'
final = group[group.index.isin(comb)]
if (final[EXCLUDE_ROWS_FIELD] == 'S').all():
final = final[id_field].tolist()
result = group[group.index.isin(comb)]
if (result[EXCLUDE_ROWS_FIELD] == 'S').all():
final = result[id_field].tolist()
stop_event = True # Set the stop signal
break
......@@ -259,7 +278,7 @@ def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
for future in futures:
try:
future.result(timeout=timeout)
future.result()
except TimeoutError:
stop_event = True # Set the stop signal if the time limit is reached
break
......
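The switch from a rounded-sum equality to np.isclose avoids comparisons where only one side has been rounded; a quick demonstration:

import numpy as np

ROUND_DECIMAL = 2
values = np.array([0.1, 0.2])
diff = 0.1 + 0.2  # stored as 0.30000000000000004
print(np.sum(values).round(ROUND_DECIMAL) == diff)  # False: only the sum was rounded
print(np.isclose(np.sum(values), diff))             # True: tolerance-based comparison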