Commit 03328797 authored by Cristian Aguirre

Update action-exclude-records-v1-dask

parent c2d602b8
......@@ -25,7 +25,7 @@ class Process:
db_params = cfg.db_params
source = Database(self.app, db_params)
db_session = source.get_session()
print("1")
# Get the script name
script_name = source.get_action_by_identifier(self.descriptor["idScript"], db_session)
if script_name is None:
......@@ -47,11 +47,11 @@ class Process:
# Start the process
self.app.logger.info("Starting script processing")
obj_script.process(source)
print("1")
# Save the result
self.app.logger.info("Generating and saving result")
response = obj_script.response()
# response.show()
result = self.utils.create_result(response, self.descriptor)
save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name:
......
import gzip
from typing import Any, Dict, Optional
import boto3
class EMRServerless:
"""
An example implementation of running a PySpark job on EMR Serverless.
This class provides support for creating an EMR Serverless Spark application, running a job,
fetching driver logs, and shutting the application back down.
By default, all calls are synchronous in that they wait for the Application to reach the desired state.
- `create_application` waits for the application to reach the `CREATED` state.
- `start_application` waits for the `STARTED` state.
- `stop_application` waits for the `STOPPED` state.
- `run_spark_job` waits until the job is in a terminal state.
"""
def __init__(self, application_id: Optional[str] = None, search_app: bool = False) -> None:
self.application_id = application_id
self.s3_log_prefix = "emr-serverless-logs"
self.app_type = "SPARK" # EMR Serverless also supports jobs of type 'HIVE'
self.client = boto3.client("emr-serverless")
self.search_app = search_app
def __str__(self):
return f"EMR Serverless {self.app_type} Application: {self.application_id}"
def valid_application(self) -> Dict[str, Any]:
"""
Valid if an application is created or started and get it
:return:
"""
response = {"exists": False}
if self.search_app:
applications = self.client.list_applications()["applications"]
print(applications)
if len(applications) > 0:
response["exists"] = True
response["app"] = applications[0]["id"]
return response
def create_application(self, name: str, release_label: str, args: dict, wait: bool = True):
"""
Create a new application with the provided name and release_label; the application must be started afterwards.
"""
if self.application_id is not None:
raise Exception(
f"Application already created (application_id: `{self.application_id}`)"
)
initial_capacity = args["initial_capacity"]
maximum_capacity = args["maximun_capacity"]  # sic: key spelled "maximun_capacity"
networkConfiguration = args["networkConfiguration"]
imageConfiguration = args["imageConfiguration"]
response = self.client.create_application(
name=name, releaseLabel=release_label, type=self.app_type,
initialCapacity=initial_capacity, maximumCapacity=maximum_capacity,
networkConfiguration=networkConfiguration, imageConfiguration=imageConfiguration
)
self.application_id = response.get("applicationId")
app_ready = False
while wait and not app_ready:
response = self.client.get_application(applicationId=self.application_id)
app_ready = response.get("application").get("state") == "CREATED"
def start_application(self, wait: bool = True) -> None:
"""
Start the application - by default, wait until the application is started.
"""
if self.application_id is None:
raise Exception(
"No application_id - please use creation_application first."
)
self.client.start_application(applicationId=self.application_id)
app_started = False
while wait and not app_started:
response = self.client.get_application(applicationId=self.application_id)
app_started = response.get("application").get("state") == "STARTED"
def stop_application(self, wait: bool = True) -> None:
"""
Stop the application - by default, wait until the application is stopped.
"""
self.client.stop_application(applicationId=self.application_id)
app_stopped = False
while wait and not app_stopped:
response = self.client.get_application(applicationId=self.application_id)
app_stopped = response.get("application").get("state") == "STOPPED"
def delete_application(self) -> None:
"""
Delete the application - it must be stopped first.
"""
self.client.delete_application(applicationId=self.application_id)
def run_spark_job(
self,
script_location: str,
job_role_arn: str,
arguments: list,
sparkArguments: dict,
s3_bucket_name: str,
wait: bool = True,
) -> str:
"""
Runs the Spark job identified by `script_location`. Arguments can also be provided via the `arguments` parameter.
Spark-submit parameters are built from `sparkArguments`, and logs are sent to the provided `s3_bucket_name`.
This method is blocking by default until the job is complete.
"""
spark_args = "--conf spark.driver.cores="+str(sparkArguments["driver-cores"])
spark_args += " --conf spark.driver.memory="+str(sparkArguments["driver-memory"])
spark_args += " --conf spark.executor.cores="+str(sparkArguments["executor-cores"])
spark_args += " --conf spark.executor.memory="+str(sparkArguments["executor-memory"])
spark_args += " --conf spark.executor.instances="+str(sparkArguments["executor-instances"])
spark_args += " " + sparkArguments["others"]
response = self.client.start_job_run(
applicationId=self.application_id,
executionRoleArn=job_role_arn,
jobDriver={
"sparkSubmit": {
"entryPoint": script_location,
"entryPointArguments": arguments,
"sparkSubmitParameters": spark_args,
}
},
configurationOverrides={
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": f"s3://{s3_bucket_name}/{self.s3_log_prefix}"
}
}
},
)
job_run_id = response.get("jobRunId")
job_done = False
while wait and not job_done:
jr_response = self.get_job_run(job_run_id)
job_done = jr_response.get("state") in [
"SUCCESS",
"FAILED",
"CANCELLING",
"CANCELLED",
]
return job_run_id
def get_job_run(self, job_run_id: str) -> dict:
response = self.client.get_job_run(
applicationId=self.application_id, jobRunId=job_run_id
)
return response.get("jobRun")
def fetch_driver_log(
self, s3_bucket_name: str, job_run_id: str, log_type: str = "stdout"
) -> str:
"""
Access the specified `log_type` Driver log on S3 and return the full log string.
"""
s3_client = boto3.client("s3")
file_location = f"{self.s3_log_prefix}/applications/{self.application_id}/jobs/{job_run_id}/SPARK_DRIVER/{log_type}.gz"
try:
response = s3_client.get_object(Bucket=s3_bucket_name, Key=file_location)
file_content = gzip.decompress(response["Body"].read()).decode("utf-8")
except s3_client.exceptions.NoSuchKey:
file_content = ""
return str(file_content)
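For reference, a minimal usage sketch of the helper above; the bucket name, role ARN, script path, and Spark sizing values are illustrative placeholders, not values from this repository:

# Hypothetical usage sketch; all names, ARNs, and sizes below are placeholders.
emr = EMRServerless(search_app=True)
found = emr.valid_application()
if found["exists"]:
    emr.application_id = found["app"]
emr.start_application()
job_run_id = emr.run_spark_job(
    script_location="s3://my-bucket/scripts/match-and-exclude-records-actions_v1.py",
    job_role_arn="arn:aws:iam::123456789012:role/emr-serverless-job-role",
    arguments=["match-and-exclude-records-actions"],
    sparkArguments={
        "driver-cores": 2,
        "driver-memory": "4g",
        "executor-cores": 2,
        "executor-memory": "4g",
        "executor-instances": 2,
        "others": "",
    },
    s3_bucket_name="my-bucket",
)
print(emr.get_job_run(job_run_id)["state"])  # e.g. "SUCCESS" or "FAILED"
print(emr.fetch_driver_log("my-bucket", job_run_id))
emr.stop_application()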
......@@ -32,7 +32,7 @@ app:
bind: '0.0.0.0:8000'
worker_class: 'gthread'
threads: 51
worker_connections: 51
worker_connections: 100
loglevel: 'debug'
accesslog: '-'
capture_output: True
\ No newline at end of file
version: "3"
services:
css-engine-scripts:
image: css-cuscatlan-engine:0.0.1
container_name: css-cusca-scripts
css-cusca-scripts:
image: css_cuscatlan:0.0.2
container_name: css-cusca
deploy:
resources:
limits:
......@@ -14,9 +14,10 @@ services:
restart: always
networks: [ css-cusca-network ]
ports:
- "9500:7500"
- "9500:8000"
volumes:
- "./conf.yml:/conf.yml"
- "./scripts/match-and-exclude-records-actions_v1.py:/scripts/match-and-exclude-records-actions_v1.py"
networks:
css-cusca-network:
......
......@@ -5,6 +5,7 @@ click==8.1.7
cloudpickle==3.0.0
dask==2024.1.1
dill==0.3.8
dpss==0.22.0
Flask==3.0.3
fsspec==2024.3.1
greenlet==3.0.3
......@@ -13,14 +14,12 @@ importlib_metadata==7.1.0
itsdangerous==2.2.0
Jinja2==3.1.4
lib-detect-testenv==2.0.8
llvmlite==0.42.0
locket==1.0.0
MarkupSafe==2.1.5
multiprocess==0.70.16
numba==0.59.1
numpy==1.26.4
packaging==24.0
pandas==2.2.1
pandas==2.2.2
partd==1.4.2
pillow==10.3.0
psutil==5.9.8
......
......@@ -2,17 +2,25 @@ from typing import Any, Dict, List
import numpy as np
import pandas as pd
import json
import os
import subprocess
import uuid
import time
from dpss import find_subset
from dask import dataframe as dd
from numba import jit, types, typed
from wrapt_timeout_decorator import timeout
import multiprocessing as mp
from app.main.engine.action.ActionInterface import ActionInterface
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Create a handler that prints to the console
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# MAPPING BETWEEN ACTION IDENTIFIER AND CLASS NAME
relation_classname_identifier = {
"match-and-exclude-records-actions": "MatchAndExcludeRecordsAction"
......@@ -81,7 +89,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"],
npartitions=mp.cpu_count())
ctp_df = ctp_df.reset_index()
logger.debug(f"Insumos cargados")
# Add a prefix to each column of both the pivot and the counterpart. Update input fields
# pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
# Iterate over the DataFrame columns
......@@ -121,6 +129,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
# Case: 1 - Many
elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
logger.info("Executing Case 1 - Many")
ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
self.ctp_params["amount-column"]: 'sum', # Sumar la columna de cantidades
self.ctp_params["id-column"]: list
......@@ -131,6 +140,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
# Case: Many - 1
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
logger.info("Executing Case Many - 1")
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
self.pivot_params["amount-column"]: 'sum',
self.pivot_params["id-column"]: list
......@@ -141,6 +151,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
# Case: Many - Many
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
logger.info("Executing Case Many - Many")
pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
self.pivot_params["amount-column"]: 'sum',
self.pivot_params["id-column"]: list
......@@ -169,10 +180,14 @@ class MatchAndExcludeRecordsAction(ActionInterface):
merged[self.pivot_params["amount-column"]] - merged[
self.ctp_params["amount-column"]])
merged = merged.dropna(subset=["DIFF"])
if merged.known_divisions:
return pd.DataFrame([])
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
merged = merged.set_index(self.pivot_params["id-column"])
merged = merged.map_partitions(lambda df_: df_.sort_values([self.pivot_params["id-column"]]))
merged = merged.drop_duplicates(subset=pivot_cols)
merged = merged.reset_index()
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
merged = merged.set_index(self.ctp_params["id-column"])
merged = merged.map_partitions(lambda df_: df_.sort_values([self.ctp_params["id-column"]]))
......@@ -181,11 +196,13 @@ class MatchAndExcludeRecordsAction(ActionInterface):
merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL))
if self.exclude_pivot:
logger.info("Se excluiran registros del pivot")
df = pivot_df
group_cols = self.pivot_params["columns-group"]
amount_col = self.pivot_params["amount-column"]
id_col = self.pivot_params["id-column"]
else:
logger.info("Se excluiran registros de la contraparte")
df = ctp_df
group_cols = self.ctp_params["columns-group"]
amount_col = self.ctp_params["amount-column"]
......@@ -196,6 +213,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols)
df3 = df3.compute()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
logger.info(f"Aplicando algoritmo de exclusión. Máximo de {str(max_combinations)} combinaciones")
resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
resultado = resultado.reset_index()
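custom_func is defined elsewhere in this file and is not part of this hunk. As a rough, hypothetical sketch of a subset-sum exclusion step built on dpss.find_subset (the 2-decimal integer scaling, the function body, and the return shape are assumptions, not the repository's actual implementation):

# Hypothetical sketch only; the real custom_func lives elsewhere in this file.
def custom_func_sketch(group, amount_col, id_col, max_combinations):
    # Scale amounts to integers (2 decimals assumed) since find_subset works on ints
    target = int(round(abs(group["DIFF"].iloc[0]) * 100))
    amounts = [int(round(abs(a) * 100)) for a in group[amount_col]]
    # find_subset(values, target, max_length) returns the subsets of at most
    # max_length values that sum exactly to target
    subsets = find_subset(amounts, target, max_combinations)
    if not subsets:
        return []
    chosen = list(subsets[0])
    excluded_ids = []
    for row_id, amount in zip(group[id_col], amounts):
        if amount in chosen:
            chosen.remove(amount)
            excluded_ids.append(row_id)
    return excluded_ids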
......@@ -234,8 +252,9 @@ class MatchAndExcludeRecordsAction(ActionInterface):
except TimeoutError as e:
raise TimeoutError(f"Time limit exceeded. {e}")
start_time = time.time()
self.output = __process(source_obs)
logger.info(f"Ejecución de script terminado en {time.time() - start_time} segundos")
def response(self):
return self.output
......@@ -256,7 +275,6 @@ class MatchAndExcludeRecordsAction(ActionInterface):
elif not isinstance(arr2, List):
cadena_sin_corchetes = arr2.replace(" ", "").strip('[]')
partes = cadena_sin_corchetes.split(",")
# print(partes)
arr2 = [int(numero) for numero in partes]
arr1 = json.loads(arr1.replace(" ", ""))
return [item for item in arr1 if item not in arr2]
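A quick illustration of this list-difference helper, with input shapes assumed from the parsing logic above:

# Hypothetical inputs: arr1 as a JSON-style string, arr2 as a bracketed string or list
# arr1 = "[10, 20, 30]", arr2 = "[20]"  ->  returns [10, 30]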
......