Commit 70c88db0 authored by Cristian Aguirre's avatar Cristian Aguirre

Merge branch 'developer_ca' into 'developer'

Developer ca

See merge request !7
parents 777a777e b9a72588
...@@ -2,9 +2,8 @@ FROM python:3.10-slim-buster ...@@ -2,9 +2,8 @@ FROM python:3.10-slim-buster
ENV FLASK_APP=run.py ENV FLASK_APP=run.py
COPY run.py flask_app.py gunicorn-cfg.py requirements.txt config.py conf.yml / COPY run.py flask_app.py gunicorn-cfg.py requirements.txt config.py /
COPY app app COPY app app
COPY scripts scripts
RUN pip install --upgrade pip RUN pip install --upgrade pip
RUN pip cache purge RUN pip cache purge
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
......
// Jenkins declarative pipeline: builds the engine's Docker image and pushes it
// (version tag + 'latest') to a private registry at 192.168.27.148:5000.
pipeline {
    // No node label restriction — runs on any available Jenkins agent.
    agent any
    environment {
        IMAGE = "css-engine-python-cusca-bank"
        VERSION = "1.0.0"
        // Registry endpoint used by docker.withRegistry (plain HTTP —
        // presumably the registry is configured as insecure on the Docker
        // daemon; TODO confirm).
        url = "http://192.168.27.148:5000"
        // Same host:port without scheme, used as the image-name prefix.
        tag_name = "192.168.27.148:5000"
        // Fully qualified image name: <registry>/bytesw/css/<image>.
        imagename = "${tag_name}/bytesw/css/${IMAGE}"
        // Jenkins credentials ID for authenticating against the registry.
        credentials = "admin-docker-hub"
    }
    stages {
        stage('Deploy Image') {
            steps{
                script {
                    // Authenticate to the registry for the duration of the block.
                    docker.withRegistry(url, credentials ) {
                        // Build locally, tag with the registry-qualified name,
                        // then push both the pinned version and 'latest'.
                        sh "docker build -t ${IMAGE}:${VERSION} ."
                        sh "docker tag ${IMAGE}:${VERSION} ${imagename}:${VERSION}"
                        sh "docker push ${imagename}:${VERSION}"
                        sh "docker tag ${imagename}:${VERSION} ${imagename}:latest"
                        sh "docker push ${imagename}:latest"
                    }
                }
            }
        }
    }
}
...@@ -31,7 +31,7 @@ class Process: ...@@ -31,7 +31,7 @@ class Process:
if isinstance(script_name, type(None)): if isinstance(script_name, type(None)):
raise ModuleNotFoundError(f"Error al descargar script desde volumen") raise ModuleNotFoundError(f"Error al descargar script desde volumen")
path_script = "scripts/"+script_name path_script = "conf/scripts/"+script_name
module = path_script[:-3].replace("/", ".") module = path_script[:-3].replace("/", ".")
import_module = importlib.import_module(module) import_module = importlib.import_module(module)
......
...@@ -3,19 +3,17 @@ from typing import Dict, Any, List ...@@ -3,19 +3,17 @@ from typing import Dict, Any, List
import os import os
import shutil import shutil
from enum import Enum from enum import Enum
# from pyspark.sql import SparkSession
import json import json
from app.main.engine.enum.CodeResponseEnum import CodeResponseEnum from app.main.engine.enum.CodeResponseEnum import CodeResponseEnum
from app.main.engine.util.Timezone import Timezone from app.main.engine.util.Timezone import Timezone
# from config import Config as cfg
from app.main.engine.enum.StatusEnum import StatusEnum from app.main.engine.enum.StatusEnum import StatusEnum
from app.main.engine.enum.SufixEnum import SufixEnum
from app.main.engine.enum.FixedFieldsEnum import FixedFieldsEnum from app.main.engine.enum.FixedFieldsEnum import FixedFieldsEnum
from app.main.engine.enum.DescResponseEnum import DescResponseEnum from app.main.engine.enum.DescResponseEnum import DescResponseEnum
from app.main.engine.models.ResultMode import ResultModel from app.main.engine.models.ResultMode import ResultModel
class Utils: class Utils:
def __init__(self, app) -> None: def __init__(self, app) -> None:
......
...@@ -12,11 +12,13 @@ from app.main.engine.action.ActionInterface import ActionInterface ...@@ -12,11 +12,13 @@ from app.main.engine.action.ActionInterface import ActionInterface
# RELACION DE IDENTIFICADOR DE ACCION Y NOMBRE DE CLASE # RELACION DE IDENTIFICADOR DE ACCION Y NOMBRE DE CLASE
relation_classname_identifier = { relation_classname_identifier = {
"match-and-exclude-records-actions": "MatchAndExcludeRecordsAction" "match-and-exclude-record-actions": "MatchAndExcludeRecordsAction"
} }
# EXCLUDE VALIDATION FIELD # EXCLUDE VALIDATION FIELD
EXCLUDE_ROWS_FIELD = "EXCLUDE_VALID" EXCLUDE_ROWS_FIELD = "EXCLUDE_VALID"
TAKE_ROWS_WITH = "Y"
# REDONDEO DE DECIMALES # REDONDEO DE DECIMALES
ROUND_DECIMAL = 2 ROUND_DECIMAL = 2
...@@ -28,14 +30,16 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -28,14 +30,16 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.identifier = None self.identifier = None
self.max_combinations = None self.max_combinations = None
self.timeout = None self.timeout = None
self.create_tmp_table = None
self.exclude_pivot = None self.exclude_pivot = None
self.pivot_params = None self.pivot_params = None
self.ctp_params = None self.ctp_params = None
self.output = None self.output = None
self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"] self.config_params = ["max-record-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot",
"create-tmp-table"]
def parser(self, descriptor: Dict[str, Any]): def parser(self, descriptor: Dict[str, Any]):
self.app.logger.info(f"Descriptor recogido: {descriptor}")
# Validación de parámetros de entrada # Validación de parámetros de entrada
entity_config_params = ["tablename", "id-column", "amount-column", "columns-group", "columns-transaction"] entity_config_params = ["tablename", "id-column", "amount-column", "columns-group", "columns-transaction"]
...@@ -60,9 +64,10 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -60,9 +64,10 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte") raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte")
self.identifier = descriptor["idProcess"] self.identifier = descriptor["idProcess"]
self.max_combinations = configs["max-records-per-combinations"] self.max_combinations = configs["max-record-per-combinations"]
self.timeout = int(configs["max-timeout-per-combinations"]) // 1000 # Miliseconds self.timeout = int(configs["max-timeout-per-combinations"]) // 1000 # Miliseconds
self.exclude_pivot = configs["exclude-entity-pivot"] self.exclude_pivot = configs["exclude-entity-pivot"]
self.create_tmp_table = configs["create-tmp-table"]
self.pivot_params = pivot_params self.pivot_params = pivot_params
self.ctp_params = ctp_params self.ctp_params = ctp_params
...@@ -81,12 +86,31 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -81,12 +86,31 @@ class MatchAndExcludeRecordsAction(ActionInterface):
[self.ctp_params["amount-column"], self.ctp_params["id-column"], EXCLUDE_ROWS_FIELD] [self.ctp_params["amount-column"], self.ctp_params["id-column"], EXCLUDE_ROWS_FIELD]
total_ctp_cols = list(set(total_ctp_cols)) total_ctp_cols = list(set(total_ctp_cols))
pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"], pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"],
npartitions=mp.cpu_count(), columns=total_pivot_cols) npartitions=mp.cpu_count()*2, columns=total_pivot_cols)
pivot_df = pivot_df.reset_index() pivot_df = pivot_df.reset_index()
ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"], ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"],
npartitions=mp.cpu_count(), columns=total_ctp_cols) npartitions=mp.cpu_count()*2, columns=total_ctp_cols)
ctp_df = ctp_df.reset_index() ctp_df = ctp_df.reset_index()
self.app.logger.debug(f"Insumos cargados - {self.identifier}")
# Forzamos a que los datos que se restaran (usualmente el monto) sea númerico
pivot_df[self.pivot_params["amount-column"]] = pivot_df[self.pivot_params["amount-column"]].astype(float).round(ROUND_DECIMAL)
ctp_df[self.ctp_params["amount-column"]] = ctp_df[self.ctp_params["amount-column"]].astype(float).round(ROUND_DECIMAL)
self.app.logger.info(f"Insumos cargados - {self.identifier}")
if self.create_tmp_table:
from app.main.engine.util.Timezone import Timezone
from sqlalchemy import text
# Creamos las tablas temporales del pivot y contraparte con sufijo N° aleatorio de 6 dígitos
timezone = Timezone(self.app)
current_time = timezone.datetime_by_tzone().strftime("%Y%m%d%H%M")
pivot_query = f"CREATE TABLE ENGINE_{pivot_table}_{current_time} AS SELECT * FROM {pivot_table}"
ctp_query = f"CREATE TABLE ENGINE_{ctp_table}_{current_time} AS SELECT * FROM {ctp_table}"
source_obj.create_engine()
engine = source_obj.engine
with engine.connect() as conn:
conn.execute(text(pivot_query))
conn.execute(text(ctp_query))
# Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input # Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input
# pivot: 'PIVOT_', contraparte: 'COUNTERPART_' # pivot: 'PIVOT_', contraparte: 'COUNTERPART_'
# Iterar sobre las columnas del DataFrame # Iterar sobre las columnas del DataFrame
...@@ -119,7 +143,6 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -119,7 +143,6 @@ class MatchAndExcludeRecordsAction(ActionInterface):
ctp_cols.remove(self.ctp_params["amount-column"]) ctp_cols.remove(self.ctp_params["amount-column"])
max_combinations = self.max_combinations max_combinations = self.max_combinations
# Ejecutamos lógica de excluir registros # Ejecutamos lógica de excluir registros
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0: if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado") raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
...@@ -173,23 +196,24 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -173,23 +196,24 @@ class MatchAndExcludeRecordsAction(ActionInterface):
merged = total_merged.merge(ctp_df2, 'inner', left_on=pivot_cols, right_on=ctp_cols) merged = total_merged.merge(ctp_df2, 'inner', left_on=pivot_cols, right_on=ctp_cols)
if self.exclude_pivot:
merged['DIFF'] = merged['DIFF'].where(merged['DIFF'].notnull(), merged['DIFF'] = merged['DIFF'].where(merged['DIFF'].notnull(),
merged[self.pivot_params["amount-column"]] - merged[ merged[self.pivot_params["amount-column"]] - merged[
self.ctp_params["amount-column"]]) self.ctp_params["amount-column"]])
else:
merged['DIFF'] = merged['DIFF'].where(merged['DIFF'].notnull(),
merged[self.ctp_params["amount-column"]] - merged[
self.pivot_params["amount-column"]])
merged = merged.dropna(subset=["DIFF"]) merged = merged.dropna(subset=["DIFF"])
if merged.known_divisions: if merged.known_divisions:
return pd.DataFrame([]) return pd.DataFrame([])
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0: if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
merged = merged.set_index(self.pivot_params["id-column"]) merged = merged.sort_values(self.pivot_params["id-column"])
merged = merged.map_partitions(lambda df_: df_.sort_values([self.pivot_params["id-column"]]))
merged = merged.drop_duplicates(subset=pivot_cols) merged = merged.drop_duplicates(subset=pivot_cols)
merged = merged.reset_index()
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0: elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
merged = merged.set_index(self.ctp_params["id-column"]) merged = merged.sort_values(self.ctp_params["id-column"])
merged = merged.map_partitions(lambda df_: df_.sort_values([self.ctp_params["id-column"]]))
merged = merged.drop_duplicates(subset=ctp_cols) merged = merged.drop_duplicates(subset=ctp_cols)
merged = merged.reset_index()
merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL)) merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL))
if self.exclude_pivot: if self.exclude_pivot:
...@@ -206,16 +230,16 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -206,16 +230,16 @@ class MatchAndExcludeRecordsAction(ActionInterface):
id_col = self.ctp_params["id-column"] id_col = self.ctp_params["id-column"]
total_tmp_cols = group_cols + ["DIFF"] total_tmp_cols = group_cols + ["DIFF"]
df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols) df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols)
# Filtro de exclusión de registros con validación # Filtro de exclusión de registros con validación
df3 = df3[df3[EXCLUDE_ROWS_FIELD] == 'S'] df3 = df3[df3[EXCLUDE_ROWS_FIELD] == TAKE_ROWS_WITH]
df3 = df3.drop(EXCLUDE_ROWS_FIELD, axis=1)
df3 = df3.compute() df3 = df3.compute()
total_cols = group_cols + [amount_col, id_col, "DIFF"]
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
self.app.logger.info(f"Aplicando algoritmo de exclusión. Máximo de {str(max_combinations)} combinaciones - {self.identifier}") self.app.logger.info(f"Aplicando algoritmo de exclusión. Máximo de {str(max_combinations)} combinaciones - {self.identifier}")
resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations)) resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
if resultado.empty:
return pd.DataFrame([])
resultado = resultado.reset_index() resultado = resultado.reset_index()
if len(resultado.columns) == 1: if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"]) resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
...@@ -234,14 +258,16 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -234,14 +258,16 @@ class MatchAndExcludeRecordsAction(ActionInterface):
meged2 = meged2.compute() meged2 = meged2.compute()
if meged2.empty: if meged2.empty:
pass self.app.logger.info(f"El dataset resultante esta vacío luego de la ejecución del script")
elif self.exclude_pivot: elif self.exclude_pivot:
meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1) meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2[meged2["INTER_PIVOT_ID"].apply(lambda x: True if isinstance(x, List) and len(x) > 0 else False)]
meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"}) meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
if meged2['INTER_CTP_ID'].dtype == 'int64': if meged2['INTER_CTP_ID'].dtype == 'int64':
meged2['INTER_CTP_ID'] = meged2['INTER_CTP_ID'].apply(lambda x: [x]).astype('object') meged2['INTER_CTP_ID'] = meged2['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
else: else:
meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1) meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
meged2 = meged2[meged2['INTER_CTP_ID'].apply(lambda x: True if isinstance(x, List) and len(x) > 0 else False)]
meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"}) meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
if meged2['INTER_PIVOT_ID'].dtype == 'int64': if meged2['INTER_PIVOT_ID'].dtype == 'int64':
meged2['INTER_PIVOT_ID'] = meged2['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object') meged2['INTER_PIVOT_ID'] = meged2['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
......
...@@ -2,7 +2,7 @@ import os ...@@ -2,7 +2,7 @@ import os
from decouple import config from decouple import config
import yaml import yaml
conf = yaml.safe_load(open('conf.yml')) conf = yaml.safe_load(open('conf/conf.yml'))
conf = conf["app"] conf = conf["app"]
......
...@@ -16,8 +16,7 @@ services: ...@@ -16,8 +16,7 @@ services:
ports: ports:
- "9500:8000" - "9500:8000"
volumes: volumes:
- "./conf.yml:/conf.yml" - "./conf:/conf"
- "./scripts/match-and-exclude-records-actions_v1.py:/scripts/match-and-exclude-records-actions_v1.py"
networks: networks:
css-cusca-network: css-cusca-network:
......
...@@ -3,7 +3,7 @@ Copyright (c) 2019 - present AppSeed.us ...@@ -3,7 +3,7 @@ Copyright (c) 2019 - present AppSeed.us
""" """
import yaml import yaml
conf = yaml.safe_load(open('conf.yml')) conf = yaml.safe_load(open('conf/conf.yml'))
conf = conf["app"]["gunicorn"] conf = conf["app"]["gunicorn"]
bind = conf["bind"] bind = conf["bind"]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment