Commit 76fb4c10 authored by Erly Villaroel

Merge remote-tracking branch 'origin/developer_ca' into developer_ev

# Conflicts:
#	app/main/engine/enum/CodeResponseEnum.py
#	app/main/engine/service/Process.py
#	scripts/match-and-exclude-records-actions_v1.py
parents 016c0749 c1597525
@@ -44,6 +44,15 @@ class Database:
         except Exception as e:
             self.app.logger.error(f"Error cerrando básica conexión. {e}")

+    def get_dialect(self) -> str:
+        dialect = ""
+        try:
+            dialect = self.factory.get_dialect()
+        except Exception as e:
+            self.app.logger.error(f"Error obteniendo dialect. {e}")
+        finally:
+            return dialect
+
     def create_engine(self) -> None:
         try:
             if isinstance(self.engine, type(None)):
@@ -23,6 +23,7 @@ class Mysql:
         self.params = params
         self.engine = None
         self.connection = None
+        self.dialect = None

     def create_spark_connection(self):
         params = {}
@@ -46,10 +47,17 @@ class Mysql:
         finally:
             return self.connection

-    def create_engine(self) -> None:
+    def get_dialect(self) -> str:
         try:
             dialect = DatabaseDialectEnum.MYSQL.value
-            url = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}/{self.database}?charset=utf8mb4"
+            self.dialect = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}/{self.database}?charset=utf8mb4"
+        except Exception as e:
+            self.app.logger.error(f"Error obteniendo dialect de Mysql. {e}")
+        return self.dialect
+
+    def create_engine(self) -> None:
+        try:
+            url = self.get_dialect()
             self.engine = create_engine(url, pool_recycle=3600, pool_pre_ping=True, **self.params)
         except Exception as e:
             self.app.logger.error(f"Error creando engine de Mysql. {e}")
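Note on the refactor above: get_dialect() now centralizes the SQLAlchemy-style connection URL that create_engine() consumes, and Database delegates to it so scripts can fetch the same URL. A minimal sketch of the resulting URL shape, assuming DatabaseDialectEnum.MYSQL.value is a SQLAlchemy dialect string such as "mysql+pymysql" (the enum's actual value is not shown in this diff):

# Hypothetical values; only the URL shape and engine options come from the diff.
from sqlalchemy import create_engine

url = "mysql+pymysql://user:secret@localhost:3306/engine_db?charset=utf8mb4"
engine = create_engine(url, pool_recycle=3600, pool_pre_ping=True)  # lazy: no connection opened yet
print(engine.url)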
@@ -11,4 +11,4 @@ class CodeResponseEnum(Enum):
     OUTPUT_ERROR = 606
     EMPTY_DATASET = 607
     ERROR = 609
-    TIMEOUT_ERROR = 800
+    TIMEOUT = 610
@@ -4,3 +4,4 @@ from enum import Enum
 class StatusEnum(Enum):
     OK = 200
     ERROR = 609
+    TIMEOUT = 610
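The two enums are deliberately aligned: CodeResponseEnum.TIMEOUT and StatusEnum.TIMEOUT share the value 610, which is what lets Utils.create_result translate the timeout response code into the TIMEOUT status by comparing .value. A quick standalone check, as a sketch:

from enum import Enum

class StatusEnum(Enum):
    OK = 200
    ERROR = 609
    TIMEOUT = 610

class CodeResponseEnum(Enum):
    EMPTY_DATASET = 607
    ERROR = 609
    TIMEOUT = 610

assert CodeResponseEnum.TIMEOUT.value == StatusEnum.TIMEOUT.value  # 610 on both sides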
@@ -2,14 +2,15 @@ from typing import Dict, Any
 import time
 import traceback as traceback_lib
 import importlib
-from threading import Timer
 from config import Config as cfg
 from app.main.engine.util.Timezone import Timezone
 from app.main.engine.util.Utils import Utils
 from app.main.engine.enum.StatusEnum import StatusEnum
 from app.main.engine.enum.CodeResponseEnum import CodeResponseEnum
 from app.main.engine.database.Database import Database
-from wrapt_timeout_decorator import *

 class Process:
     def __init__(self, app, descriptor: Dict[str, Any]) -> None:
         self.app = app
@@ -21,7 +22,6 @@ class Process:
     def run(self) -> Dict[str, Any]:
         status, status_description = StatusEnum.OK, ""
         try:
-
             # Obteniendo la conexión a la BD
             db_params = cfg.db_params
             source = Database(self.app, db_params)
@@ -44,39 +44,22 @@ class Process:
             obj_script = globals()[relation](self.app)
             obj_script.parser(self.descriptor)

-            tiempo_limite = obj_script.timeout
-            if tiempo_limite is not None:
-                @timeout(tiempo_limite)
-                def procesamiento():
-                    try:
-                        self.app.logger.info(f"Iniciando procesamiento de script")
-                        obj_script.process(source)
-                        # Guardando resultado
-                        self.app.logger.info(f"Generado y guardando resultado")
-                        response = obj_script.response()
-                        # response.show()
-                        result = self.utils.create_result(response, self.descriptor)
-                        save = self.utils.save_result(result, self.descriptor, db_session)
-                        if save["status"] == StatusEnum.ERROR.name:
-                            raise InterruptedError(save["message"])
-                    except Exception as e:
-                        raise TimeoutError(f"Tiempo límite de ejecución superado{e}")
-                procesamiento()
-            else:
-                # Iniciando process
-                self.app.logger.info(f"Iniciando procesamiento de script")
-                obj_script.process(source)
-                # Guardando resultado
-                self.app.logger.info(f"Generado y guardando resultado")
-                response = obj_script.response()
-                # response.show()
-                result = self.utils.create_result(response, self.descriptor)
-                save = self.utils.save_result(result, self.descriptor, db_session)
-                if save["status"] == StatusEnum.ERROR.name:
-                    raise InterruptedError(save["message"])
+            # Iniciando process
+            self.app.logger.info(f"Iniciando procesamiento de script")
+            obj_script.process(source)
+
+            # Guardando resultado
+            self.app.logger.info(f"Generado y guardando resultado")
+            response = obj_script.response()
+            # response.show()
+            result = self.utils.create_result(response, self.descriptor)
+            save = self.utils.save_result(result, self.descriptor, db_session)
+            if save["status"] == StatusEnum.ERROR.name:
+                raise InterruptedError(save["message"])
+        except TimeoutError as e:
+            self.app.logger.error(f"Error de Timeout. Error: {e}")
+            status, status_description = CodeResponseEnum.TIMEOUT, str(e)
         except IndexError as e:
             self.app.logger.error(f"Error extrayendo insumos. Vacío. Error: {e}")
             status, status_description = CodeResponseEnum.EMPTY_DATASET, str(e)
@@ -95,9 +78,6 @@ class Process:
         except ReferenceError as e:
             self.app.logger.error(f"Error validando parámetros del descriptor. {e}")
             status, status_description = CodeResponseEnum.PARAMETERS_ERROR, str(e)
-        except TimeoutError as e:
-            self.app.logger.error(f"Error validando parámetros del descriptor. {e}")
-            status, status_description = CodeResponseEnum.TIMEOUT_ERROR, str(e)
         except Exception as e:
             traceback_lib.print_exc()
             self.app.logger.error(f"Error procesando engine. {e}")
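With this change the timeout moves out of Process.run: the script now decorates its own work with wrapt_timeout_decorator, and the TimeoutError that escapes it is mapped here to CodeResponseEnum.TIMEOUT. A minimal sketch of that flow, assuming wrapt_timeout_decorator is installed (its timeout decorator raises TimeoutError when the limit expires); the function name and sleep are hypothetical:

import time
from wrapt_timeout_decorator import timeout

@timeout(1)  # seconds; in the engine this value comes from the script's self.timeout
def slow_step():
    time.sleep(5)

try:
    slow_step()
except TimeoutError as e:
    # Process.run would set: status, status_description = CodeResponseEnum.TIMEOUT, str(e)
    print("TIMEOUT:", e)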
@@ -5,6 +5,8 @@ import shutil
 from enum import Enum
 # from pyspark.sql import SparkSession
 import json
+from app.main.engine.enum.CodeResponseEnum import CodeResponseEnum
 from app.main.engine.util.Timezone import Timezone
 # from config import Config as cfg
@@ -52,8 +54,11 @@ class Utils:
         if codeEnum.value == StatusEnum.OK.value:
             response.update({'status': StatusEnum.OK.name, 'detail': detail})
         else:
+            error = StatusEnum.ERROR.name
+            if codeEnum.value == CodeResponseEnum.TIMEOUT.value:
+                error = StatusEnum.TIMEOUT.name
             description = DescResponseEnum[codeEnum.name].value
-            response.update({'status': StatusEnum.ERROR.name, 'message': description,
+            response.update({'status': error, 'message': description,
                              'detail': detail})
         return response
@@ -65,6 +70,14 @@ class Utils:
         pivot_params = descriptor["params-input"]["pivot-config"]
         ctp_params = descriptor["params-input"]["counterpart-config"]

+        for key_p, key_c in zip(pivot_params.keys(), ctp_params.keys()):
+            if isinstance(pivot_params[key_p], str):
+                pivot_params[key_p] = "PIVOT_" + pivot_params[key_p]
+                ctp_params[key_c] = "COUNTERPART_" + ctp_params[key_c]
+            else:
+                pivot_params[key_p] = ["PIVOT_" + column for column in pivot_params[key_p]]
+                ctp_params[key_c] = ["COUNTERPART_" + column for column in ctp_params[key_c]]
+
         group_pivot_match = pivot_params["columns-group"]
         transaction_pivot_match = pivot_params["columns-transaction"]
@@ -73,7 +86,7 @@ class Utils:
         used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match

-        if data.empty:
+        if data is None or data.empty:
             self.app.logger.info(f"El dataframe resultado esta vacio")
         else:
             for idx, i in data.iterrows():
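The loop added to Utils mirrors the renaming done in the script: descriptor parameters are rewritten with the PIVOT_/COUNTERPART_ prefixes so they match the renamed dataframe columns when results are assembled. A standalone sketch with hypothetical parameter values (string params get one prefix, list params get it per column):

pivot_params = {"amount-column": "monto", "columns-group": ["cuenta", "fecha"]}  # hypothetical
for key, value in pivot_params.items():
    if isinstance(value, str):
        pivot_params[key] = "PIVOT_" + value
    else:
        pivot_params[key] = ["PIVOT_" + column for column in value]
print(pivot_params)  # {'amount-column': 'PIVOT_monto', 'columns-group': ['PIVOT_cuenta', 'PIVOT_fecha']}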
@@ -23,16 +23,16 @@ app:
   timezone: 'GMT-5'
   time_pattern: '%Y-%m-%d %H:%M:%S'
   logging: 'INFO'
-  max_engine_threads: 2 # threads (maximum)
+  max_engine_threads: 50 # threads (maximum)

 # Make the service in a production state
 # Manage connections to the REST Service published. Allow workers to receive the connections.
 # https://docs.gunicorn.org/en/stable/
 gunicorn:
-  bind: '0.0.0.0:7500'
+  bind: '0.0.0.0:8000'
   worker_class: 'gthread'
-  threads: 8
-  worker_connections: 50
+  threads: 51
+  worker_connections: 51
   loglevel: 'debug'
   accesslog: '-'
   capture_output: True
\ No newline at end of file
 from typing import Any, Dict, List
 import importlib.util
-from itertools import combinations
-import multiprocessing as mp
 import numpy as np
 import pandas as pd
-from numba import njit
-from parallel_pandas import ParallelPandas
-from concurrent.futures import ThreadPoolExecutor
+import json
+from dask import dataframe as dd
+from numba import jit, types, typed
+from wrapt_timeout_decorator import timeout

 from app.main.engine.action.ActionInterface import ActionInterface
@@ -15,12 +14,6 @@ relation_classname_identifier = {
     "match-and-exclude-records-actions": "MatchAndExcludeRecordsAction"
 }

-# CONFIGURACION DE SESION DE SPARK
-MASTER = "local[*]"
-DRIVER_MEMORY = "8g"
-EXECUTOR_MEMORY = "8g"
-MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar"

 # EXCLUDE VALIDATION FIELD
 EXCLUDE_ROWS_FIELD = "EXCLUDE_VALID"
 # REDONDEO DE DECIMALES
@@ -36,7 +29,6 @@ class MatchAndExcludeRecordsAction(ActionInterface):
         super().__init__(app)
         self.max_combinations = None
         self.timeout = None
-        self.comb_per_group = None
         self.exclude_pivot = None
         self.pivot_params = None
         self.ctp_params = None
@@ -77,267 +69,257 @@ class MatchAndExcludeRecordsAction(ActionInterface):
                     raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte")

         self.max_combinations = configs["max-records-per-combinations"]
-        if "max-timeout-per-combinations" in configs:
-            self.timeout = configs["max-timeout-per-combinations"]
+        self.timeout = configs["max-timeout-per-combinations"]
         self.exclude_pivot = configs["exclude-entity-pivot"]
         self.pivot_params = pivot_params
         self.ctp_params = ctp_params
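parser() now reads max-timeout-per-combinations unconditionally, so the key becomes mandatory in the descriptor. A hypothetical configs fragment consistent with the keys read above (values are illustrative only):

configs = {
    "max-records-per-combinations": 10,   # hypothetical value
    "max-timeout-per-combinations": 60,   # seconds; now required, the `in configs` guard is gone
    "exclude-entity-pivot": True,
}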
-    def process(self, source_obj):
-        try:
-            # Inicializar la sesion de Spark
-            session = self.createSession()
-
-            # Traer la data desde BD tanto pivot como contraparte
-            pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"]
-            jdbc_conn = source_obj.create_spark_connection()
-            jdbc_url = jdbc_conn["url"]
-            jdbc_properties = jdbc_conn["properties"]
-            pivot_df = session.read.jdbc(url=jdbc_url, table=pivot_table, properties=jdbc_properties)
-            ctp_df = session.read.jdbc(url=jdbc_url, table=ctp_table, properties=jdbc_properties)
-
-            # Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input
-            # pivot: 'PIVOT_', contraparte: 'COUNTERPART_'
-            for column in pivot_df.columns:
-                if column == EXCLUDE_ROWS_FIELD:
-                    continue
-                pivot_df = pivot_df.withColumnRenamed(column, "PIVOT_"+column)
-
-            for column in ctp_df.columns:
-                if column == EXCLUDE_ROWS_FIELD:
-                    continue
-                ctp_df = ctp_df.withColumnRenamed(column, "COUNTERPART_"+column)
-
-            for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
-                if isinstance(self.pivot_params[key_p], str):
-                    self.pivot_params[key_p] = "PIVOT_"+self.pivot_params[key_p]
-                    self.ctp_params[key_c] = "COUNTERPART_"+self.ctp_params[key_c]
-                else:
-                    self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
-                    self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
-
-            from pyspark.sql.functions import sum, collect_list, round, when, col, lit
-
-            pivot_cols = self.pivot_params["columns-transaction"].copy()
-            if self.pivot_params["amount-column"] in pivot_cols:
-                pivot_cols.remove(self.pivot_params["amount-column"])
-
-            ctp_cols = self.ctp_params["columns-transaction"].copy()
-            if self.ctp_params["amount-column"] in ctp_cols:
-                ctp_cols.remove(self.ctp_params["amount-column"])
-
-            comb_per_group = self.comb_per_group
-            max_combinations = self.max_combinations
-
-            # Ejecutamos lógica de excluir registros
-            if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
-                raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
-
-            # Caso: 1 - Muchos
-            elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
-                ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
-                    agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
-                        collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
-
-                pivot_df2 = pivot_df
-
-            # Caso: Muchos - 1
-            elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
-                pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).\
-                    agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
-                        collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
-
-                ctp_df2 = ctp_df
-
-            # Caso: Muchos - Muchos
-            elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
-                pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]). \
-                    agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
-                        collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
-                ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
-                    agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
-                        collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
-
-            condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(self.pivot_params["columns-transaction"],
-                                                                                self.ctp_params["columns-transaction"])]
-            total_merged = pivot_df2.join(ctp_df2, condition, 'left')
-
-            total_merged = total_merged.withColumn("DIFF", when(col(self.ctp_params["columns-transaction"][0]).isNotNull(),
-                                                                lit(0)).otherwise(lit(None)))
-            total_merged = total_merged.select(*pivot_df2.columns, "DIFF")
-
-            condition = [total_merged[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
-            merged = total_merged.join(ctp_df2, condition)
-
-            merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
-                                                    total_merged[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]]).otherwise(col("DIFF")))
-
-            merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
-
-            if self.exclude_pivot:
-                df = pivot_df
-                group_cols = self.pivot_params["columns-group"]
-                amount_col = self.pivot_params["amount-column"]
-                id_col = self.pivot_params["id-column"]
-            else:
-                df = ctp_df
-                group_cols = self.ctp_params["columns-group"]
-                amount_col = self.ctp_params["amount-column"]
-                id_col = self.ctp_params["id-column"]
-
-            total_tmp_cols = group_cols + ["DIFF"]
-            df3 = df.join(merged_df.select(*total_tmp_cols), group_cols)
-            df3 = df3.toPandas()
-            total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
-
-            # ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
-            df3 = df3.sort_values(group_cols + [amount_col])
-            resultado = df3[total_cols].groupby(group_cols).apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
-            resultado = resultado.reset_index()
-            if len(resultado.columns) == 1:
-                resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
-            else:
-                resultado.columns = group_cols + ["LISTA_DIFF"]
-            # print(resultado["LISTA_DIFF"].apply(lambda x: x if pd.notna(x) and x[0]!=-1 else x))
-            meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols)
-            print(meged2)
-            meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array)
-            meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))]
-            if meged2.empty:
-                pass
-            elif self.exclude_pivot:
-                meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
-                meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
-                if meged2['INTER_CTP_ID'].dtype == 'int64':
-                    merged_df['INTER_CTP_ID'] = merged_df['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
-            else:
-                meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
-                meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
-                if meged2['INTER_PIVOT_ID'].dtype == 'int64':
-                    merged_df['INTER_PIVOT_ID'] = merged_df['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
-
-            self.output = meged2
-        except TimeoutError as e:
-            raise TimeoutError(f"Tiempo límite superado. {e}")
+    def process(self, source_obs):
+
+        @timeout(self.timeout)
+        def __process(source_obj):
+            # Traer la data desde BD tanto pivot como contraparte
+            pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"]
+            dialect = source_obj.get_dialect()
+
+            pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"],
+                                         npartitions=4)
+            pivot_df = pivot_df.reset_index()
+            ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"], npartitions=4)
+            ctp_df = ctp_df.reset_index()
+
+            # Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input
+            # pivot: 'PIVOT_', contraparte: 'COUNTERPART_'
+            # Iterar sobre las columnas del DataFrame
+            for column in pivot_df.columns:
+                if column == EXCLUDE_ROWS_FIELD:
+                    continue
+                new_column_name = "PIVOT_" + column
+                pivot_df = pivot_df.rename(columns={column: new_column_name})
+
+            for column in ctp_df.columns:
+                if column == EXCLUDE_ROWS_FIELD:
+                    continue
+                new_column_name = "COUNTERPART_" + column
+                ctp_df = ctp_df.rename(columns={column: new_column_name})
+
+            for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
+                if isinstance(self.pivot_params[key_p], str):
+                    self.pivot_params[key_p] = "PIVOT_"+self.pivot_params[key_p]
+                    self.ctp_params[key_c] = "COUNTERPART_"+self.ctp_params[key_c]
+                else:
+                    self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
+                    self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
+
+            from pyspark.sql.functions import sum, collect_list, round, when, col, lit
+
+            pivot_cols = self.pivot_params["columns-transaction"].copy()
+            if self.pivot_params["amount-column"] in pivot_cols:
+                pivot_cols.remove(self.pivot_params["amount-column"])
+
+            ctp_cols = self.ctp_params["columns-transaction"].copy()
+            if self.ctp_params["amount-column"] in ctp_cols:
+                ctp_cols.remove(self.ctp_params["amount-column"])
+
+            max_combinations = self.max_combinations
+
+            # Ejecutamos lógica de excluir registros
+            if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
+                raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
+
+            # Caso: 1 - Muchos
+            elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
+                ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
+                    self.ctp_params["amount-column"]: 'sum',  # Sumar la columna de cantidades
+                    self.ctp_params["id-column"]: list
+                })
+                ctp_df2 = ctp_df2.reset_index()
+
+                pivot_df2 = pivot_df
+
+            # Caso: Muchos - 1
+            elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
+                pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
+                    self.pivot_params["amount-column"]: 'sum',
+                    self.pivot_params["id-column"]: list
+                })
+                pivot_df2 = pivot_df2.reset_index()
+
+                ctp_df2 = ctp_df
+
+            # Caso: Muchos - Muchos
+            elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
+                pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
+                    self.pivot_params["amount-column"]: 'sum',
+                    self.pivot_params["id-column"]: list
+                })
+                pivot_df2 = pivot_df2.reset_index()
+
+                ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
+                    self.ctp_params["amount-column"]: 'sum',  # Sumar la columna de cantidades
+                    self.ctp_params["id-column"]: list
+                })
+                ctp_df2 = ctp_df2.reset_index()
+
+            pivot_df2[self.pivot_params["amount-column"]] = pivot_df2[self.pivot_params["amount-column"]].round(
+                ROUND_DECIMAL)
+            ctp_df2[self.ctp_params["amount-column"]] = ctp_df2[self.ctp_params["amount-column"]].round(
+                ROUND_DECIMAL)
+            total_merged = pivot_df2.merge(ctp_df2, 'left', left_on=self.pivot_params["columns-transaction"],
+                                           right_on=self.ctp_params["columns-transaction"])
+            total_merged = total_merged.map_partitions(self.add_diff_column)
+            selected_columns = list(pivot_df2.columns) + ['DIFF']
+            total_merged = total_merged[selected_columns]
+
+            merged = total_merged.merge(ctp_df2, 'inner', left_on=pivot_cols, right_on=ctp_cols)
+
+            merged['DIFF'] = merged['DIFF'].where(merged['DIFF'].notnull(),
+                                                  merged[self.pivot_params["amount-column"]] - merged[
+                                                      self.ctp_params["amount-column"]])
+            if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
+                merged = merged.drop_duplicates(subset=pivot_cols)
+            elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
+                merged = merged.drop_duplicates(subset=ctp_cols)
+
+            merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL))
+
+            if self.exclude_pivot:
+                df = pivot_df
+                group_cols = self.pivot_params["columns-group"]
+                amount_col = self.pivot_params["amount-column"]
+                id_col = self.pivot_params["id-column"]
+            else:
+                df = ctp_df
+                group_cols = self.ctp_params["columns-group"]
+                amount_col = self.ctp_params["amount-column"]
+                id_col = self.ctp_params["id-column"]
+
+            total_tmp_cols = group_cols + ["DIFF"]
+
+            df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols)
+            df3 = df3.compute()
+            total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
+            resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
+
+            resultado = resultado.reset_index()
+            if len(resultado.columns) == 1:
+                resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
+            else:
+                resultado.columns = group_cols + ["LISTA_DIFF"]
+            resultado = dd.from_pandas(resultado, npartitions=4)
+
+            meged2 = resultado.merge(merged_df, 'left', group_cols)
+            meged2 = meged2.map_partitions(lambda partition: partition.assign(
+                LISTA_DIFF=partition['LISTA_DIFF'].apply(lambda x: [] if pd.isna(x) else x)), meta=meged2.dtypes.to_dict())
+            meged2 = meged2[
+                (meged2['DIFF'] == 0) |
+                ((meged2['DIFF'] != 0) & meged2['LISTA_DIFF'].apply(
+                    lambda x: True if not pd.isna(x) and ((isinstance(x, List) and len(x) > 0) or (isinstance(x, str) and len(x) > 2)) else False))
+                ]
+            meged2 = meged2.compute()
+            if meged2.empty:
+                pass
+            elif self.exclude_pivot:
+                meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
+                meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
+                if meged2['INTER_CTP_ID'].dtype == 'int64':
+                    meged2['INTER_CTP_ID'] = meged2['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
+            else:
+                meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
+                meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
+                if meged2['INTER_PIVOT_ID'].dtype == 'int64':
+                    meged2['INTER_PIVOT_ID'] = meged2['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
+
+            return meged2
+
+        self.output = __process(source_obs)
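The rewritten process() swaps the Spark session for Dask: dd.read_sql_table pulls each table through the SQLAlchemy-style URL that source_obj.get_dialect() returns, and the graph stays lazy until compute(). A minimal standalone sketch of that ingestion pattern, with a hypothetical URL, table, and columns:

import dask.dataframe as dd

url = "mysql+pymysql://user:secret@localhost:3306/engine_db"  # hypothetical
df = dd.read_sql_table("pivot_table", url, index_col="id", npartitions=4)  # lazy, partitioned read
df = df.reset_index()
total = df["amount"].sum().compute()  # only here does Dask issue the SQL reads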
     def response(self):
         return self.output

+    def add_diff_column(self, partition):
+        partition['DIFF'] = np.where(partition[self.ctp_params["columns-transaction"][0]].notnull(), 0, np.nan)
+        return partition
     def handle_array(self, x):
-        # print(type(x))
         if isinstance(x, np.ndarray):
             return x
         else:
             return []

     def array_except(self, arr1, arr2):
-        # print(arr2)
         if arr2 is None:
             return arr1
-        else:
-            return [item for item in arr1 if item not in arr2]
-
-    def createSession(self, name: str = "app_engine_spark"):
-        try:
-            from pyspark.sql import SparkSession
-            session = SparkSession.builder.master(MASTER) \
-                .appName(name) \
-                .config("spark.jars", MYSQL_JAR_PATH) \
-                .config("spark.executor.extraClassPath", MYSQL_JAR_PATH) \
-                .config("spark.driver.extraClassPath", MYSQL_JAR_PATH) \
-                .config("spark.driver.memory", DRIVER_MEMORY) \
-                .config("spark.executor.memory", EXECUTOR_MEMORY) \
-                .getOrCreate()
-            self.app.logger.info(f"Sesión creada exitosamente")
-            return session
-        except Exception as e:
-            raise Exception(f"Error creando sesion Spark. {e}")
+        elif not isinstance(arr2, List):
+            cadena_sin_corchetes = arr2.strip('[]')
+            partes = cadena_sin_corchetes.split()
+            # print(partes)
+            arr2 = [int(numero) for numero in partes]
+            arr1 = json.loads(arr1.replace(" ", ""))
+        return [item for item in arr1 if item not in arr2]
 def custom_func(group, amount_field, id_field, max_combinations):
-    diff = group["DIFF"].values[0]
+    diff_value = group["DIFF"].values[0]
+    if np.isnan(diff_value):
+        return None
+    diff = int(diff_value*(10**ROUND_DECIMAL))
     if pd.isna(diff) or diff == 0:
         return None
     group = group[group[EXCLUDE_ROWS_FIELD] == 'S']
     group[amount_field] = group[amount_field].astype(float)
     group = group.reset_index(drop=True)
     values = group[amount_field].values
+    values *= (10**ROUND_DECIMAL)
+    values = values.astype(np.int64)
     ids = group[id_field].values
+    tam = len(values)
+    tam = tam if tam <= max_combinations else max_combinations

-    n = len(values)
-    valores1 = encontrar_comb_1(values, diff)
-    if valores1[0] != -1:
-        indices = ids[valores1]
-        return indices
-
-    valores2 = encontrar_comb_2(values, diff, n)
-    if valores2[0] != -1:
-        indices = ids[valores2]
-        return indices
-    # Iterar sobre todos los índices posibles
-    # valores4 = encontrar_comb_4(values, diff, n)
-    # if valores4[0] != -1:
-    #     indices = ids[valores4]
-    #     return indices
-    valores5 = encontrar_comb_5(values, diff, n)
-    if valores5[0] != -1:
-        indices = ids[valores5]
-        return indices
-
-
-@njit
-def encontrar_comb_1(valores, target):
-    indice = [-1]
-    for idx, value in enumerate(valores):
-        suma = value
-        if round(suma, ROUND_DECIMAL) == target:
-            indice = [idx for idx, val in enumerate(valores) if val in [value]]
-            return indice
-
-    return indice
-
-
-@njit
-def encontrar_comb_2(valores, target, n):
-    indice = [-1]
-    for i in range(n):
-        array_except = np.delete(valores, i)
-        for idx, value in enumerate(array_except):
-            suma = value + valores[i]
-            if round(suma, ROUND_DECIMAL) == target:
-                indice = [idx for idx, val in enumerate(valores) if val in [value, valores[i]]]
-                return indice
-    return indice
-
-
-@njit
-def encontrar_comb_4(valores, target, n):
-    indice = [-1]
-    for i in range(n):
-        a1 = np.delete(valores, i)
-        for j in range(len(a1)):
-            a2 = np.delete(a1, j)
-            for k in range(len(a2)):
-                array_except = np.delete(a2, k)
-                for idx, value in enumerate(array_except):
-                    suma = value + valores[i] + a1[j] + a2[k]
-                    if round(suma, ROUND_DECIMAL) == target:
-                        indice = [idx for idx, val in enumerate(valores) if val in [value, valores[i], a1[j], a2[k]]]
-                        return indice
-    return indice
-
-
-@njit
-def encontrar_comb_5(valores, target, n):
-    indice = [-1]
-    for i in range(n):
-        a1 = np.delete(valores, i)
-        for j in range(len(a1)):
-            a2 = np.delete(a1, j)
-            for k in range(len(a2)):
-                a3 = np.delete(a2, k)
-                for l in range(len(a3)):
-                    array_except = np.delete(a2, l)
-                    for idx, value in enumerate(array_except):
-                        suma = value + valores[i] + a1[j] + a2[k] + a3[l]
-                        if round(suma, ROUND_DECIMAL) == target:
-                            indice = [idx for idx, val in enumerate(valores) if val in [value, valores[i], a1[j], a2[k], a3[l]]]
-                            return indice
-    return indice
+    result = subset_sum_iter(values, diff, tam)
+    indices = ids[np.isin(values, result)]
+    return indices
+
+
+@jit(nopython=False)
+def subset_sum_iter(numbers, target, num_elements):
+    # Initialize solutions list
+    final = typed.List.empty_list(types.int64)
+    for step in range(1, num_elements+1):
+        # Build first index by taking the first num_elements from the numbers
+        indices = list(range(step))
+
+        while True:
+            for i in range(step):
+                if indices[i] != i + len(numbers) - step:
+                    break
+            else:
+                # No combinations left
+                break
+
+            # Increase current index and all its following ones
+            indices[i] += 1
+            for j in range(i + 1, step):
+                indices[j] = indices[j - 1] + 1
+
+            # Check current solution
+            solution = typed.List.empty_list(types.int64)
+            for i in indices:
+                solution.append(numbers[i])
+            if round(sum(solution), ROUND_DECIMAL) == target:
+                final = solution
+                break
+        if len(final) > 0:
+            break
+
+    return final
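custom_func now scales the amounts to integers (values *= 10**ROUND_DECIMAL, cast to int64) so the subset-sum comparison is exact rather than float-sensitive, and caps the subset size at max_combinations; note that the ids are recovered with np.isin on the values, so duplicated amounts map back to every matching id. subset_sum_iter replaces the fixed-size encontrar_comb_* helpers with a single search that enumerates index combinations of growing size and returns the first subset whose sum hits the target. A plain-Python reference of the same search, using itertools instead of the hand-rolled index stepping:

from itertools import combinations

def subset_sum_ref(numbers, target, num_elements):
    # First subset of at most num_elements values summing exactly to target.
    for size in range(1, num_elements + 1):
        for combo in combinations(numbers, size):
            if sum(combo) == target:
                return list(combo)
    return []

print(subset_sum_ref([125, 250, 375], 500, 2))  # [125, 375]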