Commit c1597525 authored by Cristian Aguirre

Update action-exclude-records-v1-dask

parent b0fd6670
@@ -44,6 +44,15 @@ class Database:
         except Exception as e:
             self.app.logger.error(f"Error cerrando básica conexión. {e}")

+    def get_dialect(self) -> str:
+        dialect = ""
+        try:
+            dialect = self.factory.get_dialect()
+        except Exception as e:
+            self.app.logger.error(f"Error obteniendo dialect. {e}")
+        finally:
+            return dialect
+
     def create_engine(self) -> None:
         try:
             if isinstance(self.engine, type(None)):
...
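Editor's note: `get_dialect` relies on a `return` inside `finally`, which runs even when the `try` body raises, so any exception from the factory is swallowed after being logged and the caller always receives a string. A minimal sketch of that behavior (the `BrokenFactory` stub is hypothetical, not from the commit):

```python
def get_dialect_sketch(factory):
    # Mirrors the method above: the `return` in `finally` executes even
    # when the try body raises, so the exception never propagates.
    dialect = ""
    try:
        dialect = factory.get_dialect()
    except Exception as e:
        print(f"error: {e}")
    finally:
        return dialect

class BrokenFactory:
    def get_dialect(self):
        raise RuntimeError("no backend")

print(repr(get_dialect_sketch(BrokenFactory())))  # '' -> exception swallowed
```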
@@ -23,6 +23,7 @@ class Mysql:
         self.params = params
         self.engine = None
         self.connection = None
+        self.dialect = None

     def create_spark_connection(self):
         params = {}
@@ -46,10 +47,17 @@ class Mysql:
         finally:
             return self.connection

-    def create_engine(self) -> None:
+    def get_dialect(self) -> str:
         try:
             dialect = DatabaseDialectEnum.MYSQL.value
-            url = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}/{self.database}?charset=utf8mb4"
+            self.dialect = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}/{self.database}?charset=utf8mb4"
+        except Exception as e:
+            self.app.logger.error(f"Error obteniendo dialect de Mysql. {e}")
+        return self.dialect
+
+    def create_engine(self) -> None:
+        try:
+            url = self.get_dialect()
             self.engine = create_engine(url, pool_recycle=3600, pool_pre_ping=True, **self.params)
         except Exception as e:
             self.app.logger.error(f"Error creando engine de Mysql. {e}")
...
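For context, a hedged sketch of the URL `Mysql.get_dialect()` assembles and its immediate consumer. The dialect name and credentials below are placeholders, since `DatabaseDialectEnum.MYSQL.value` is not shown in this diff:

```python
from sqlalchemy import create_engine

# Placeholder values; the enum presumably resolves to a SQLAlchemy dialect
# name such as "mysql+pymysql" (an assumption, not shown in the commit).
dialect = "mysql+pymysql"
url = f"{dialect}://user:secret@localhost:3306/mydb?charset=utf8mb4"

# One string, two consumers: SQLAlchemy's engine here, and (further down)
# dask.dataframe.read_sql_table, which accepts the same URL format.
engine = create_engine(url, pool_recycle=3600, pool_pre_ping=True)
```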
@@ -23,16 +23,16 @@ app:
   timezone: 'GMT-5'
   time_pattern: '%Y-%m-%d %H:%M:%S'
   logging: 'INFO'
-  max_engine_threads: 2 # threads (maximum)
+  max_engine_threads: 50 # threads (maximum)

 # Make the service in a production state
 # Manage connections to the REST Service published. Allow workers to receive the connections.
 # https://docs.gunicorn.org/en/stable/
 gunicorn:
-  bind: '0.0.0.0:7500'
+  bind: '0.0.0.0:8000'
   worker_class: 'gthread'
-  threads: 8
-  worker_connections: 50
+  threads: 51
+  worker_connections: 51
   loglevel: 'debug'
   accesslog: '-'
   capture_output: True
\ No newline at end of file
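How these YAML keys might feed gunicorn programmatically, as a hedged sketch: the file path, top-level layout, and wiring are assumptions; only the key names and values come from the diff.

```python
import yaml  # PyYAML

# Path and structure are assumptions for illustration.
with open("config.yml") as f:
    cfg = yaml.safe_load(f)

g = cfg["gunicorn"]
# With worker_class 'gthread', each worker serves up to `threads`
# concurrent requests; worker_connections caps in-flight connections.
options = {
    "bind": g["bind"],                              # '0.0.0.0:8000'
    "worker_class": g["worker_class"],              # 'gthread'
    "threads": g["threads"],                        # 51
    "worker_connections": g["worker_connections"],  # 51
    "loglevel": g["loglevel"],
    "accesslog": g["accesslog"],
    "capture_output": g["capture_output"],
}
```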
-from typing import Any, Dict
+from typing import Any, Dict, List
 import importlib.util
 import numpy as np
 import pandas as pd
-import multiprocessing as mp
-from parallel_pandas import ParallelPandas
+import json
+from dask import dataframe as dd
+from numba import jit, types, typed
 from wrapt_timeout_decorator import timeout
 from app.main.engine.action.ActionInterface import ActionInterface
@@ -13,12 +14,6 @@ relation_classname_identifier = {
     "match-and-exclude-records-actions": "MatchAndExcludeRecordsAction"
 }

-# SPARK SESSION CONFIGURATION
-MASTER = "local[*]"
-DRIVER_MEMORY = "8g"
-EXECUTOR_MEMORY = "8g"
-MYSQL_JAR_PATH = "jars/mysql-connector-java-8.0.30.jar"
-
 # EXCLUDE VALIDATION FIELD
 EXCLUDE_ROWS_FIELD = "EXCLUDE_VALID"
 # DECIMAL ROUNDING
@@ -83,28 +78,30 @@ class MatchAndExcludeRecordsAction(ActionInterface):
         try:
             @timeout(self.timeout)
             def __process(source_obj):
-                # Initialize the Spark session
-                session = self.createSession()
-
                 # Pull both pivot and counterpart data from the DB
                 pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"]
-                jdbc_conn = source_obj.create_spark_connection()
-                jdbc_url = jdbc_conn["url"]
-                jdbc_properties = jdbc_conn["properties"]
-                pivot_df = session.read.jdbc(url=jdbc_url, table=pivot_table, properties=jdbc_properties)
-                ctp_df = session.read.jdbc(url=jdbc_url, table=ctp_table, properties=jdbc_properties)
+                dialect = source_obj.get_dialect()
+                pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"],
+                                             npartitions=4)
+                pivot_df = pivot_df.reset_index()
+                ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"], npartitions=4)
+                ctp_df = ctp_df.reset_index()

                 # Add a prefix to every column, for both pivot and counterpart. Update the input fields
                 # pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
+                # Iterate over the DataFrame columns
                 for column in pivot_df.columns:
                     if column == EXCLUDE_ROWS_FIELD:
                         continue
-                    pivot_df = pivot_df.withColumnRenamed(column, "PIVOT_"+column)
+                    new_column_name = "PIVOT_" + column
+                    pivot_df = pivot_df.rename(columns={column: new_column_name})

                 for column in ctp_df.columns:
                     if column == EXCLUDE_ROWS_FIELD:
                         continue
-                    ctp_df = ctp_df.withColumnRenamed(column, "COUNTERPART_"+column)
+                    new_column_name = "COUNTERPART_" + column
+                    ctp_df = ctp_df.rename(columns={column: new_column_name})

                 for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
                     if isinstance(self.pivot_params[key_p], str):
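Spark's `withColumnRenamed` loop maps onto Dask's pandas-style `rename`; a tiny hedged sketch of the same pattern on illustrative data:

```python
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"id": [1, 2], "amount": [10.0, 20.0], "EXCLUDE_VALID": ["S", "N"]})
df = dd.from_pandas(pdf, npartitions=1)

# Same loop shape as the diff: skip the exclusion flag, prefix the rest.
for column in df.columns:
    if column == "EXCLUDE_VALID":
        continue
    df = df.rename(columns={column: "PIVOT_" + column})

print(df.columns.tolist())  # ['PIVOT_id', 'PIVOT_amount', 'EXCLUDE_VALID']
```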
@@ -132,44 +129,61 @@ class MatchAndExcludeRecordsAction(ActionInterface):

                 # Case: 1 - Many
                 elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
-                    ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
-                        agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
-                            collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
+                    ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
+                        self.ctp_params["amount-column"]: 'sum',  # Sum the amounts column
+                        self.ctp_params["id-column"]: list
+                    })
+                    ctp_df2 = ctp_df2.reset_index()
                     pivot_df2 = pivot_df

                 # Case: Many - 1
                 elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
-                    pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).\
-                        agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
-                            collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
+                    pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
+                        self.pivot_params["amount-column"]: 'sum',
+                        self.pivot_params["id-column"]: list
+                    })
+                    pivot_df2 = pivot_df2.reset_index()
                     ctp_df2 = ctp_df

                 # Case: Many - Many
                 elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
-                    pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]). \
-                        agg(round(sum(self.pivot_params["amount-column"]), ROUND_DECIMAL).alias(self.pivot_params["amount-column"]),
-                            collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
-                    ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]). \
-                        agg(round(sum(self.ctp_params["amount-column"]), ROUND_DECIMAL).alias(self.ctp_params["amount-column"]),
-                            collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
-
-                condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(self.pivot_params["columns-transaction"],
-                                                                                    self.ctp_params["columns-transaction"])]
-                total_merged = pivot_df2.join(ctp_df2, condition, 'left')
-
-                total_merged = total_merged.withColumn("DIFF", when(col(self.ctp_params["columns-transaction"][0]).isNotNull(),
-                                                                    lit(0)).otherwise(lit(None)))
-                total_merged = total_merged.select(*pivot_df2.columns, "DIFF")
-
-                condition = [total_merged[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
-                merged = total_merged.join(ctp_df2, condition)
-
-                merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
-                                                        total_merged[self.pivot_params["amount-column"]] - ctp_df2[self.ctp_params["amount-column"]]).otherwise(col("DIFF")))
-
-                merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
+                    pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
+                        self.pivot_params["amount-column"]: 'sum',
+                        self.pivot_params["id-column"]: list
+                    })
+                    pivot_df2 = pivot_df2.reset_index()
+                    ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
+                        self.ctp_params["amount-column"]: 'sum',  # Sum the amounts column
+                        self.ctp_params["id-column"]: list
+                    })
+                    ctp_df2 = ctp_df2.reset_index()
+
+                pivot_df2[self.pivot_params["amount-column"]] = pivot_df2[self.pivot_params["amount-column"]].round(
+                    ROUND_DECIMAL)
+                ctp_df2[self.ctp_params["amount-column"]] = ctp_df2[self.ctp_params["amount-column"]].round(
+                    ROUND_DECIMAL)
+                total_merged = pivot_df2.merge(ctp_df2, 'left', left_on=self.pivot_params["columns-transaction"],
+                                               right_on=self.ctp_params["columns-transaction"])
+                total_merged = total_merged.map_partitions(self.add_diff_column)
+                selected_columns = list(pivot_df2.columns) + ['DIFF']
+                total_merged = total_merged[selected_columns]
+
+                merged = total_merged.merge(ctp_df2, 'inner', left_on=pivot_cols, right_on=ctp_cols)
+                merged['DIFF'] = merged['DIFF'].where(merged['DIFF'].notnull(),
+                                                      merged[self.pivot_params["amount-column"]] - merged[
+                                                          self.ctp_params["amount-column"]])
+                if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
+                    merged = merged.drop_duplicates(subset=pivot_cols)
+                elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
+                    merged = merged.drop_duplicates(subset=ctp_cols)
+
+                merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL))

                 if self.exclude_pivot:
                     df = pivot_df
                     group_cols = self.pivot_params["columns-group"]
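The Spark `agg(sum(...), collect_list(...))` pair becomes a dict-style `agg` in the new code. Whether Dask accepts the bare `list` callable depends on the Dask version, so the semantics are shown here on an in-memory pandas frame with illustrative values:

```python
import pandas as pd

df = pd.DataFrame({"acct": ["A", "A", "B"], "amount": [1.25, 2.50, 9.99], "id": [10, 11, 12]})

# 'sum' replaces Spark's sum(); list replaces collect_list().
out = df.groupby(["acct"]).agg({"amount": "sum", "id": list}).reset_index()
print(out)
#   acct  amount        id
# 0    A    3.75  [10, 11]
# 1    B    9.99      [12]
```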
@@ -182,24 +196,30 @@ class MatchAndExcludeRecordsAction(ActionInterface):
                     id_col = self.ctp_params["id-column"]

                 total_tmp_cols = group_cols + ["DIFF"]
-                df3 = df.join(merged_df.select(*total_tmp_cols), group_cols)
-                df3 = df3.toPandas()
+                df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols)
+                df3 = df3.compute()
                 total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
-                ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
-                df3 = df3.sort_values(group_cols + [amount_col])
-                resultado = df3[total_cols].groupby(group_cols).p_apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
+                resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
                 resultado = resultado.reset_index()
                 if len(resultado.columns) == 1:
                     resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
                 else:
                     resultado.columns = group_cols + ["LISTA_DIFF"]
-                meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols)
-                meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array)
-                meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))]
+                resultado = dd.from_pandas(resultado, npartitions=4)
+                meged2 = resultado.merge(merged_df, 'left', group_cols)
+                meged2 = meged2.map_partitions(lambda partition: partition.assign(
+                    LISTA_DIFF=partition['LISTA_DIFF'].apply(lambda x: [] if pd.isna(x) else x)), meta=meged2.dtypes.to_dict())
+                meged2 = meged2[
+                    (meged2['DIFF'] == 0) |
+                    ((meged2['DIFF'] != 0) & meged2['LISTA_DIFF'].apply(
+                        lambda x: True if not pd.isna(x) and ((isinstance(x, List) and len(x) > 0) or (isinstance(x, str) and len(x) > 2)) else False))
+                ]
+                meged2 = meged2.compute()

                 if meged2.empty:
                     pass
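`map_partitions` with an explicit `meta`, as used above for the LISTA_DIFF cleanup, can be tried in isolation; a hedged sketch on illustrative data:

```python
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"DIFF": [0.0, 2.0], "LISTA_DIFF": [None, [5, 7]]})
df = dd.from_pandas(pdf, npartitions=1)

# Replace missing lists with [] partition-by-partition; meta keeps the
# output schema identical so Dask can stay lazy.
df = df.map_partitions(
    lambda p: p.assign(LISTA_DIFF=p["LISTA_DIFF"].apply(lambda x: [] if x is None else x)),
    meta=df.dtypes.to_dict(),
)
print(df.compute())
```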
@@ -207,12 +227,12 @@ class MatchAndExcludeRecordsAction(ActionInterface):
                     meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
                     meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
                     if meged2['INTER_CTP_ID'].dtype == 'int64':
-                        merged_df['INTER_CTP_ID'] = merged_df['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
+                        meged2['INTER_CTP_ID'] = meged2['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
                 else:
                     meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
                     meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
                     if meged2['INTER_PIVOT_ID'].dtype == 'int64':
-                        merged_df['INTER_PIVOT_ID'] = merged_df['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
+                        meged2['INTER_PIVOT_ID'] = meged2['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
                 return meged2
@@ -224,38 +244,34 @@ class MatchAndExcludeRecordsAction(ActionInterface):
     def response(self):
         return self.output

+    def add_diff_column(self, partition):
+        partition['DIFF'] = np.where(partition[self.ctp_params["columns-transaction"][0]].notnull(), 0, np.nan)
+        return partition
+
     def handle_array(self, x):
-        # print(type(x))
         if isinstance(x, np.ndarray):
             return x
         else:
             return []

     def array_except(self, arr1, arr2):
-        # print(arr2)
         if arr2 is None:
             return arr1
-        else:
+        elif not isinstance(arr2, List):
+            cadena_sin_corchetes = arr2.strip('[]')
+            partes = cadena_sin_corchetes.split()
+            # print(partes)
+            arr2 = [int(numero) for numero in partes]
+            arr1 = json.loads(arr1.replace(" ", ""))
         return [item for item in arr1 if item not in arr2]
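`array_except` now tolerates stringified lists as well as real ones. Its behavior on both input shapes, shown with a self-free mirror of the method and hypothetical values:

```python
import json

def array_except(arr1, arr2):
    # Mirror of the method above, for a quick standalone check.
    if arr2 is None:
        return arr1
    elif not isinstance(arr2, list):
        # '[11 12]' -> ['11', '12'] -> [11, 12]
        arr2 = [int(n) for n in arr2.strip('[]').split()]
        arr1 = json.loads(arr1.replace(" ", ""))
    return [item for item in arr1 if item not in arr2]

print(array_except("[10, 11, 12]", "[11 12]"))  # [10]
print(array_except([1, 2], None))               # [1, 2]
```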
-    def createSession(self, name: str = "app_engine_spark"):
-        try:
-            from pyspark.sql import SparkSession
-            session = SparkSession.builder.master(MASTER) \
-                .appName(name) \
-                .config("spark.jars", MYSQL_JAR_PATH) \
-                .config("spark.executor.extraClassPath", MYSQL_JAR_PATH) \
-                .config("spark.driver.extraClassPath", MYSQL_JAR_PATH) \
-                .config("spark.driver.memory", DRIVER_MEMORY) \
-                .config("spark.executor.memory", EXECUTOR_MEMORY) \
-                .getOrCreate()
-            self.app.logger.info(f"Sesión creada exitosamente")
-            return session
-        except Exception as e:
-            raise Exception(f"Error creando sesion Spark. {e}")
-
 def custom_func(group, amount_field, id_field, max_combinations):
-    diff = int(group["DIFF"].values[0]*(10**ROUND_DECIMAL))
+    diff_value = group["DIFF"].values[0]
+    if np.isnan(diff_value):
+        return None
+    diff = int(diff_value*(10**ROUND_DECIMAL))
     if pd.isna(diff) or diff == 0:
         return None
     group = group[group[EXCLUDE_ROWS_FIELD] == 'S']
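Scaling DIFF by `10**ROUND_DECIMAL` turns a decimal delta into integer units so the downstream subset-sum search can compare exactly. A small demonstration (the constant's real value is not shown in the diff; 1 is assumed here):

```python
ROUND_DECIMAL = 1  # assumed for the demo; the real constant is defined upstream

diff_value = 0.1 + 0.2            # 0.30000000000000004 in binary floats
diff = int(diff_value * (10 ** ROUND_DECIMAL))
print(diff)                        # 3 -> integer units avoid float drift downstream
# Caveat: int() truncates, so a product like 2.9999999999999996 would drop
# to 2; round() would be the safer conversion.
```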
@@ -273,18 +289,15 @@ def custom_func(group, amount_field, id_field, max_combinations):
     return indices

+@jit(nopython=False)
 def subset_sum_iter(numbers, target, num_elements):
     # Initialize solutions list
-    solutions = []
+    final = typed.List.empty_list(types.int64)
     for step in range(1, num_elements+1):
         # Build first index by taking the first num_elements from the numbers
         indices = list(range(step))
-        solution = [numbers[i] for i in indices]
-        if sum(solution) == target:
-            solutions.append(solution)
-
-        # We iterate over the rest of the indices until we have tried all combinations
         while True:
             for i in range(step):
                 if indices[i] != i + len(numbers) - step:
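numba's `typed.List` replaces the plain Python list so the function has a concrete element type to compile against. A minimal hedged sketch of the same ingredients (the function itself is illustrative, not from the commit):

```python
from numba import jit, types, typed

@jit(nopython=False)
def first_pair_summing_to(numbers, target):
    # typed.List gives numba a concrete element type (int64) to work with.
    out = typed.List.empty_list(types.int64)
    for i in range(len(numbers)):
        for j in range(i + 1, len(numbers)):
            if numbers[i] + numbers[j] == target:
                out.append(numbers[i])
                out.append(numbers[j])
                return out
    return out

nums = typed.List.empty_list(types.int64)
for v in (3, 5, 7):
    nums.append(v)
print(list(first_pair_summing_to(nums, 12)))  # [5, 7]
```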
@@ -299,13 +312,14 @@ def subset_sum_iter(numbers, target, num_elements):
                 indices[j] = indices[j - 1] + 1

             # Check current solution
-            solution = [numbers[i] for i in indices]
+            solution = typed.List.empty_list(types.int64)
+            for i in indices:
+                solution.append(numbers[i])
             if round(sum(solution), ROUND_DECIMAL) == target:
-                solutions.append(solution)
+                final = solution
                 break

-        if len(solutions) > 0:
-            solutions = solutions[0]
+        if len(final) > 0:
             break

-    return solutions
+    return final
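End to end, `subset_sum_iter` scans combination sizes 1..num_elements and stops at the first subset whose sum hits the target. A plain-Python sketch of that same contract, with illustrative numbers and an assumed rounding precision of 2:

```python
from itertools import combinations

def subset_sum_reference(numbers, target, num_elements):
    # Reference behavior: first combination (smallest size first, then
    # lexicographic index order) whose rounded sum matches the target.
    for step in range(1, num_elements + 1):
        for combo in combinations(numbers, step):
            if round(sum(combo), 2) == target:
                return list(combo)
    return []

print(subset_sum_reference([5, 3, 8, 2], 10, 3))  # [8, 2]
```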