Commit 9213ca48 authored by Cristian Aguirre's avatar Cristian Aguirre

Update action-exclude-records-v1-dask

parent c1597525
FROM python:3.10-slim-buster

ENV FLASK_APP=run.py

# Install dependencies first so this layer stays cached until
# requirements.txt itself changes (app-code edits no longer re-install deps).
# --no-cache-dir keeps pip's download cache out of the image layer — the
# original ran `pip cache purge` BEFORE installing requirements, which was
# useless because the install repopulated the cache afterwards.
COPY requirements.txt /
RUN pip install --no-cache-dir --upgrade pip \
 && pip install --no-cache-dir -r requirements.txt

# Application code and config. Files are deliberately kept at / (not under a
# WORKDIR) because the compose file bind-mounts ./conf.yml to /conf.yml and
# gunicorn-cfg.py reads 'conf.yml' relative to the default working directory.
COPY run.py flask_app.py gunicorn-cfg.py config.py conf.yml /
COPY app app
COPY scripts scripts

# Documentation only: the compose file publishes container port 7500
# (as 9500) — presumably the gunicorn bind port from conf.yml; confirm.
EXPOSE 7500

CMD ["gunicorn", "--config", "gunicorn-cfg.py", "run:app"]
version: "3"

services:
  # Engine container: image built from the Dockerfile in this commit.
  css-engine-scripts:
    image: css-cuscatlan-engine:0.0.1
    container_name: css-cusca-scripts
    deploy:
      resources:
        # Hard cap: 8 CPUs / 16 GiB; guaranteed reservation: 4 CPUs / 8 GiB.
        limits:
          cpus: '8'
          memory: 16G
        reservations:
          cpus: '4'
          memory: 8G
    restart: always
    networks: [ css-cusca-network ]
    ports:
      # Host 9500 -> container 7500 (gunicorn bind port).
      - "9500:7500"
    volumes:
      # conf.yml is mounted at / because the app reads it from the image root.
      - "./conf.yml:/conf.yml"

networks:
  css-cusca-network:
    driver: bridge
"""
Copyright (c) 2019 - present AppSeed.us

Gunicorn configuration file. Every setting is read from the
``app.gunicorn`` section of ``conf.yml`` in the current working
directory; gunicorn imports this module via ``--config gunicorn-cfg.py``.
"""
import yaml

# Use a context manager so the config file handle is closed deterministically
# (the original called open() inline and leaked the handle).
with open('conf.yml') as _conf_file:
    conf = yaml.safe_load(_conf_file)

# Narrow to the gunicorn subsection; a missing key raises KeyError at startup,
# which is the desired fail-fast behavior for a broken config.
conf = conf["app"]["gunicorn"]

bind = conf["bind"]                            # address:port to listen on
worker_class = conf["worker_class"]            # e.g. sync / gthread / gevent
threads = conf["threads"]
worker_connections = conf["worker_connections"]
loglevel = conf["loglevel"]
accesslog = conf["accesslog"]
capture_output = conf["capture_output"]
arrow==1.3.0
blinker==1.8.2
cli-exit-tools==1.2.6
click==8.1.7
cloudpickle==3.0.0
dask==2024.1.1
dill==0.3.8
Flask==3.0.3
fsspec==2024.3.1
greenlet==3.0.3
gunicorn==22.0.0
importlib_metadata==7.1.0
itsdangerous==2.2.0
Jinja2==3.1.4
lib-detect-testenv==2.0.8
llvmlite==0.42.0
locket==1.0.0
MarkupSafe==2.1.5
multiprocess==0.70.16
numba==0.59.1
numpy==1.26.4
packaging==24.0
pandas==2.2.1
partd==1.4.2
pillow==10.3.0
psutil==5.9.8
pyarrow==14.0.2
PyMySQL==1.1.0
python-dateutil==2.9.0.post0
python-decouple==3.8
pytz==2024.1
PyYAML==6.0.1
six==1.16.0
SQLAlchemy==2.0.30
toolz==0.12.1
types-python-dateutil==2.9.0.20240316
typing_extensions==4.11.0
tzdata==2024.1
Werkzeug==3.0.3
wrapt==1.16.0
wrapt_timeout_decorator==1.5.1
zipp==3.18.1
from typing import Any, Dict, List from typing import Any, Dict, List
import importlib.util
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import json import json
from dask import dataframe as dd from dask import dataframe as dd
from numba import jit, types, typed from numba import jit, types, typed
from wrapt_timeout_decorator import timeout from wrapt_timeout_decorator import timeout
import multiprocessing as mp
from app.main.engine.action.ActionInterface import ActionInterface from app.main.engine.action.ActionInterface import ActionInterface
...@@ -22,9 +22,6 @@ ROUND_DECIMAL = 2 ...@@ -22,9 +22,6 @@ ROUND_DECIMAL = 2
class MatchAndExcludeRecordsAction(ActionInterface): class MatchAndExcludeRecordsAction(ActionInterface):
library_required = "pyspark"
version_required = "3.4.0"
def __init__(self, app) -> None: def __init__(self, app) -> None:
super().__init__(app) super().__init__(app)
self.max_combinations = None self.max_combinations = None
...@@ -36,14 +33,6 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -36,14 +33,6 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"] self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]
def parser(self, descriptor: Dict[str, Any]): def parser(self, descriptor: Dict[str, Any]):
# Validar si pyspark y su versión está instalada
pyspark_lib = importlib.util.find_spec(self.library_required)
if pyspark_lib is None:
raise ImportError(f"Librería requerida *{self.library_required}* no instalada")
import pyspark
version = pyspark.__version__
if version != self.version_required:
raise ImportError(f"Versión requerida no instalada. Requerida: {self.version_required}. Instalada: {version}")
# Validación de parámetros de entrada # Validación de parámetros de entrada
entity_config_params = ["tablename", "id-column", "amount-column", "columns-group", "columns-transaction"] entity_config_params = ["tablename", "id-column", "amount-column", "columns-group", "columns-transaction"]
...@@ -83,9 +72,10 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -83,9 +72,10 @@ class MatchAndExcludeRecordsAction(ActionInterface):
dialect = source_obj.get_dialect() dialect = source_obj.get_dialect()
pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"], pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"],
npartitions=4) npartitions=mp.cpu_count())
pivot_df = pivot_df.reset_index() pivot_df = pivot_df.reset_index()
ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"], npartitions=4) ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"],
npartitions=mp.cpu_count())
ctp_df = ctp_df.reset_index() ctp_df = ctp_df.reset_index()
# Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input # Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input
...@@ -111,8 +101,6 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -111,8 +101,6 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]] self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]] self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
from pyspark.sql.functions import sum, collect_list, round, when, col, lit
pivot_cols = self.pivot_params["columns-transaction"].copy() pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols: if self.pivot_params["amount-column"] in pivot_cols:
pivot_cols.remove(self.pivot_params["amount-column"]) pivot_cols.remove(self.pivot_params["amount-column"])
...@@ -178,10 +166,14 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -178,10 +166,14 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.ctp_params["amount-column"]]) self.ctp_params["amount-column"]])
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0: if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
merged = merged.set_index(self.pivot_params["id-column"])
merged = merged.map_partitions(lambda df_: df_.sort_values([self.pivot_params["id-column"]]))
merged = merged.drop_duplicates(subset=pivot_cols) merged = merged.drop_duplicates(subset=pivot_cols)
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0: elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
merged = merged.set_index(self.ctp_params["id-column"])
merged = merged.map_partitions(lambda df_: df_.sort_values([self.ctp_params["id-column"]]))
merged = merged.drop_duplicates(subset=ctp_cols) merged = merged.drop_duplicates(subset=ctp_cols)
merged = merged.reset_index()
merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL)) merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL))
if self.exclude_pivot: if self.exclude_pivot:
...@@ -207,7 +199,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -207,7 +199,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"]) resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
else: else:
resultado.columns = group_cols + ["LISTA_DIFF"] resultado.columns = group_cols + ["LISTA_DIFF"]
resultado = dd.from_pandas(resultado, npartitions=4) resultado = dd.from_pandas(resultado, npartitions=mp.cpu_count())
meged2 = resultado.merge(merged_df, 'left', group_cols) meged2 = resultado.merge(merged_df, 'left', group_cols)
...@@ -255,13 +247,11 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -255,13 +247,11 @@ class MatchAndExcludeRecordsAction(ActionInterface):
return [] return []
def array_except(self, arr1, arr2):
    """Return the elements of ``arr1`` that are not present in ``arr2``.

    Mirrors pyspark's ``array_except`` for values that arrive serialized
    as strings from the dataframe round-trip.

    :param arr1: JSON-encoded list of ints as a string, e.g. ``"[1, 2, 3]"``
                 (assumed — TODO confirm against caller; only the string
                 path is exercised by this code).
    :param arr2: ``None``, a list, or a bracketed space-separated string
                 such as ``"[2 3]"``.
    :return: ``arr1`` unchanged when ``arr2`` is None; otherwise the
             decoded ``arr1`` minus the members of ``arr2``.
    """
    # Nothing to exclude: return the input untouched (still a string).
    if arr2 is None:
        return arr1
    # Non-list arr2 arrives as "[2 3]" — strip brackets, split on
    # whitespace, and parse each token as an int.
    elif not isinstance(arr2, List):
        sin_corchetes = arr2.strip('[]')
        arr2 = [int(token) for token in sin_corchetes.split()]
    # arr1 is JSON except for stray spaces; drop them before decoding.
    arr1 = json.loads(arr1.replace(" ", ""))
    return [item for item in arr1 if item not in arr2]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment