Commit 8917d564 authored by Erly Villaroel

Merge remote-tracking branch 'origin/developer_ca' into developer_ev

# Conflicts:
#	app/main/engine/util/Utils.py
parents abfe2284 dd21c192
...@@ -2,7 +2,7 @@ from enum import Enum ...@@ -2,7 +2,7 @@ from enum import Enum
class CodeResponseEnum(Enum): class CodeResponseEnum(Enum):
SUCCESS = 200 OK = 200
MAX_EXECUTION_ERROR = 601 MAX_EXECUTION_ERROR = 601
BD_INPUT_ERROR = 602 BD_INPUT_ERROR = 602
SCRIPT_ERROR = 603 SCRIPT_ERROR = 603
......
...@@ -2,5 +2,5 @@ from enum import Enum ...@@ -2,5 +2,5 @@ from enum import Enum
class StatusEnum(Enum): class StatusEnum(Enum):
SUCCESS = 200 OK = 200
ERROR = 609 ERROR = 609
...@@ -20,7 +20,7 @@ class EngineService: ...@@ -20,7 +20,7 @@ class EngineService:
self.app.logger.info(f"Ejecuciones simultáneas actualmente: {self.executions}") self.app.logger.info(f"Ejecuciones simultáneas actualmente: {self.executions}")
if self.executions > self.max_executions: if self.executions > self.max_executions:
self.app.logger.info(f"Máxima de ejecuciones en paralelo alcanzado. Máximo: {self.max_executions}") self.app.logger.info(f"Máxima de ejecuciones en paralelo alcanzado. Máximo: {self.max_executions}")
response = {'status': StatusEnum.ERROR.name.lower(), response = {'status': StatusEnum.ERROR.name,
'message': DescResponseEnum.MAX_EXECUTION_ERROR.value} 'message': DescResponseEnum.MAX_EXECUTION_ERROR.value}
else: else:
process = Process(self.app, self.descriptor) process = Process(self.app, self.descriptor)
......
...@@ -20,7 +20,7 @@ class Process: ...@@ -20,7 +20,7 @@ class Process:
self.utils = Utils(app) self.utils = Utils(app)
def run(self) -> Dict[str, Any]: def run(self) -> Dict[str, Any]:
status, status_description = StatusEnum.SUCCESS, "" status, status_description = StatusEnum.OK, ""
try: try:
# Obteniendo la conexión a la BD # Obteniendo la conexión a la BD
db_params = cfg.db_params db_params = cfg.db_params
...@@ -45,10 +45,14 @@ class Process: ...@@ -45,10 +45,14 @@ class Process:
obj_script.parser(self.descriptor) obj_script.parser(self.descriptor)
# Iniciando process
self.app.logger.info(f"Iniciando procesamiento de script")
obj_script.process(source) obj_script.process(source)
# Guardando resultado
self.app.logger.info(f"Generado y guardando resultado")
response = obj_script.response() response = obj_script.response()
# response.show()
result = self.utils.create_result(response, self.descriptor) result = self.utils.create_result(response, self.descriptor)
save = self.utils.save_result(result, self.descriptor, db_session) save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name: if save["status"] == StatusEnum.ERROR.name:
......
...@@ -49,11 +49,11 @@ class Utils: ...@@ -49,11 +49,11 @@ class Utils:
def create_response(self, codeEnum: Enum, detail: str = "") -> Dict[str, Any]: def create_response(self, codeEnum: Enum, detail: str = "") -> Dict[str, Any]:
response = {"statusCode": codeEnum.value} response = {"statusCode": codeEnum.value}
if codeEnum.value == StatusEnum.SUCCESS.value: if codeEnum.value == StatusEnum.OK.value:
response.update({'status': StatusEnum.SUCCESS.name.lower(), 'path': detail}) response.update({'status': StatusEnum.OK.name, 'detail': detail})
else: else:
description = DescResponseEnum[codeEnum.name].value description = DescResponseEnum[codeEnum.name].value
response.update({'status': StatusEnum.ERROR.name.lower(), 'message': description, response.update({'status': StatusEnum.ERROR.name, 'message': description,
'detail': detail}) 'detail': detail})
return response return response
...@@ -76,7 +76,7 @@ class Utils: ...@@ -76,7 +76,7 @@ class Utils:
if data.empty: if data.empty:
self.app.logger.info(f"El dataframe resultado esta vacio") self.app.logger.info(f"El dataframe resultado esta vacio")
else: else:
for idx,i in data.iterrows(): for idx, i in data.iterrows():
input_data = {} input_data = {}
key_transaction = None key_transaction = None
key_group_pivot = None key_group_pivot = None
...@@ -104,7 +104,7 @@ class Utils: ...@@ -104,7 +104,7 @@ class Utils:
input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value]) input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value])
input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value]) input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value])
result.append(input_data) result.append(input_data)
response['status'] = StatusEnum.SUCCESS.value response['status'] = StatusEnum.OK.value
response["detail"] = result response["detail"] = result
except Exception as e: except Exception as e:
self.app.logger.error(f"Error al crear el diccionario de resultados. {e}") self.app.logger.error(f"Error al crear el diccionario de resultados. {e}")
...@@ -130,7 +130,7 @@ class Utils: ...@@ -130,7 +130,7 @@ class Utils:
) )
session.add(result_obj) session.add(result_obj)
session.commit() session.commit()
response['status'] = StatusEnum.SUCCESS.name response['status'] = StatusEnum.OK.name
except Exception as e: except Exception as e:
session.rollback() session.rollback()
response["status"] = StatusEnum.ERROR.name response["status"] = StatusEnum.ERROR.name
......
from typing import Any, Dict from typing import Any, Dict, List
import importlib.util import importlib.util
from itertools import combinations from itertools import combinations
import time import time
import numpy as np import numpy as np
import pandas as pd
import dask.dataframe as dd
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
# import multiprocessing as mp
from functools import partial
# import asyncio
from app.main.engine.action.ActionInterface import ActionInterface from app.main.engine.action.ActionInterface import ActionInterface
...@@ -14,8 +20,8 @@ relation_classname_identifier = { ...@@ -14,8 +20,8 @@ relation_classname_identifier = {
# CONFIGURACION DE SESION DE SPARK # CONFIGURACION DE SESION DE SPARK
MASTER = "local[*]" MASTER = "local[*]"
DRIVER_MEMORY = "4g" DRIVER_MEMORY = "8g"
EXECUTOR_MEMORY = "4g" EXECUTOR_MEMORY = "8g"
MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar" MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar"
# EXCLUDE VALIDATION FIELD # EXCLUDE VALIDATION FIELD
...@@ -111,8 +117,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -111,8 +117,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]] self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]] self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
from pyspark.sql.functions import sum, collect_list, round, udf, array_except, size, col, array from pyspark.sql.functions import sum, collect_list, round
from pyspark.sql.types import ArrayType, IntegerType, LongType
pivot_cols = self.pivot_params["columns-transaction"].copy() pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols: if self.pivot_params["amount-column"] in pivot_cols:
...@@ -161,38 +166,63 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -161,38 +166,63 @@ class MatchAndExcludeRecordsAction(ActionInterface):
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL)) merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
if self.exclude_pivot: if self.exclude_pivot:
df_pandas = pivot_df.toPandas() df = pivot_df
group_cols = self.pivot_params["columns-group"] group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"] amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"] id_col = self.pivot_params["id-column"]
else: else:
df_pandas = ctp_df.toPandas() df = ctp_df
group_cols = self.ctp_params["columns-group"] group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"] amount_col = self.ctp_params["amount-column"]
id_col = self.ctp_params["id-column"] id_col = self.ctp_params["id-column"]
custom_udf = udf(lambda diff: custom_apply(diff, df_pandas, group_cols, amount_col, id_col, timeout, total_tmp_cols = group_cols + ["DIFF"]
max_combinations), ArrayType(IntegerType())) df3 = df.join(merged_df.select(*total_tmp_cols), group_cols)
merged_df = merged_df.withColumn("LISTA_DIFF", custom_udf(merged_df["DIFF"]))
merged_df = merged_df.filter((col("DIFF") == 0) | ((col("DIFF") != 0) & (size(col("LISTA_DIFF")) > 0)))
if self.exclude_pivot: df3 = df3.toPandas()
merged_df = merged_df.withColumn("INTER_PIVOT_ID", array_except(merged_df[self.pivot_params["id-column"]], total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
merged_df["LISTA_DIFF"])) num_chunks = 4
merged_df = merged_df.withColumnRenamed(self.ctp_params["id-column"], "INTER_CTP_ID")
if merged_df.schema["INTER_CTP_ID"].dataType == LongType(): resultado = df3[total_cols].groupby(group_cols).apply(lambda x: custom_func(x, amount_col, id_col, timeout, max_combinations))
merged_df = merged_df.withColumn("INTER_CTP_ID", array(col("INTER_CTP_ID")).cast(ArrayType(LongType()))) resultado = resultado.reset_index()
if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
else:
resultado.columns = group_cols + ["LISTA_DIFF"]
meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols)
meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array)
meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))]
if meged2.empty:
pass
elif self.exclude_pivot:
meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
if meged2['INTER_CTP_ID'].dtype == 'int64':
merged_df['INTER_CTP_ID'] = merged_df['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
else: else:
merged_df = merged_df.withColumn("INTER_CTP_ID", array_except(merged_df[self.ctp_params["id-column"]], meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
merged_df["LISTA_DIFF"])) meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
merged_df = merged_df.withColumnRenamed(self.pivot_params["id-column"], "INTER_PIVOT_ID") if meged2['INTER_PIVOT_ID'].dtype == 'int64':
if merged_df.schema["INTER_PIVOT_ID"].dataType == LongType(): merged_df['INTER_PIVOT_ID'] = merged_df['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
merged_df = merged_df.withColumn("INTER_PIVOT_ID", array(col("INTER_PIVOT_ID")).cast(ArrayType(LongType())))
self.output = merged_df self.output = meged2
def response(self): def response(self):
return self.output return self.output
def handle_array(self, x):
    """Normalize a cell value to a list.

    Returns ``x`` itself (same object) when it is a ``list``; any other
    value (``None``, ``NaN``, scalars, strings, ...) yields a new empty
    list, so callers can safely apply ``len`` to the result.
    """
    # Check against the builtin ``list`` rather than the deprecated
    # ``typing.List`` alias: the result is the same, and the method no
    # longer depends on a typing import being present.
    if isinstance(x, list):
        return x
    return []
def array_except(self, arr1, arr2):
    """Return the items of ``arr1`` that do not appear in ``arr2``.

    Order and duplicates of ``arr1`` are preserved. When ``arr2`` is
    ``None`` there is nothing to exclude and ``arr1`` is returned
    unchanged (same object).
    """
    if arr2 is None:
        return arr1
    try:
        # Build the exclusion set once so each membership test is O(1)
        # instead of a linear scan of ``arr2`` per item.
        excluded = set(arr2)
        return [item for item in arr1 if item not in excluded]
    except TypeError:
        # Unhashable items on either side: fall back to the plain
        # sequence membership test.
        return [item for item in arr1 if item not in arr2]
def createSession(self, name: str = "app_engine_spark"): def createSession(self, name: str = "app_engine_spark"):
try: try:
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
...@@ -210,21 +240,10 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -210,21 +240,10 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise Exception(f"Error creando sesion Spark. {e}") raise Exception(f"Error creando sesion Spark. {e}")
def custom_apply(raw, df_pandas, group_cols, amount_col, id_col, timeout, max_combinations): def custom_func(group, amount_field, id_field, timeout, max_combinations):
if raw == 0: diff = group["DIFF"].values[0]
return [] if diff == 0:
else: return None
total_cols = group_cols.copy()
total_cols.append(amount_col)
total_cols.append(id_col)
total_cols.append(EXCLUDE_ROWS_FIELD)
result = df_pandas[total_cols].groupby(group_cols).apply(custom_func, diff=raw, amount_field=amount_col, id_field=id_col,
timeout=timeout, max_combinations=max_combinations).values
resp = ",".join(map(str, result)) if len(result) > 0 else ""
return [int(elem) for elem in resp[1:-1].split(",")] if resp != "" else []
def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
group[amount_field] = group[amount_field].astype(float) group[amount_field] = group[amount_field].astype(float)
group = group.reset_index(drop=True) group = group.reset_index(drop=True)
values = group[amount_field].values values = group[amount_field].values
...@@ -242,11 +261,11 @@ def custom_func(group, diff, amount_field, id_field, timeout, max_combinations): ...@@ -242,11 +261,11 @@ def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
for comb in combinations(indexs, i): for comb in combinations(indexs, i):
if stop_event: # Verificar si se ha establecido la señal de detención if stop_event: # Verificar si se ha establecido la señal de detención
break break
if np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff: if np.isclose(np.sum(values[list(comb)]), diff):
# Validamos si la combinación son de registros validados 'EXCLUDE_VALID' # Validamos si la combinación son de registros validados 'EXCLUDE_VALID'
final = group[group.index.isin(comb)] result = group[group.index.isin(comb)]
if (final[EXCLUDE_ROWS_FIELD] == 'S').all(): if (result[EXCLUDE_ROWS_FIELD] == 'S').all():
final = final[id_field].tolist() final = result[id_field].tolist()
stop_event = True # Establecer la señal de detención stop_event = True # Establecer la señal de detención
break break
...@@ -259,7 +278,7 @@ def custom_func(group, diff, amount_field, id_field, timeout, max_combinations): ...@@ -259,7 +278,7 @@ def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
for future in futures: for future in futures:
try: try:
future.result(timeout=timeout) future.result()
except TimeoutError: except TimeoutError:
stop_event = True # Establecer la señal de detención si se alcanza el tiempo límite stop_event = True # Establecer la señal de detención si se alcanza el tiempo límite
break break
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment