Commit 3cd6f64d authored by Cristian Aguirre

Merge branch 'developer' into 'developer_ev'

# Conflicts:
#   conf.yml
parents 484f2e4a 008be1de
@@ -7,7 +7,7 @@ app:
port: 3301
user: root
password: root
-db: cusca
+db: css_cuscatlan
dialect: 'mysql+pymysql'
# DB connection configurations
# https://docs.sqlalchemy.org/en/14/core/pooling.html
@@ -35,10 +35,4 @@ app:
worker_connections: 50
loglevel: 'debug'
accesslog: '-'
capture_output: True
\ No newline at end of file
-spark:
-  cores: '*'
-  memory: 16 # In GB; use an even value, preferably 2, 4, 6, 18, 32, etc.
-  jars_path:
-    mysql: 'jars/mysql-connector-java-8.0.30.jar'
-\ No newline at end of file
@@ -25,11 +25,6 @@ class Config(object):
    # Max threads allowed
    max_engine_threads = conf["max_engine_threads"]
-    # Spark config
-    spark_cores = conf["spark"]["cores"]
-    spark_mem = conf["spark"]["memory"]
-    spark_jars = conf["spark"]["jars_path"]

class ProductionConfig(Config):
    DEBUG = False
...
from typing import Any, Dict
import importlib.util
from itertools import combinations
import time
import numpy as np
from concurrent.futures import ThreadPoolExecutor, TimeoutError  # futures.TimeoutError is not the builtin before Python 3.11
from app.main.engine.action.ActionInterface import ActionInterface
@@ -14,6 +18,11 @@ DRIVER_MEMORY = "4g"
EXECUTOR_MEMORY = "4g"
MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar"

# EXCLUDE VALIDATION FIELD
EXCLUDE_ROWS_FIELD = "EXCLUDE_VALID"
# NUMBER OF DECIMAL PLACES FOR ROUNDING
ROUND_DECIMAL = 2
class MatchAndExcludeRecordsAction(ActionInterface):
@@ -27,6 +36,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
        self.exclude_pivot = None
        self.pivot_params = None
        self.ctp_params = None
        self.output = None
        self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]

    def parser(self, descriptor: Dict[str, Any]):
@@ -69,6 +79,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
        self.ctp_params = ctp_params

    def process(self, source_obj):
        # Initialize the Spark session
        session = self.createSession()
@@ -77,11 +88,111 @@ class MatchAndExcludeRecordsAction(ActionInterface):
        jdbc_conn = source_obj.create_spark_connection()
        jdbc_url = jdbc_conn["url"]
        jdbc_properties = jdbc_conn["properties"]
        print(jdbc_url, jdbc_properties)
        pivot_df = session.read.jdbc(url=jdbc_url, table=pivot_table, properties=jdbc_properties)
        ctp_df = session.read.jdbc(url=jdbc_url, table=ctp_table, properties=jdbc_properties)

        # Add a prefix to every column of both the pivot and the counterpart, and update the input fields accordingly.
        # pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
        for column in pivot_df.columns:
            if column == EXCLUDE_ROWS_FIELD:
                continue
            pivot_df = pivot_df.withColumnRenamed(column, "PIVOT_" + column)

        for column in ctp_df.columns:
            if column == EXCLUDE_ROWS_FIELD:
                continue
            ctp_df = ctp_df.withColumnRenamed(column, "COUNTERPART_" + column)

        for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
            if isinstance(self.pivot_params[key_p], str):
                self.pivot_params[key_p] = "PIVOT_" + self.pivot_params[key_p]
                self.ctp_params[key_c] = "COUNTERPART_" + self.ctp_params[key_c]
            else:
                self.pivot_params[key_p] = ["PIVOT_" + column for column in self.pivot_params[key_p]]
                self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]

        from pyspark.sql.functions import sum, collect_list, round, udf, array_except, size, col, array
        from pyspark.sql.types import ArrayType, IntegerType, LongType

        pivot_cols = self.pivot_params["columns-transaction"].copy()
        if self.pivot_params["amount-column"] in pivot_cols:
            pivot_cols.remove(self.pivot_params["amount-column"])

        ctp_cols = self.ctp_params["columns-transaction"].copy()
        if self.ctp_params["amount-column"] in ctp_cols:
            ctp_cols.remove(self.ctp_params["amount-column"])

        timeout = self.timeout
        max_combinations = self.max_combinations
        # Run the record-exclusion logic
        if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
            raise RuntimeError("At least one side (pivot or counterpart) must be grouped")

        # Case: 1 - Many
        elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
            ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
                agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
                    collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
            pivot_df2 = pivot_df.dropDuplicates(subset=pivot_cols)

        # Case: Many - 1
        elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
            pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]). \
                agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
                    collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
            ctp_df2 = ctp_df.dropDuplicates(subset=ctp_cols)

        # Case: Many - Many
        elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
            pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]). \
                agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
                    collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
            ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
                agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
                    collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))

        # Join pivot and counterpart on the transaction columns and compute the amount difference
        condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
        merged = pivot_df2.join(ctp_df2, condition)
        merged = merged.withColumn("DIFF", pivot_df2[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]])
        merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
        # Choose which side drives the exclusion search
        if self.exclude_pivot:
            df_pandas = pivot_df.toPandas()
            group_cols = self.pivot_params["columns-group"]
            amount_col = self.pivot_params["amount-column"]
            id_col = self.pivot_params["id-column"]
        else:
            df_pandas = ctp_df.toPandas()
            group_cols = self.ctp_params["columns-group"]
            amount_col = self.ctp_params["amount-column"]
            id_col = self.ctp_params["id-column"]

        # For every non-zero DIFF, search the excluded side for records whose amounts add up to it
        custom_udf = udf(lambda diff: custom_apply(diff, df_pandas, group_cols, amount_col, id_col, timeout,
                                                   max_combinations), ArrayType(IntegerType()))
        merged_df = merged_df.withColumn("LISTA_DIFF", custom_udf(merged_df["DIFF"]))

        # Keep exact matches, plus mismatches for which a compensating combination was found
        merged_df = merged_df.filter((col("DIFF") == 0) | ((col("DIFF") != 0) & (size(col("LISTA_DIFF")) > 0)))

        if self.exclude_pivot:
            merged_df = merged_df.withColumn("INTER_PIVOT_ID", array_except(merged_df[self.pivot_params["id-column"]],
                                                                            merged_df["LISTA_DIFF"]))
            merged_df = merged_df.withColumnRenamed(self.ctp_params["id-column"], "INTER_CTP_ID")
            if merged_df.schema["INTER_CTP_ID"].dataType == LongType():
                merged_df = merged_df.withColumn("INTER_CTP_ID", array(col("INTER_CTP_ID")).cast(ArrayType(LongType())))
        else:
            merged_df = merged_df.withColumn("INTER_CTP_ID", array_except(merged_df[self.ctp_params["id-column"]],
                                                                          merged_df["LISTA_DIFF"]))
            merged_df = merged_df.withColumnRenamed(self.pivot_params["id-column"], "INTER_PIVOT_ID")
            if merged_df.schema["INTER_PIVOT_ID"].dataType == LongType():
                merged_df = merged_df.withColumn("INTER_PIVOT_ID", array(col("INTER_PIVOT_ID")).cast(ArrayType(LongType())))

        self.output = merged_df
    def response(self):
-        print("Llegue al response3")
+        self.output.show()
+        return self.output

    def createSession(self, name: str = "app_engine_spark"):
        try:
@@ -100,10 +211,66 @@ class MatchAndExcludeRecordsAction(ActionInterface):
            raise Exception(f"Error creating the Spark session. {e}")
def custom_apply(raw, df_pandas, group_cols, amount_col, id_col, timeout, max_combinations):
    # For a non-zero difference, search each group of the excluded side for a combination of
    # records whose amounts add up to 'raw'; return the matching ids, or an empty list.
    if raw == 0:
        return []
    else:
        total_cols = group_cols.copy()
        total_cols.append(amount_col)
        total_cols.append(id_col)
        total_cols.append(EXCLUDE_ROWS_FIELD)
        result = df_pandas[total_cols].groupby(group_cols).apply(custom_func, diff=raw, amount_field=amount_col, id_field=id_col,
                                                                 timeout=timeout, max_combinations=max_combinations).values
        resp = ",".join(map(str, result)) if len(result) > 0 else ""
        return [int(elem) for elem in resp[1:-1].split(",")] if resp != "" else []
def custom_func(group, diff, amount_field, id_field, timeout, max_combinations):
    group[amount_field] = group[amount_field].astype(float)
    group = group.reset_index(drop=True)
    values = group[amount_field].values
    indexs = group.index.values
    tam = len(values)
    rang = range(1, tam + 1) if tam <= max_combinations else range(1, max_combinations + 1)
    final = None
    stop_event = False

    def buscar_combinacion(i):
        nonlocal final, stop_event
        if not stop_event:  # Only keep searching if no combination has been found yet
            for comb in combinations(indexs, i):
                if stop_event:  # Check whether the stop signal has been set
                    break
                if np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff:
                    # Check that the combination consists only of validated records ('EXCLUDE_VALID' == 'S')
                    candidate = group[group.index.isin(comb)]
                    if (candidate[EXCLUDE_ROWS_FIELD] == 'S').all():
                        final = candidate[id_field].tolist()
                        stop_event = True  # Set the stop signal
                        break
        return None

    start_time = time.time()
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(buscar_combinacion, i) for i in rang]
        for future in futures:
            try:
                future.result(timeout=timeout)
            except TimeoutError:
                stop_event = True  # Set the stop signal when the time limit is reached
                break
            if stop_event or final is not None:
                break
            if time.time() - start_time >= timeout:
                return None  # Return None when the time limit is reached
    return final
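A hedged toy example of the combination search above, using pandas directly. The column names "amount" and "id" and the target difference are hypothetical; only EXCLUDE_VALID comes from the diff:

    import pandas as pd

    # Hypothetical group: three validated records ('S') with amounts 10.0, 2.5 and 7.5
    toy_group = pd.DataFrame({
        "amount": [10.0, 2.5, 7.5],
        "id": [101, 102, 103],
        "EXCLUDE_VALID": ["S", "S", "S"],
    })

    # Look for a subset of amounts that adds up to a difference of 12.5 (here: ids 101 and 102)
    ids = custom_func(toy_group, diff=12.5, amount_field="amount", id_field="id",
                      timeout=5, max_combinations=3)
    print(ids)  # expected: [101, 102]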
...