Commit e82162be authored by Cristian Aguirre

Merge branch 'developer_ca' into 'developer'

Update the exclude-and-match script to use dask, pandas and the dpss library

See merge request !5
parents 287ace11 8ac5263c
FROM python:3.10-slim-buster
ENV FLASK_APP=run.py
COPY run.py flask_app.py gunicorn-cfg.py requirements.txt config.py conf.yml /
COPY app app
COPY scripts scripts
RUN pip install --upgrade pip
RUN pip cache purge
RUN pip install -r requirements.txt
CMD ["gunicorn", "--config", "gunicorn-cfg.py", "run:app"]
......@@ -44,6 +44,15 @@ class Database:
except Exception as e:
self.app.logger.error(f"Error cerrando básica conexión. {e}")
def get_dialect(self) -> str:
dialect = ""
try:
dialect = self.factory.get_dialect()
except Exception as e:
self.app.logger.error(f"Error obteniendo dialect. {e}")
finally:
return dialect
def create_engine(self) -> None:
try:
if isinstance(self.engine, type(None)):
......
......@@ -23,11 +23,12 @@ class Mysql:
self.params = params
self.engine = None
self.connection = None
self.dialect = None
def create_spark_connection(self):
params = {}
try:
url = "jdbc:mysql://"+self.host+":"+str(self.port)+"/"+self.database
url = "jdbc:mysql://"+self.user+":"+self.password+"@"+self.host+":"+str(self.port)+"/"+self.database
properties = {"user": self.user, "password": self.password, "driver": "com.mysql.cj.jdbc.Driver"}
params["url"] = url
params["properties"] = properties
......@@ -46,10 +47,17 @@ class Mysql:
finally:
return self.connection
def create_engine(self) -> None:
def get_dialect(self) -> str:
try:
dialect = DatabaseDialectEnum.MYSQL.value
url = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}/{self.database}?charset=utf8mb4"
self.dialect = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}/{self.database}?charset=utf8mb4"
except Exception as e:
self.app.logger.error(f"Error obteniendo dialect de Mysql. {e}")
return self.dialect
def create_engine(self) -> None:
try:
url = self.get_dialect()
self.engine = create_engine(url, pool_recycle=3600, pool_pre_ping=True, **self.params)
except Exception as e:
self.app.logger.error(f"Error creando engine de Mysql. {e}")
......
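A minimal sketch of how the new get_dialect URL can serve both the pooled SQLAlchemy engine and dask's read_sql_table. The credentials, table name and index column are placeholders, and DatabaseDialectEnum.MYSQL.value is assumed to resolve to a PyMySQL-style dialect string; this is not code from the merge request.

```python
from sqlalchemy import create_engine
from dask import dataframe as dd

# Placeholder connection details (not taken from the diff).
user, password, host, port, database = "app", "secret", "localhost", 3306, "css"

# Assumption: DatabaseDialectEnum.MYSQL.value resolves to "mysql+pymysql".
dialect_url = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4"

# The same URL feeds the pooled SQLAlchemy engine ...
engine = create_engine(dialect_url, pool_recycle=3600, pool_pre_ping=True)

# ... and dask, which partitions the table on the index column.
pivot_df = dd.read_sql_table("pivot_table", dialect_url, index_col="id", npartitions=4)
```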
......@@ -11,3 +11,4 @@ class CodeResponseEnum(Enum):
OUTPUT_ERROR = 606
EMPTY_DATASET = 607
ERROR = 609
TIMEOUT = 610
......@@ -4,3 +4,4 @@ from enum import Enum
class StatusEnum(Enum):
OK = 200
ERROR = 609
TIMEOUT = 610
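Both enums now share the 610 value, which is what lets the response builder surface a distinct TIMEOUT status instead of the generic ERROR. A minimal illustration of that mapping; only the members visible in the hunks above are reproduced, and the helper function is illustrative, not part of the codebase.

```python
from enum import Enum

class StatusEnum(Enum):
    OK = 200
    ERROR = 609
    TIMEOUT = 610

class CodeResponseEnum(Enum):
    OUTPUT_ERROR = 606
    EMPTY_DATASET = 607
    ERROR = 609
    TIMEOUT = 610

def status_for(code: CodeResponseEnum) -> str:
    # Mirrors the new branch in Utils.create_response: only the timeout code
    # gets its own status name; every other error code stays ERROR.
    if code.value == CodeResponseEnum.TIMEOUT.value:
        return StatusEnum.TIMEOUT.name
    return StatusEnum.ERROR.name

print(status_for(CodeResponseEnum.TIMEOUT))        # TIMEOUT
print(status_for(CodeResponseEnum.EMPTY_DATASET))  # ERROR
```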
from typing import Dict, Any
import time
import traceback as traceback_lib
import importlib
......@@ -46,17 +45,23 @@ class Process:
obj_script.parser(self.descriptor)
# Starting the process
self.app.logger.info(f"Iniciando procesamiento de script")
self.app.logger.info(f"Iniciando procesamiento de script - {self.descriptor['idProcess']}")
obj_script.process(source)
# Saving the result
self.app.logger.info(f"Generado y guardando resultado")
self.app.logger.info(f"Generado y guardando resultado - {self.descriptor['idProcess']}")
response = obj_script.response()
# response.show()
result = self.utils.create_result(response, self.descriptor)
del response
save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name:
raise InterruptedError(save["message"])
del result
except TimeoutError as e:
self.app.logger.error(f"Error de Timeout. Error: {e}")
status, status_description = CodeResponseEnum.TIMEOUT, str(e)
except IndexError as e:
self.app.logger.error(f"Error extrayendo insumos. Vacío. Error: {e}")
status, status_description = CodeResponseEnum.EMPTY_DATASET, str(e)
......@@ -77,7 +82,7 @@ class Process:
status, status_description = CodeResponseEnum.PARAMETERS_ERROR, str(e)
except Exception as e:
traceback_lib.print_exc()
self.app.logger.error(f"Error procesando engine. {e}")
self.app.logger.error(f"Error procesando engine - {self.descriptor['idProcess']}. {e}")
status, status_description = StatusEnum.ERROR, str(e)
finally:
return self.utils.create_response(status, status_description)
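A minimal, runnable sketch of the timeout path these hunks handle, assuming wrapt_timeout_decorator (pinned in requirements.txt); the limit and the slow step are placeholders, not values from the diff.

```python
import time
from wrapt_timeout_decorator import timeout

MAX_SECONDS = 2  # placeholder for the descriptor's max-timeout-per-combinations

@timeout(MAX_SECONDS)
def slow_step():
    time.sleep(5)  # simulates a combination search that exceeds the limit

try:
    slow_step()
except TimeoutError as e:
    # Same shape as Process: log the error, then map it to the new 610 TIMEOUT code.
    status, status_description = "TIMEOUT", str(e)
    print(status, status_description)
```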
......@@ -5,6 +5,8 @@ import shutil
from enum import Enum
# from pyspark.sql import SparkSession
import json
from app.main.engine.enum.CodeResponseEnum import CodeResponseEnum
from app.main.engine.util.Timezone import Timezone
# from config import Config as cfg
......@@ -52,8 +54,11 @@ class Utils:
if codeEnum.value == StatusEnum.OK.value:
response.update({'status': StatusEnum.OK.name, 'detail': detail})
else:
error = StatusEnum.ERROR.name
if codeEnum.value == CodeResponseEnum.TIMEOUT.value:
error = StatusEnum.TIMEOUT.name
description = DescResponseEnum[codeEnum.name].value
response.update({'status': StatusEnum.ERROR.name, 'message': description,
response.update({'status': error, 'message': description,
'detail': detail})
return response
......@@ -65,6 +70,14 @@ class Utils:
pivot_params = descriptor["params-input"]["pivot-config"]
ctp_params = descriptor["params-input"]["counterpart-config"]
for key_p, key_c in zip(pivot_params.keys(), ctp_params.keys()):
if isinstance(pivot_params[key_p], str):
pivot_params[key_p] = "PIVOT_" + pivot_params[key_p]
ctp_params[key_c] = "COUNTERPART_" + ctp_params[key_c]
else:
pivot_params[key_p] = ["PIVOT_" + column for column in pivot_params[key_p]]
ctp_params[key_c] = ["COUNTERPART_" + column for column in ctp_params[key_c]]
group_pivot_match = pivot_params["columns-group"]
transaction_pivot_match = pivot_params["columns-transaction"]
......@@ -73,7 +86,7 @@ class Utils:
used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match
if data.empty:
if data is None or data.empty:
self.app.logger.info(f"El dataframe resultado esta vacio")
else:
for idx, i in data.iterrows():
......
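A minimal sketch of the PIVOT_/COUNTERPART_ prefixing that both Utils and the new script apply to the descriptor parameters; the parameter values here are illustrative.

```python
pivot_params = {"id-column": "id", "amount-column": "amount",
                "columns-group": ["client"], "columns-transaction": ["client", "amount"]}
ctp_params = {"id-column": "id", "amount-column": "amount",
              "columns-group": [], "columns-transaction": ["client", "amount"]}

# Same loop shape as the diff: plain strings get one prefix, lists are prefixed element-wise.
for key_p, key_c in zip(pivot_params.keys(), ctp_params.keys()):
    if isinstance(pivot_params[key_p], str):
        pivot_params[key_p] = "PIVOT_" + pivot_params[key_p]
        ctp_params[key_c] = "COUNTERPART_" + ctp_params[key_c]
    else:
        pivot_params[key_p] = ["PIVOT_" + col for col in pivot_params[key_p]]
        ctp_params[key_c] = ["COUNTERPART_" + col for col in ctp_params[key_c]]

print(pivot_params["columns-transaction"])  # ['PIVOT_client', 'PIVOT_amount']
print(ctp_params["id-column"])              # COUNTERPART_id
```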
......@@ -23,16 +23,16 @@ app:
timezone: 'GMT-5'
time_pattern: '%Y-%m-%d %H:%M:%S'
logging: 'INFO'
max_engine_threads: 2 # threads (maximum)
max_engine_threads: 50 # threads (maximum)
# Make the service in a production state
# Manage connections to the REST Service published. Allow workers to receive the connections.
# https://docs.gunicorn.org/en/stable/
gunicorn:
bind: '0.0.0.0:7500'
bind: '0.0.0.0:8000'
worker_class: 'gthread'
threads: 8
worker_connections: 50
threads: 51
worker_connections: 100
loglevel: 'debug'
accesslog: '-'
capture_output: True
\ No newline at end of file
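On the assumption that each running engine job occupies one gunicorn request thread, the new numbers line up: 50 engine jobs (max_engine_threads) fit inside the 51 gthread worker threads with one thread to spare, and worker_connections caps concurrent connections at 100. A small sanity check against conf.yml:

```python
import yaml

# Reads the same conf.yml that gunicorn-cfg.py consumes.
conf = yaml.safe_load(open("conf.yml"))["app"]
engine_threads = conf["max_engine_threads"]     # 50
gunicorn_threads = conf["gunicorn"]["threads"]  # 51

# Assumed invariant: keep at least one request thread free for non-engine calls.
assert gunicorn_threads > engine_threads, "gunicorn threads should exceed max_engine_threads"
```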
FROM public.ecr.aws/emr-serverless/spark/emr-7.0.0:latest
USER root
# install python 3
RUN yum install -y gcc openssl-devel bzip2-devel libffi-devel tar gzip wget make zlib-devel
RUN wget https://www.python.org/ftp/python/3.10.0/Python-3.10.0.tgz && \
tar xzf Python-3.10.0.tgz && cd Python-3.10.0 && \
./configure --enable-optimizations && \
make altinstall
RUN python3 -m pip install numpy pandas py4j python-dateutil pytz six tzdata
# EMRS will run the image as hadoop
USER hadoop:hadoop
\ No newline at end of file
version: "3"
services:
css-cusca-scripts:
image: css_cuscatlan:0.0.2
container_name: css-cusca
deploy:
resources:
limits:
cpus: '8'
memory: 24G
reservations:
cpus: '4'
memory: 8G
restart: always
networks: [ css-cusca-network ]
ports:
- "9500:8000"
volumes:
- "./conf.yml:/conf.yml"
- "./scripts/match-and-exclude-records-actions_v1.py:/scripts/match-and-exclude-records-actions_v1.py"
networks:
css-cusca-network:
driver: bridge
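With this mapping, gunicorn's bind on container port 8000 is published on host port 9500. A hedged smoke test from the host could look like the following; the /process route and the payload are hypothetical, since the Flask routes are not part of this merge request.

```python
import requests  # client-side only; not in the service's requirements.txt

# Hypothetical route and payload: the real endpoint and descriptor schema
# live elsewhere in the Flask app.
resp = requests.post(
    "http://localhost:9500/process",
    json={"idProcess": "demo-001"},
    timeout=30,
)
print(resp.status_code, resp.text)
```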
"""
Copyright (c) 2019 - present AppSeed.us
"""
import yaml
conf = yaml.safe_load(open('conf.yml'))
conf = conf["app"]["gunicorn"]
bind = conf["bind"]
worker_class = conf["worker_class"]
threads = conf["threads"]
worker_connections = conf["worker_connections"]
loglevel = conf["loglevel"]
accesslog = conf["accesslog"]
capture_output = conf["capture_output"]
arrow==1.3.0
blinker==1.8.2
cli-exit-tools==1.2.6
click==8.1.7
cloudpickle==3.0.0
dask==2024.1.1
dill==0.3.8
dpss==0.22.0
Flask==3.0.3
fsspec==2024.3.1
greenlet==3.0.3
gunicorn==22.0.0
importlib_metadata==7.1.0
itsdangerous==2.2.0
Jinja2==3.1.4
lib-detect-testenv==2.0.8
locket==1.0.0
MarkupSafe==2.1.5
multiprocess==0.70.16
numpy==1.26.4
packaging==24.0
pandas==2.2.2
partd==1.4.2
pillow==10.3.0
psutil==5.9.8
pyarrow==14.0.2
PyMySQL==1.1.0
python-dateutil==2.9.0.post0
python-decouple==3.8
pytz==2024.1
PyYAML==6.0.1
six==1.16.0
SQLAlchemy==2.0.30
toolz==0.12.1
types-python-dateutil==2.9.0.20240316
typing_extensions==4.11.0
tzdata==2024.1
Werkzeug==3.0.3
wrapt==1.16.0
wrapt_timeout_decorator==1.5.1
zipp==3.18.1
......@@ -6,4 +6,4 @@ base = MainApplication()
app = base.create_app()
if __name__ == "__main__":
base.run(port=8000)
base.run(port=7500)
from typing import Any, Dict, List
import importlib.util
from itertools import combinations
import multiprocessing as mp
import numpy as np
import pandas as pd
from parallel_pandas import ParallelPandas
from concurrent.futures import ThreadPoolExecutor
import json
import time
from dpss import find_subset
from dask import dataframe as dd
from wrapt_timeout_decorator import timeout
import multiprocessing as mp
from app.main.engine.action.ActionInterface import ActionInterface
......@@ -14,12 +15,6 @@ relation_classname_identifier = {
"match-and-exclude-records-actions": "MatchAndExcludeRecordsAction"
}
# SPARK SESSION CONFIGURATION
MASTER = "local[*]"
DRIVER_MEMORY = "8g"
EXECUTOR_MEMORY = "8g"
MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar"
# EXCLUDE VALIDATION FIELD
EXCLUDE_ROWS_FIELD = "EXCLUDE_VALID"
# DECIMAL ROUNDING
......@@ -28,28 +23,18 @@ ROUND_DECIMAL = 2
class MatchAndExcludeRecordsAction(ActionInterface):
library_required = "pyspark"
version_required = "3.4.0"
def __init__(self, app) -> None:
super().__init__(app)
self.identifier = None
self.max_combinations = None
self.comb_per_group = None
self.timeout = None
self.exclude_pivot = None
self.pivot_params = None
self.ctp_params = None
self.output = None
self.config_params = ["max-records-per-combinations", "max-combinations-per-group", "exclude-entity-pivot"]
self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]
def parser(self, descriptor: Dict[str, Any]):
# Check that pyspark and the required version are installed
pyspark_lib = importlib.util.find_spec(self.library_required)
if pyspark_lib is None:
raise ImportError(f"Librería requerida *{self.library_required}* no instalada")
import pyspark
version = pyspark.__version__
if version != self.version_required:
raise ImportError(f"Versión requerida no instalada. Requerida: {self.version_required}. Instalada: {version}")
# Input parameter validation
entity_config_params = ["tablename", "id-column", "amount-column", "columns-group", "columns-transaction"]
......@@ -74,150 +59,211 @@ class MatchAndExcludeRecordsAction(ActionInterface):
if param not in pivot_params.keys() or param not in ctp_params.keys():
raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte")
self.identifier = descriptor["idProcess"]
self.max_combinations = configs["max-records-per-combinations"]
self.comb_per_group = configs["max-combinations-per-group"]
self.timeout = configs["max-timeout-per-combinations"]
self.exclude_pivot = configs["exclude-entity-pivot"]
self.pivot_params = pivot_params
self.ctp_params = ctp_params
def process(self, source_obj):
# Initialize the Spark session
session = self.createSession()
# Fetch both pivot and counterpart data from the DB
pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"]
jdbc_conn = source_obj.create_spark_connection()
jdbc_url = jdbc_conn["url"]
jdbc_properties = jdbc_conn["properties"]
pivot_df = session.read.jdbc(url=jdbc_url, table=pivot_table, properties=jdbc_properties)
ctp_df = session.read.jdbc(url=jdbc_url, table=ctp_table, properties=jdbc_properties)
# Add a prefix to every column of both pivot and counterpart. Update the input fields
# pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
for column in pivot_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
pivot_df = pivot_df.withColumnRenamed(column, "PIVOT_"+column)
for column in ctp_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
ctp_df = ctp_df.withColumnRenamed(column, "COUNTERPART_"+column)
for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
if isinstance(self.pivot_params[key_p], str):
self.pivot_params[key_p] = "PIVOT_"+self.pivot_params[key_p]
self.ctp_params[key_c] = "COUNTERPART_"+self.ctp_params[key_c]
else:
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
from pyspark.sql.functions import sum, collect_list, round, when, col, lit
pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols:
pivot_cols.remove(self.pivot_params["amount-column"])
ctp_cols = self.ctp_params["columns-transaction"].copy()
if self.ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(self.ctp_params["amount-column"])
comb_per_group = self.comb_per_group
max_combinations = self.max_combinations
# Run the record-exclusion logic
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
# Case: 1 - Many
elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
pivot_df2 = pivot_df
# Case: Many - 1
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).\
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df2 = ctp_df
# Case: Many - Many
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]). \
agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(self.pivot_params["columns-transaction"],
self.ctp_params["columns-transaction"])]
total_merged = pivot_df2.join(ctp_df2, condition, 'left')
total_merged = total_merged.withColumn("DIFF", when(col(self.ctp_params["columns-transaction"][0]).isNotNull(),
lit(0)).otherwise(lit(None)))
total_merged = total_merged.select(*pivot_df2.columns, "DIFF")
condition = [total_merged[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = total_merged.join(ctp_df2, condition)
merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
total_merged[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]]).otherwise(col("DIFF")))
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
if self.exclude_pivot:
df = pivot_df
group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"]
else:
df = ctp_df
group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"]
id_col = self.ctp_params["id-column"]
total_tmp_cols = group_cols + ["DIFF"]
df3 = df.join(merged_df.select(*total_tmp_cols), group_cols)
df3 = df3.toPandas()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
resultado = df3[total_cols].groupby(group_cols).p_apply(lambda x: custom_func(x, amount_col, id_col, comb_per_group, max_combinations))
resultado = resultado.reset_index()
if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
else:
resultado.columns = group_cols + ["LISTA_DIFF"]
meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols)
meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array)
meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))]
if meged2.empty:
pass
elif self.exclude_pivot:
meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
if meged2['INTER_CTP_ID'].dtype == 'int64':
merged_df['INTER_CTP_ID'] = merged_df['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
else:
meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
if meged2['INTER_PIVOT_ID'].dtype == 'int64':
merged_df['INTER_PIVOT_ID'] = merged_df['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
self.output = meged2
def process(self, source_obj):
try:
@timeout(self.timeout)
def __process(source_obj):
# Fetch both pivot and counterpart data from the DB
pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"]
dialect = source_obj.get_dialect()
# Select the columns to fetch from the DB
total_pivot_cols = self.pivot_params["columns-transaction"] + self.pivot_params["columns-group"] + \
[self.pivot_params["amount-column"], self.pivot_params["id-column"], EXCLUDE_ROWS_FIELD]
total_pivot_cols = list(set(total_pivot_cols))
total_ctp_cols = self.ctp_params["columns-transaction"] + self.ctp_params["columns-group"] + \
[self.ctp_params["amount-column"], self.ctp_params["id-column"], EXCLUDE_ROWS_FIELD]
total_ctp_cols = list(set(total_ctp_cols))
pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"],
npartitions=mp.cpu_count(), columns=total_pivot_cols)
pivot_df = pivot_df.reset_index()
ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"],
npartitions=mp.cpu_count(), columns=total_ctp_cols)
ctp_df = ctp_df.reset_index()
self.app.logger.debug(f"Insumos cargados - {self.identifier}")
# Add a prefix to every column of both pivot and counterpart. Update the input fields
# pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
# Iterate over the DataFrame columns
for column in pivot_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
new_column_name = "PIVOT_" + column
pivot_df = pivot_df.rename(columns={column: new_column_name})
for column in ctp_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
new_column_name = "COUNTERPART_" + column
ctp_df = ctp_df.rename(columns={column: new_column_name})
for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
if isinstance(self.pivot_params[key_p], str):
self.pivot_params[key_p] = "PIVOT_"+self.pivot_params[key_p]
self.ctp_params[key_c] = "COUNTERPART_"+self.ctp_params[key_c]
else:
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols:
pivot_cols.remove(self.pivot_params["amount-column"])
ctp_cols = self.ctp_params["columns-transaction"].copy()
if self.ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(self.ctp_params["amount-column"])
max_combinations = self.max_combinations
# Run the record-exclusion logic
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
# Case: 1 - Many
elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
self.app.logger.info(f"Ejecutando Caso 1 - Muchos - {self.identifier}")
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
self.ctp_params["amount-column"]: 'sum', # Sumar la columna de cantidades
self.ctp_params["id-column"]: list
})
ctp_df2 = ctp_df2.reset_index()
pivot_df2 = pivot_df
# Case: Many - 1
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
self.app.logger.info(f"Ejecutando Caso Muchos - 1 - {self.identifier}")
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
self.pivot_params["amount-column"]: 'sum',
self.pivot_params["id-column"]: list
})
pivot_df2 = pivot_df2.reset_index()
ctp_df2 = ctp_df
# Case: Many - Many
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
self.app.logger.info(f"Ejecutando Caso Muchos - Muchos - {self.identifier}")
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
self.pivot_params["amount-column"]: 'sum',
self.pivot_params["id-column"]: list
})
pivot_df2 = pivot_df2.reset_index()
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
self.ctp_params["amount-column"]: 'sum', # Sumar la columna de cantidades
self.ctp_params["id-column"]: list
})
ctp_df2 = ctp_df2.reset_index()
pivot_df2[self.pivot_params["amount-column"]] = pivot_df2[self.pivot_params["amount-column"]].round(
ROUND_DECIMAL)
ctp_df2[self.ctp_params["amount-column"]] = ctp_df2[self.ctp_params["amount-column"]].round(
ROUND_DECIMAL)
total_merged = pivot_df2.merge(ctp_df2, 'left', left_on=self.pivot_params["columns-transaction"],
right_on=self.ctp_params["columns-transaction"])
total_merged = total_merged.map_partitions(self.add_diff_column)
selected_columns = list(pivot_df2.columns) + ['DIFF']
total_merged = total_merged[selected_columns]
merged = total_merged.merge(ctp_df2, 'inner', left_on=pivot_cols, right_on=ctp_cols)
merged['DIFF'] = merged['DIFF'].where(merged['DIFF'].notnull(),
merged[self.pivot_params["amount-column"]] - merged[
self.ctp_params["amount-column"]])
merged = merged.dropna(subset=["DIFF"])
if merged.known_divisions:
return pd.DataFrame([])
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
merged = merged.set_index(self.pivot_params["id-column"])
merged = merged.map_partitions(lambda df_: df_.sort_values([self.pivot_params["id-column"]]))
merged = merged.drop_duplicates(subset=pivot_cols)
merged = merged.reset_index()
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
merged = merged.set_index(self.ctp_params["id-column"])
merged = merged.map_partitions(lambda df_: df_.sort_values([self.ctp_params["id-column"]]))
merged = merged.drop_duplicates(subset=ctp_cols)
merged = merged.reset_index()
merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL))
if self.exclude_pivot:
self.app.logger.info(f"Se excluiran registros del pivot - {self.identifier}")
df = pivot_df
group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"]
else:
self.app.logger.info(f"Se excluiran registros de la contraparte - {self.identifier}")
df = ctp_df
group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"]
id_col = self.ctp_params["id-column"]
total_tmp_cols = group_cols + ["DIFF"]
df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols)
# Exclusion filter: keep only records flagged for validation
df3 = df3[df3[EXCLUDE_ROWS_FIELD] == 'S']
df3 = df3.compute()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
self.app.logger.info(f"Aplicando algoritmo de exclusión. Máximo de {str(max_combinations)} combinaciones - {self.identifier}")
resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
resultado = resultado.reset_index()
if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
else:
resultado.columns = group_cols + ["LISTA_DIFF"]
resultado = dd.from_pandas(resultado, npartitions=mp.cpu_count())
meged2 = resultado.merge(merged_df, 'left', group_cols)
meged2 = meged2.map_partitions(lambda partition: partition.assign(
LISTA_DIFF=partition['LISTA_DIFF'].apply(lambda x: [] if pd.isna(x) else x)), meta=meged2.dtypes.to_dict())
meged2 = meged2[(meged2['DIFF'] != 0) & meged2['LISTA_DIFF'].apply(
lambda x: True if not pd.isna(x) and ((isinstance(x, List) and len(x) > 0) or (isinstance(x, str) and len(x) > 2)) else False)
]
meged2 = meged2.compute()
if meged2.empty:
pass
elif self.exclude_pivot:
meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
if meged2['INTER_CTP_ID'].dtype == 'int64':
meged2['INTER_CTP_ID'] = meged2['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
else:
meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
if meged2['INTER_PIVOT_ID'].dtype == 'int64':
meged2['INTER_PIVOT_ID'] = meged2['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
return meged2
except TimeoutError as e:
raise TimeoutError(f"Tiempo límite superado. {e}")
start_time = time.time()
self.output = __process(source_obj)
self.app.logger.debug(f"{self.output} - {self.identifier}")
self.app.logger.info(f"Ejecución de script terminado en {time.time() - start_time} segundos - {self.identifier}")
def response(self):
return self.output
def add_diff_column(self, partition):
partition['DIFF'] = np.where(partition[self.ctp_params["columns-transaction"][0]].notnull(), 0, np.nan)
return partition
def handle_array(self, x):
if isinstance(x, List):
if isinstance(x, np.ndarray):
return x
else:
return []
......@@ -225,78 +271,41 @@ class MatchAndExcludeRecordsAction(ActionInterface):
def array_except(self, arr1, arr2):
if arr2 is None:
return arr1
else:
return [item for item in arr1 if item not in arr2]
elif not isinstance(arr2, List):
cadena_sin_corchetes = arr2.replace(" ", "").strip('[]')
partes = cadena_sin_corchetes.split(",")
arr2 = [int(numero) for numero in partes]
arr1 = json.loads(arr1.replace(" ", ""))
return [item for item in arr1 if item not in arr2]
def createSession(self, name: str = "app_engine_spark"):
try:
from pyspark.sql import SparkSession
session = SparkSession.builder.master(MASTER) \
.appName(name) \
.config("spark.jars", MYSQL_JAR_PATH) \
.config("spark.executor.extraClassPath", MYSQL_JAR_PATH) \
.config("spark.driver.extraClassPath", MYSQL_JAR_PATH) \
.config("spark.driver.memory", DRIVER_MEMORY) \
.config("spark.executor.memory", EXECUTOR_MEMORY) \
.getOrCreate()
self.app.logger.info(f"Sesión creada exitosamente")
return session
except Exception as e:
raise Exception(f"Error creando sesion Spark. {e}")
def custom_func(group, amount_field, id_field, max_comb_per_group, max_combinations):
diff = group["DIFF"].values[0]
def custom_func(group, amount_field, id_field, max_combinations):
diff_value = group["DIFF"].values[0]
if np.isnan(diff_value):
return None
diff = int(diff_value*(10**ROUND_DECIMAL))
if pd.isna(diff) or diff == 0:
return None
group = group[group[EXCLUDE_ROWS_FIELD] == 'S']
group[amount_field] = group[amount_field].astype(float)
group = group.reset_index(drop=True)
values = group[amount_field].values
indexs = group.index.values
np.random.shuffle(indexs)
values *= (10**ROUND_DECIMAL)
values = values.astype(np.int64)
ids = group[id_field].values
tam = len(values)
rang = range(1, tam + 1) if tam <= max_combinations else range(1, max_combinations + 1)
final = None
stop_event = False
def buscar_combinacion(i):
nonlocal final, stop_event
if not stop_event:
for index, comb in enumerate(combinations(indexs, i)):
if stop_event or index > max_comb_per_group:
break
elif np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff:
final = group.loc[list(comb), id_field].tolist()
stop_event = True
break
tam = tam if tam <= max_combinations else max_combinations
result = find_subset(values, diff, tam)
indices = []
if len(result) > 0:
result = result[0]
for idx, val in zip(ids, values):
if val in result:
indices.append(idx)
result.remove(val)
else:
return None
with ThreadPoolExecutor() as executor:
futures = [executor.submit(buscar_combinacion, i) for i in rang]
for future in futures:
try:
future.result()
except TimeoutError:
stop_event = True
break
if stop_event or final is not None:
break
return final
return indices
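For reference, a minimal sketch of the dpss.find_subset call that replaces the old combinations() search; the amounts, ids and DIFF are illustrative, and the integer scaling mirrors what custom_func does above.

```python
import numpy as np
from dpss import find_subset

ROUND_DECIMAL = 2
amounts = np.array([10.00, 20.50, 30.25, 39.75])  # illustrative group amounts
ids = np.array([101, 102, 103, 104])              # illustrative row ids
diff = 30.50                                      # illustrative DIFF for the group

# Scale to integers, as custom_func does, so the subset-sum search is exact.
values = (amounts * 10**ROUND_DECIMAL).astype(np.int64)
target = int(diff * 10**ROUND_DECIMAL)

subsets = find_subset(values, target, len(values))  # list of candidate subsets
picked_ids = []
if len(subsets) > 0:
    chosen = list(subsets[0])
    for idx, val in zip(ids, values):  # same id-recovery walk as custom_func
        if val in chosen:
            picked_ids.append(idx)
            chosen.remove(val)
print(picked_ids)  # [101, 102] -> 10.00 + 20.50 == 30.50
```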