Commit 7086b4fa authored by Cristian Aguirre

Update action-exclude-records-v1-emr

parent 9213ca48
@@ -14,7 +14,7 @@ class ActionInterface(ABC):
         raise NotImplementedError
 
     @abstractmethod
-    def process(self, source_obj):
+    def process(self, source_obj, script_name, timezone, pattern):
        """Method that executes the script's logic"""
        raise NotImplementedError
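Because the abstract signature now also takes the script name, timezone and date pattern, concrete actions must accept them too. A minimal, hypothetical illustration (the class name and body are invented; only the signature comes from this commit):

# Hypothetical example - not part of the commit; it only shows the updated signature.
from app.main.engine.action.ActionInterface import ActionInterface

class ExampleAction(ActionInterface):
    def process(self, source_obj, script_name, timezone, pattern):
        # source_obj: DB/Spark connection wrapper; script_name, timezone and pattern
        # are now forwarded by Process.run (see the hunks below).
        pass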
@@ -28,7 +28,7 @@ class Mysql:
     def create_spark_connection(self):
         params = {}
         try:
-            url = "jdbc:mysql://"+self.host+":"+str(self.port)+"/"+self.database
+            url = "jdbc:mysql://"+self.user+":"+self.password+"@"+self.host+":"+str(self.port)+"/"+self.database
             properties = {"user": self.user, "password": self.password, "driver": "com.mysql.cj.jdbc.Driver"}
             params["url"] = url
             params["properties"] = properties
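For context, the dict built here is what the Spark side eventually consumes when reading tables over JDBC. A minimal sketch, assuming an existing SparkSession `session`, a Mysql instance `mysql_conn` and a placeholder table name (none of these names come from the repository):

# Illustrative only - `session`, `mysql_conn` and "my_table" are assumptions, not repository code.
params = mysql_conn.create_spark_connection()
df = session.read.jdbc(url=params["url"], table="my_table", properties=params["properties"])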
 from typing import Dict, Any
-import time
 import traceback as traceback_lib
 import importlib
@@ -26,7 +25,7 @@ class Process:
             db_params = cfg.db_params
             source = Database(self.app, db_params)
             db_session = source.get_session()
+            print("1")
             # Getting the script name
             script_name = source.get_action_by_identifier(self.descriptor["idScript"], db_session)
             if isinstance(script_name, type(None)):
@@ -47,16 +46,16 @@ class Process:
             # Starting the process
             self.app.logger.info(f"Iniciando procesamiento de script")
-            obj_script.process(source)
+            obj_script.process(source, script_name, cfg.timezone, cfg.time_pattern)
+            print("1")
             # Saving the result
             self.app.logger.info(f"Generado y guardando resultado")
-            response = obj_script.response()
+            # _ = obj_script.response()
             # response.show()
-            result = self.utils.create_result(response, self.descriptor)
+            # result = self.utils.create_result(response, self.descriptor)
-            save = self.utils.save_result(result, self.descriptor, db_session)
+            # save = self.utils.save_result(result, self.descriptor, db_session)
-            if save["status"] == StatusEnum.ERROR.name:
+            # if save["status"] == StatusEnum.ERROR.name:
-                raise InterruptedError(save["message"])
+            #     raise InterruptedError(save["message"])
         except TimeoutError as e:
             self.app.logger.error(f"Error de Timeout. Error: {e}")
             status, status_description = CodeResponseEnum.TIMEOUT, str(e)
import gzip
from typing import Dict, Any
import boto3
class EMRServerless:
"""
An example implementation of running a PySpark job on EMR Serverless.
This class provides support for creating an EMR Serverless Spark application, running a job,
fetching driver logs, and shutting the application back down.
By default, all calls are synchronous in that they wait for the Application to reach the desired state.
- `create_application` waits for the application to reach the `CREATED` state.
- `start_application` waits for the `STARTED` state.
- `stop_application` waits for the `STOPPED` state.
- `run_spark_job` waits until the job is in a terminal state.
"""
def __init__(self, application_id: str = None, search_app: bool = False) -> None:
self.application_id = application_id
self.s3_log_prefix = "emr-serverless-logs"
self.app_type = "SPARK" # EMR Serverless also supports jobs of type 'HIVE'
self.client = boto3.client("emr-serverless")
self.search_app = search_app
def __str__(self):
return f"EMR Serverless {self.app_type} Application: {self.application_id}"
def valid_application(self) -> Dict[str, Any]:
"""
Check whether an application has already been created or started and, if so, return it.
:return: dict with an "exists" flag and, when an application is found, its id under "app".
"""
response = {"exists": False}
if self.search_app:
applications = self.client.list_applications()["applications"]
print(applications)
if len(applications) > 0:
response["exists"] = True
application = applications[0]
application = application["id"]
response["app"] = application
return response
def create_application(self, name: str, release_label: str, args: dict, wait: bool = True):
"""
Create a new application with the provided name and release_label - the application must be started afterwards.
"""
if self.application_id is not None:
raise Exception(
f"Application already created (application_id: `{self.application_id}`)"
)
initial_capacity = args["initial_capacity"]
maximum_capacity = args["maximun_capacity"]
networkConfiguration = args["networkConfiguration"]
imageConfiguration = args["imageConfiguration"]
response = self.client.create_application(
name=name, releaseLabel=release_label, type=self.app_type,
initialCapacity=initial_capacity, maximumCapacity=maximum_capacity,
networkConfiguration=networkConfiguration, imageConfiguration=imageConfiguration
)
self.application_id = response.get("applicationId")
app_ready = False
while wait and not app_ready:
response = self.client.get_application(applicationId=self.application_id)
app_ready = response.get("application").get("state") == "CREATED"
def start_application(self, wait: bool = True) -> None:
"""
Start the application - by default, wait until the application is started.
"""
if self.application_id is None:
raise Exception(
"No application_id - please use creation_application first."
)
self.client.start_application(applicationId=self.application_id)
app_started = False
while wait and not app_started:
response = self.client.get_application(applicationId=self.application_id)
app_started = response.get("application").get("state") == "STARTED"
def stop_application(self, wait: bool = True) -> None:
"""
Stop the application - by default, wait until the application is stopped.
"""
self.client.stop_application(applicationId=self.application_id)
app_stopped = False
while wait and not app_stopped:
response = self.client.get_application(applicationId=self.application_id)
app_stopped = response.get("application").get("state") == "STOPPED"
def delete_application(self) -> None:
"""
Delete the application - it must be stopped first.
"""
self.client.delete_application(applicationId=self.application_id)
def run_spark_job(
self,
script_location: str,
job_role_arn: str,
arguments: list,
sparkArguments: dict,
s3_bucket_name: str,
wait: bool = True,
) -> str:
"""
Runs the Spark job identified by `script_location`. Arguments can also be provided via the `arguments` parameter.
Spark-submit parameters are built from `sparkArguments`, and logs are sent to the provided s3_bucket_name.
This method is blocking by default until the job is complete.
"""
spark_args = "--conf spark.driver.cores="+str(sparkArguments["driver-cores"])
spark_args += " --conf spark.driver.memory="+str(sparkArguments["driver-memory"])
spark_args += " --conf spark.executor.cores="+str(sparkArguments["executor-cores"])
spark_args += " --conf spark.executor.memory="+str(sparkArguments["executor-memory"])
spark_args += " --conf spark.executor.instances="+str(sparkArguments["executor-instances"])
spark_args += " " + sparkArguments["others"]
response = self.client.start_job_run(
applicationId=self.application_id,
executionRoleArn=job_role_arn,
jobDriver={
"sparkSubmit": {
"entryPoint": script_location,
"entryPointArguments": arguments,
"sparkSubmitParameters": spark_args,
}
},
configurationOverrides={
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": f"s3://{s3_bucket_name}/{self.s3_log_prefix}"
}
}
},
)
job_run_id = response.get("jobRunId")
job_done = False
while wait and not job_done:
jr_response = self.get_job_run(job_run_id)
job_done = jr_response.get("state") in [
"SUCCESS",
"FAILED",
"CANCELLING",
"CANCELLED",
]
return job_run_id
def get_job_run(self, job_run_id: str) -> dict:
response = self.client.get_job_run(
applicationId=self.application_id, jobRunId=job_run_id
)
return response.get("jobRun")
def fetch_driver_log(
self, s3_bucket_name: str, job_run_id: str, log_type: str = "stdout"
) -> str:
"""
Access the specified `log_type` Driver log on S3 and return the full log string.
"""
s3_client = boto3.client("s3")
file_location = f"{self.s3_log_prefix}/applications/{self.application_id}/jobs/{job_run_id}/SPARK_DRIVER/{log_type}.gz"
try:
response = s3_client.get_object(Bucket=s3_bucket_name, Key=file_location)
file_content = gzip.decompress(response["Body"].read()).decode("utf-8")
except s3_client.exceptions.NoSuchKey:
file_content = ""
return str(file_content)
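Taken together, the class wraps the usual EMR Serverless lifecycle. A minimal, illustrative sketch of that flow, wired to the DESCRIPTOR dict that appears at the end of this commit (the script file name, job arguments, timezone and pattern below are placeholders, not repository values):

# Illustrative sketch only - assumes AWS credentials are configured and DESCRIPTOR (defined further below) is in scope.
emr = EMRServerless(search_app=DESCRIPTOR["emr_started"])
app = emr.valid_application()
if not app["exists"]:
    emr.create_application(DESCRIPTOR["application_name"], DESCRIPTOR["emr_version"], DESCRIPTOR["app_args"])
else:
    emr.application_id = app["app"]
emr.start_application()
job_run_id = emr.run_spark_job(
    script_location=DESCRIPTOR["job"]["script_location"] + "emr_example_script.py",  # placeholder file name
    job_role_arn=DESCRIPTOR["job_role_arn"],
    arguments=["<descriptor-json>", "<jdbc-url>", "GMT-5", "%d-%m-%Y %H:%M:%S"],      # placeholder job arguments
    sparkArguments=DESCRIPTOR["job"]["sparkArgs"],
    s3_bucket_name=DESCRIPTOR["bucket_base"],
)
print(emr.get_job_run(job_run_id).get("state"))
print(emr.fetch_driver_log(DESCRIPTOR["bucket_base"], job_run_id))
emr.stop_application()
emr.delete_application()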
FROM public.ecr.aws/emr-serverless/spark/emr-7.0.0:latest
USER root
# install python 3
RUN yum install -y gcc openssl-devel bzip2-devel libffi-devel tar gzip wget make zlib-devel
RUN wget https://www.python.org/ftp/python/3.10.0/Python-3.10.0.tgz && \
tar xzf Python-3.10.0.tgz && cd Python-3.10.0 && \
./configure --enable-optimizations && \
make altinstall
COPY subset_sum_linux /tmp/
COPY requirements.txt /
RUN python3 -m pip install numpy pandas py4j python-dateutil pytz six tzdata
# EMRS will run the image as hadoop
USER hadoop:hadoop
\ No newline at end of file
@@ -6,4 +6,4 @@ base = MainApplication()
 app = base.create_app()
 
 if __name__ == "__main__":
-    base.run(port=8000)
+    base.run(port=7500)
import os
import uuid
from typing import Dict, Any, List
import sys
import subprocess
import pandas as pd
import time
from datetime import datetime
from dateutil.parser import parse
import json
import pytz
import logging
from enum import Enum
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, collect_list, round, when, col, lit, size, udf, array_except, array
from pyspark.sql.types import ArrayType, IntegerType, LongType
class FixedFieldsEnum(Enum):
INTER_PIVOT_ID = "INTER_PIVOT_ID"
INTER_CTP_ID = "INTER_CTP_ID"
LISTA_DIFF = "LISTA_DIFF"
DIFF = "DIFF"
MATCH_RECORDS = "match-records"
class StatusEnum(Enum):
OK = 200
ERROR = 609
TIMEOUT = 610
logger = logging.getLogger(__name__)
# EXCLUDE VALIDATION FIELD
EXCLUDE_ROWS_FIELD = "EXCLUDE_VALID"
# DECIMAL ROUNDING
ROUND_DECIMAL = 2
# RESULT TABLE COLUMNS
RESULT_TABLENAME = "CSS_RESULT_BY_ACTION"
RESULT_TABLE_FIELDS = ["ACTION_ID", "ID_PROCESS", "CREATE_DATE", "KEY", "RESULT_JSON"]
def process() -> Dict[str, Any]:
response = {"status": StatusEnum.ERROR.name}
start_time = time.time()
params = sys.argv
descriptor = params[1]
jdbc_url = params[2]
timezone = params[3]
pattern = params[4]
descriptor = json.loads(descriptor)
session = createSession()
configs = descriptor["config-params"]
exclude_pivot = configs["exclude-entity-pivot"]
max_combinations = configs["max-records-per-combinations"]
params_input = descriptor["params-input"]
pivot_params, ctp_params = params_input["pivot-config"], params_input["counterpart-config"]
pivot_table, ctp_table = pivot_params["tablename"], ctp_params["tablename"]
jdbc_properties = {"driver": "com.mysql.jdbc.Driver"}
pivot_df = session.read.jdbc(url=jdbc_url, table=pivot_table, properties=jdbc_properties)
ctp_df = session.read.jdbc(url=jdbc_url, table=ctp_table, properties=jdbc_properties)
# Add a prefix to every column of both pivot and counterpart, and update the input fields accordingly
# pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
for column in pivot_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
pivot_df = pivot_df.withColumnRenamed(column, "PIVOT_" + column)
for column in ctp_df.columns:
if column == EXCLUDE_ROWS_FIELD:
continue
ctp_df = ctp_df.withColumnRenamed(column, "COUNTERPART_" + column)
for key_p, key_c in zip(pivot_params.keys(), ctp_params.keys()):
if isinstance(pivot_params[key_p], str):
pivot_params[key_p] = "PIVOT_" + pivot_params[key_p]
ctp_params[key_c] = "COUNTERPART_" + ctp_params[key_c]
else:
pivot_params[key_p] = ["PIVOT_" + column for column in pivot_params[key_p]]
ctp_params[key_c] = ["COUNTERPART_" + column for column in ctp_params[key_c]]
pivot_cols = pivot_params["columns-transaction"].copy()
if pivot_params["amount-column"] in pivot_cols:
pivot_cols.remove(pivot_params["amount-column"])
ctp_cols = ctp_params["columns-transaction"].copy()
if ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(ctp_params["amount-column"])
# Run the record-exclusion logic
if len(pivot_params["columns-group"]) == 0 and len(ctp_params["columns-group"]) == 0:
raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
# Case: 1 - Many
elif len(pivot_params["columns-group"]) == 0 and len(ctp_params["columns-group"]) > 0:
ctp_df2 = ctp_df.groupby(ctp_params["columns-group"]). \
agg(round(sum(ctp_params["amount-column"]), ROUND_DECIMAL).alias(ctp_params["amount-column"]),
collect_list(ctp_params["id-column"]).alias(ctp_params["id-column"]))
pivot_df2 = pivot_df
# Case: Many - 1
elif len(pivot_params["columns-group"]) > 0 and len(ctp_params["columns-group"]) == 0:
pivot_df2 = pivot_df.groupby(pivot_params["columns-group"]).agg(
round(sum(pivot_params["amount-column"]), ROUND_DECIMAL).alias(pivot_params["amount-column"]),
collect_list(pivot_params["id-column"]).alias(pivot_params["id-column"]))
ctp_df2 = ctp_df
# Case: Many - Many
elif len(pivot_params["columns-group"]) > 0 and len(ctp_params["columns-group"]) > 0:
pivot_df2 = pivot_df.groupby(pivot_params["columns-group"]).agg(
round(sum(pivot_params["amount-column"]), ROUND_DECIMAL).alias(pivot_params["amount-column"]),
collect_list(pivot_params["id-column"]).alias(pivot_params["id-column"]))
ctp_df2 = ctp_df.groupby(ctp_params["columns-group"]).agg(
round(sum(ctp_params["amount-column"]), ROUND_DECIMAL).alias(ctp_params["amount-column"]),
collect_list(ctp_params["id-column"]).alias(ctp_params["id-column"]))
condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_params["columns-transaction"],
ctp_params["columns-transaction"])]
total_merged = pivot_df2.join(ctp_df2, condition, 'left')
total_merged = total_merged.withColumn("DIFF", when(col(ctp_params["columns-transaction"][0]).isNotNull(),
lit(0)).otherwise(lit(None)))
total_merged = total_merged.select(*pivot_df2.columns, "DIFF")
condition = [total_merged[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = total_merged.join(ctp_df2, condition)
merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
total_merged[pivot_params["amount-column"]] - ctp_df2[ctp_params["amount-column"]]).otherwise(col("DIFF")))
if len(pivot_params["columns-group"]) == 0 and len(ctp_params["columns-group"]) > 0:
merged = merged.sort(pivot_params["id-column"])
merged = merged.dropDuplicates(pivot_cols)
elif len(pivot_params["columns-group"]) > 0 and len(ctp_params["columns-group"]) == 0:
merged = merged.sort(ctp_params["id-column"])
merged = merged.dropDuplicates(ctp_cols)
merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))
if exclude_pivot:
df = pivot_df
group_cols = pivot_params["columns-group"]
amount_col = pivot_params["amount-column"]
id_col = pivot_params["id-column"]
else:
df = ctp_df
group_cols = ctp_params["columns-group"]
amount_col = ctp_params["amount-column"]
id_col = ctp_params["id-column"]
total_tmp_cols = group_cols + ["DIFF"]
df3 = df.join(merged_df.select(*total_tmp_cols), group_cols)
columns = [col(column) for column in group_cols]
custom = udf(custom_func_udf, ArrayType(IntegerType()))
# Filter only the rows flagged 'S' in the exclusion field - matches would not be taken
# df3 = df3.filter(col(EXCLUDE_ROWS_FIELD) == 'S')
resultado = df3.groupby(*columns).agg(
custom(collect_list(amount_col), collect_list(id_col), collect_list(EXCLUDE_ROWS_FIELD), collect_list("DIFF"), lit(max_combinations)).alias("LISTA_DIFF"))
meged2 = resultado.join(merged_df, group_cols, 'left')
handle_array_udf = udf(handle_array, ArrayType(IntegerType()))
meged2 = meged2.withColumn("LISTA_DIFF", handle_array_udf("LISTA_DIFF"))
meged2 = meged2.filter((col("DIFF") == 0) | ((col("DIFF") != 0) & (size(col("LISTA_DIFF")) > 0)))
if exclude_pivot:
meged2 = meged2.withColumn("INTER_PIVOT_ID", array_except(meged2[pivot_params["id-column"]], meged2["LISTA_DIFF"]))
meged2 = meged2.withColumnRenamed(ctp_params["id-column"], "INTER_CTP_ID")
if meged2.schema["INTER_CTP_ID"].dataType == LongType():
meged2 = meged2.withColumn("INTER_CTP_ID", array(col("INTER_CTP_ID")).cast(ArrayType(LongType())))
else:
meged2 = meged2.withColumn("INTER_CTP_ID", array_except(meged2[ctp_params["id-column"]], meged2["LISTA_DIFF"]))
meged2 = meged2.withColumnRenamed(pivot_params["id-column"], "INTER_PIVOT_ID")
if meged2.schema["INTER_PIVOT_ID"].dataType == LongType():
meged2 = meged2.withColumn("INTER_PIVOT_ID", array(col("INTER_PIVOT_ID")).cast(ArrayType(LongType())))
meged2.show()
print("SHOW:", time.time() - start_time)
meged2 = meged2.toPandas()
print("SOLO ALGORITMO:", time.time() - start_time)
# Saving to the DB
print("creando result")
result = create_result(meged2, descriptor)
print("emepce a guardar")
if result["status"] == StatusEnum.ERROR:
raise InterruptedError(f"Error generando el json resultado. {result['message']}")
save = save_result(result, session, jdbc_url, descriptor, timezone, pattern)
if save["status"] == StatusEnum.ERROR:
raise InterruptedError(f"Error guardando registro resultado en la BD. {result['message']}")
response["status"] = StatusEnum.OK.name
return response
def createSession(name: str = "app_engine_spark"):
try:
session = SparkSession.builder \
.appName(name) \
.getOrCreate()
return session
except Exception as e:
raise Exception(f"Error creando sesion Spark. {e}")
def handle_array(x):
if isinstance(x, List):
return x
else:
return []
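# Group-level UDF: given a group's amounts, ids, exclusion flags and its DIFF, it scales the 'S'-flagged
# amounts and the difference to integers (10**ROUND_DECIMAL), writes them to temporary files, runs the
# external /tmp/subset_sum_linux binary for combination sizes 1..max_combinations, and maps the amounts
# reported in the binary's "keys:[...]" output back to record ids, returning the ids whose amounts
# add up to the group's difference.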
def custom_func_udf(amount_values, id_values, excludes, diffs, max_combinations):
diff = diffs[0]
if pd.isna(diff) or diff == 0:
return None
diff = int(diff * (10**ROUND_DECIMAL))
amount_values = [int(value * (10**ROUND_DECIMAL)) for value, exclude in zip(amount_values, excludes) if exclude == 'S']
dir_name = str(uuid.uuid4())
prefix = "/tmp/" + dir_name + "_"
tmp_file_arr1, tmp_file_arr2 = "values.txt", "target.txt"
full_path_arr1, full_path_arr2 = prefix + tmp_file_arr1, prefix + tmp_file_arr2
with open(full_path_arr1, 'w') as archivo:
archivo.writelines([f'{entero}\n' for entero in amount_values])
with open(full_path_arr2, 'w') as archivo:
archivo.write(str(diff))
executable_path = '/tmp/subset_sum_linux'
indices = []
for comb in range(1, max_combinations+1):
argumentos = [full_path_arr1, full_path_arr2, str(comb), '1', '1', 'false', 'false']
result = subprocess.run([executable_path] + argumentos, check=True, capture_output=True, text=True)
result = str(result)
if "keys:[" in result:
match = result[result.index("keys:[") + 5:result.index("keys remainder") - 20]
match = match.replace("targets:", "").replace("+", ",")
match = match.split("==")[0].replace(" ", "")
match = json.loads(match)
for idx, val in zip(id_values, amount_values):
if val in match:
indices.append(idx)
match.remove(val)
break
os.remove(full_path_arr1), os.remove(full_path_arr2)
return indices
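# Builds the result payload: one dict per surviving row, with the transaction/group keys plus the
# pivot/counterpart id lists, the excluded ids (LISTA_DIFF) and the difference amount.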
def create_result(data, descriptor):
result = []
response = {"detail": result}
try:
exclude_pivot = descriptor["config-params"]["exclude-entity-pivot"]
pivot_params = descriptor["params-input"]["pivot-config"]
ctp_params = descriptor["params-input"]["counterpart-config"]
group_pivot_match = pivot_params["columns-group"]
transaction_pivot_match = pivot_params["columns-transaction"]
group_counterpart_match = ctp_params["columns-group"]
transaction_counterpart_match = ctp_params["columns-transaction"]
used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match
if data is None or data.empty:
logger.info(f"El dataframe resultado esta vacio")
else:
for idx, i in data.iterrows():
input_data = {}
key_transaction = None
key_group_pivot = None
key_group_counterpart = None
for element in used_list:
if key_transaction is None:
key_transaction = str(i[element])
else:
key_transaction = key_transaction + "-" + str(i[element])
for element_g in group_pivot_match:
if key_group_pivot is None:
key_group_pivot = str(i[element_g])
else:
key_group_pivot = key_group_pivot + "-" + str(i[element_g])
for element_c in group_counterpart_match:
if key_group_counterpart is None:
key_group_counterpart = str(i[element_c])
else:
key_group_counterpart = key_group_counterpart + "-" + str(i[element_c])
input_data["key-transaction"] = str(key_transaction)
input_data["key-group-pivot"] = "" if key_group_pivot is None else str(key_group_pivot)
input_data["key-group-counterpart"] = "" if key_group_counterpart is None else str(
key_group_counterpart)
input_data["list-ids-pivot"] = str(i[FixedFieldsEnum.INTER_PIVOT_ID.value])
input_data["list-ids-counterpart"] = str(i[FixedFieldsEnum.INTER_CTP_ID.value])
input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value])
input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value])
result.append(input_data)
response['status'] = StatusEnum.OK
response["detail"] = result
except Exception as e:
logger.error(f"Error al crear el diccionario de resultados. {e}")
response["status"] = StatusEnum.ERROR
response["message"] = str(e)
finally:
return response
def save_result(result, session, jdbc_url, descriptor, timezone, pattern):
response = {}
try:
d1 = datetime_by_tzone(timezone, pattern)
result_json = json.dumps(result["detail"])
data = [descriptor["idScript"], descriptor["idProcess"], d1, FixedFieldsEnum.MATCH_RECORDS.value, result_json]
df = pd.DataFrame([data], columns=RESULT_TABLE_FIELDS)
df = session.createDataFrame(df)
df.write.format("jdbc").option("url", jdbc_url).option("dbtable", RESULT_TABLENAME). \
option("driver", "com.mysql.cj.jdbc.Driver").mode("append").save()
response['status'] = StatusEnum.OK
except Exception as e:
response["status"] = StatusEnum.ERROR
response["message"] = str(e)
logger.error(f"Error al guardar registro en la base de datos {e}")
finally:
return response
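# Resolves the configured timezone (including "GMT+4:30"-style offsets that pytz does not list directly)
# and returns the current time formatted according to `pattern`.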
def datetime_by_tzone(timezone, pattern):
tzone = timezone
offset = None
# Some cases where the timezone has the form 4:30 and is not found among the pytz (GMT) timezones
if ":" in tzone:
offset = tzone.split(":")[1]
tzone = tzone.split(":")[0]
if "+" in tzone:
tzone = tzone.replace(tzone[-1], str(int(tzone[-1]) + 1))
timezones_list = pytz.all_timezones
tzones = [x if tzone in x else None for x in timezones_list]
tzones = list(filter(None, tzones))
server_timezone = pytz.timezone(tzones[0])
logger.debug("Zona Horaria : {}".format(server_timezone))
server_time = server_timezone.localize(datetime.utcnow())
current_time = parse(server_time.strftime('%Y-%m-%d %H:%M:%S.%f %Z'))
if offset:
offset = pytz.FixedOffset((current_time.utcoffset().total_seconds() / 60 + float(offset)) * -1)
offset = offset.utcoffset(datetime.utcnow())
current_time = datetime.utcnow() + offset
else:
current_time = current_time.replace(tzinfo=None) - current_time.utcoffset()
current_time = parse(current_time.strftime(pattern))
logger.debug("Hora actual: {}".format(current_time))
return current_time
# Running the process
process()
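For reference, the script above expects four positional arguments; a small, self-contained sketch of how they line up with the `entryPointArguments` that EMRServerless.run_spark_job forwards (all values below are placeholders, not repository data):

import json

# Illustrative only - placeholder values showing how the script's sys.argv positions are filled.
descriptor = {"idScript": 1, "idProcess": 1, "config-params": {}, "params-input": {}}  # hypothetical minimal descriptor
entry_point_arguments = [
    json.dumps(descriptor),                 # sys.argv[1] -> descriptor JSON
    "jdbc:mysql://user:pass@host:3306/db",  # sys.argv[2] -> JDBC URL
    "GMT-5",                                # sys.argv[3] -> timezone
    "%d-%m-%Y %H:%M:%S",                    # sys.argv[4] -> datetime pattern
]
# These become jobDriver["sparkSubmit"]["entryPointArguments"] in run_spark_job(...).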
-from typing import Any, Dict, List
-import numpy as np
-import pandas as pd
 import json
-from dask import dataframe as dd
-from numba import jit, types, typed
+from typing import Any, Dict
+import sys
 from wrapt_timeout_decorator import timeout
-import multiprocessing as mp
+from app.main.engine.util.EMRServerless import EMRServerless
 from app.main.engine.action.ActionInterface import ActionInterface
@@ -24,6 +21,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
     def __init__(self, app) -> None:
         super().__init__(app)
+        self.descriptor = None
         self.max_combinations = None
         self.timeout = None
         self.exclude_pivot = None
@@ -57,176 +55,65 @@ class MatchAndExcludeRecordsAction(ActionInterface):
             if param not in pivot_params.keys() or param not in ctp_params.keys():
                 raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte")
 
+        self.descriptor = descriptor
         self.max_combinations = configs["max-records-per-combinations"]
         self.timeout = configs["max-timeout-per-combinations"]
         self.exclude_pivot = configs["exclude-entity-pivot"]
         self.pivot_params = pivot_params
         self.ctp_params = ctp_params
-    def process(self, source_obs):
+    def process(self, source_obs, script_name, timezone, pattern):
         try:
             @timeout(self.timeout)
             def __process(source_obj):
-                # Fetch both pivot and counterpart data from the DB
-                pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"]
-                dialect = source_obj.get_dialect()
-                pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"],
-                                             npartitions=mp.cpu_count())
-                pivot_df = pivot_df.reset_index()
-                ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"],
-                                           npartitions=mp.cpu_count())
-                ctp_df = ctp_df.reset_index()
-                # Add a prefix to every column of both pivot and counterpart. Update the input fields
-                # pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
-                # Iterate over the DataFrame columns
-                for column in pivot_df.columns:
-                    if column == EXCLUDE_ROWS_FIELD:
-                        continue
-                    new_column_name = "PIVOT_" + column
-                    pivot_df = pivot_df.rename(columns={column: new_column_name})
-                for column in ctp_df.columns:
-                    if column == EXCLUDE_ROWS_FIELD:
-                        continue
-                    new_column_name = "COUNTERPART_" + column
-                    ctp_df = ctp_df.rename(columns={column: new_column_name})
-                for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
-                    if isinstance(self.pivot_params[key_p], str):
-                        self.pivot_params[key_p] = "PIVOT_"+self.pivot_params[key_p]
-                        self.ctp_params[key_c] = "COUNTERPART_"+self.ctp_params[key_c]
-                    else:
-                        self.pivot_params[key_p] = ["PIVOT_"+column for column in self.pivot_params[key_p]]
-                        self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
-                pivot_cols = self.pivot_params["columns-transaction"].copy()
-                if self.pivot_params["amount-column"] in pivot_cols:
-                    pivot_cols.remove(self.pivot_params["amount-column"])
-                ctp_cols = self.ctp_params["columns-transaction"].copy()
-                if self.ctp_params["amount-column"] in ctp_cols:
-                    ctp_cols.remove(self.ctp_params["amount-column"])
-                max_combinations = self.max_combinations
-                # Run the record-exclusion logic
-                if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
-                    raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
-                # Case: 1 - Many
-                elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
-                    ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
-                        self.ctp_params["amount-column"]: 'sum',  # Sum the amount column
-                        self.ctp_params["id-column"]: list
-                    })
-                    ctp_df2 = ctp_df2.reset_index()
-                    pivot_df2 = pivot_df
-                # Case: Many - 1
-                elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
-                    pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
-                        self.pivot_params["amount-column"]: 'sum',
-                        self.pivot_params["id-column"]: list
-                    })
-                    pivot_df2 = pivot_df2.reset_index()
-                    ctp_df2 = ctp_df
-                # Case: Many - Many
-                elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
-                    pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
-                        self.pivot_params["amount-column"]: 'sum',
-                        self.pivot_params["id-column"]: list
-                    })
-                    pivot_df2 = pivot_df2.reset_index()
-                    ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
-                        self.ctp_params["amount-column"]: 'sum',  # Sum the amount column
-                        self.ctp_params["id-column"]: list
-                    })
-                    ctp_df2 = ctp_df2.reset_index()
-                pivot_df2[self.pivot_params["amount-column"]] = pivot_df2[self.pivot_params["amount-column"]].round(
-                    ROUND_DECIMAL)
-                ctp_df2[self.ctp_params["amount-column"]] = ctp_df2[self.ctp_params["amount-column"]].round(
-                    ROUND_DECIMAL)
-                total_merged = pivot_df2.merge(ctp_df2, 'left', left_on=self.pivot_params["columns-transaction"],
-                                               right_on=self.ctp_params["columns-transaction"])
-                total_merged = total_merged.map_partitions(self.add_diff_column)
-                selected_columns = list(pivot_df2.columns) + ['DIFF']
-                total_merged = total_merged[selected_columns]
-                merged = total_merged.merge(ctp_df2, 'inner', left_on=pivot_cols, right_on=ctp_cols)
-                merged['DIFF'] = merged['DIFF'].where(merged['DIFF'].notnull(),
-                                                      merged[self.pivot_params["amount-column"]] - merged[
-                                                          self.ctp_params["amount-column"]])
-                if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
-                    merged = merged.set_index(self.pivot_params["id-column"])
-                    merged = merged.map_partitions(lambda df_: df_.sort_values([self.pivot_params["id-column"]]))
-                    merged = merged.drop_duplicates(subset=pivot_cols)
-                elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
-                    merged = merged.set_index(self.ctp_params["id-column"])
-                    merged = merged.map_partitions(lambda df_: df_.sort_values([self.ctp_params["id-column"]]))
-                    merged = merged.drop_duplicates(subset=ctp_cols)
-                merged = merged.reset_index()
-                merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL))
-                if self.exclude_pivot:
-                    df = pivot_df
-                    group_cols = self.pivot_params["columns-group"]
-                    amount_col = self.pivot_params["amount-column"]
-                    id_col = self.pivot_params["id-column"]
-                else:
-                    df = ctp_df
-                    group_cols = self.ctp_params["columns-group"]
-                    amount_col = self.ctp_params["amount-column"]
-                    id_col = self.ctp_params["id-column"]
-                total_tmp_cols = group_cols + ["DIFF"]
-                df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols)
-                df3 = df3.compute()
-                total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
-                resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
-                resultado = resultado.reset_index()
-                if len(resultado.columns) == 1:
-                    resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
-                else:
-                    resultado.columns = group_cols + ["LISTA_DIFF"]
-                resultado = dd.from_pandas(resultado, npartitions=mp.cpu_count())
-                meged2 = resultado.merge(merged_df, 'left', group_cols)
-                meged2 = meged2.map_partitions(lambda partition: partition.assign(
-                    LISTA_DIFF=partition['LISTA_DIFF'].apply(lambda x: [] if pd.isna(x) else x)), meta=meged2.dtypes.to_dict())
-                meged2 = meged2[
-                    (meged2['DIFF'] == 0) |
-                    ((meged2['DIFF'] != 0) & meged2['LISTA_DIFF'].apply(
-                        lambda x: True if not pd.isna(x) and ((isinstance(x, List) and len(x) > 0) or (isinstance(x, str) and len(x) > 2)) else False))
-                ]
-                meged2 = meged2.compute()
-                if meged2.empty:
-                    pass
-                elif self.exclude_pivot:
-                    meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
-                    meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
-                    if meged2['INTER_CTP_ID'].dtype == 'int64':
-                        meged2['INTER_CTP_ID'] = meged2['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
-                else:
-                    meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
-                    meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
-                    if meged2['INTER_PIVOT_ID'].dtype == 'int64':
-                        meged2['INTER_PIVOT_ID'] = meged2['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
-                return meged2
+                descriptor = DESCRIPTOR
+                serverless_job_role_arn = descriptor["job_role_arn"]
+                s3_bucket_name = descriptor["bucket_base"]
+                search_emr = descriptor["emr_started"]
+                end_app = descriptor["terminated_app"]
+                emr_serverless = EMRServerless(search_app=search_emr)
+                self.app.logger.info("Validando si exite una aplicación ya en curso")
+                exists_app = emr_serverless.valid_application()
+                if not exists_app["exists"]:
+                    self.app.logger.info("Creando e inicializando la aplicación EMR Serverless")
+                    emr_serverless.create_application(descriptor["application_name"], descriptor["emr_version"],
+                                                      descriptor["app_args"])
+                else:
+                    emr_serverless.application_id = exists_app["app"]
+                emr_serverless.start_application()
+                self.app.logger.info(emr_serverless)
+                job = descriptor["job"]
+                script_location = job["script_location"] + "emr_" + script_name
+                jdbc_conn = source_obj.create_spark_connection()
+                # jdbc_url = jdbc_conn["url"]
+                jdbc_url = "jdbc:mysql://admin:awsadmin@database-2.cgcfmoce13qq.us-east-1.rds.amazonaws.com:3306/cusca"
+                # jdbc_properties = jdbc_conn["properties"]
+                arguments = [json.dumps(self.descriptor), jdbc_url, timezone, pattern]
+                self.app.logger.info("Lanzando nuevo job Spark")
+                self.app.logger.info(script_location)
+                self.app.logger.info(serverless_job_role_arn)
+                self.app.logger.info(arguments)
+                self.app.logger.info(job["sparkArgs"])
+                self.app.logger.info(s3_bucket_name)
+                job_run_id = emr_serverless.run_spark_job(
+                    script_location=script_location,
+                    job_role_arn=serverless_job_role_arn,
+                    arguments=arguments,
+                    sparkArguments=job["sparkArgs"],
+                    s3_bucket_name=s3_bucket_name,
+                )
+                job_status = emr_serverless.get_job_run(job_run_id)
+                self.app.logger.info(f"Job terminado: {job_run_id}, Estado: {job_status.get('state')}")
+                # Fetch and print the logs
+                spark_driver_logs = emr_serverless.fetch_driver_log(s3_bucket_name, job_run_id)
+                self.app.logger.info("Archivo de salida:\n----\n", spark_driver_logs, "\n----")
+                if end_app:
+                    # Now stop and delete your application
+                    self.app.logger.info("Deteniendo y borrando aplicación Spark")
+                    emr_serverless.stop_application()
+                    emr_serverless.delete_application()
+                self.app.logger.info("Hecho! 👋")
         except TimeoutError as e:
             raise TimeoutError(f"Tiempo límite superado. {e}")
@@ -236,80 +123,57 @@ class MatchAndExcludeRecordsAction(ActionInterface):
     def response(self):
         return self.output
-    def add_diff_column(self, partition):
-        partition['DIFF'] = np.where(partition[self.ctp_params["columns-transaction"][0]].notnull(), 0, np.nan)
-        return partition
-
-    def handle_array(self, x):
-        if isinstance(x, np.ndarray):
-            return x
-        else:
-            return []
-
-    def array_except(self, arr1, arr2):
-        if arr2 is None:
-            return arr1
-        elif not isinstance(arr2, List):
-            cadena_sin_corchetes = arr2.strip('[]')
-            partes = cadena_sin_corchetes.split()
-            arr2 = [int(numero) for numero in partes]
-        arr1 = json.loads(arr1.replace(" ", ""))
-        return [item for item in arr1 if item not in arr2]
-
-
-def custom_func(group, amount_field, id_field, max_combinations):
-    diff_value = group["DIFF"].values[0]
-    if np.isnan(diff_value):
-        return None
-    diff = int(diff_value*(10**ROUND_DECIMAL))
-    if pd.isna(diff) or diff == 0:
-        return None
-    group = group[group[EXCLUDE_ROWS_FIELD] == 'S']
-    group[amount_field] = group[amount_field].astype(float)
-    group = group.reset_index(drop=True)
-    values = group[amount_field].values
-    values *= (10**ROUND_DECIMAL)
-    values = values.astype(np.int64)
-    ids = group[id_field].values
-    tam = len(values)
-    tam = tam if tam <= max_combinations else max_combinations
-    result = subset_sum_iter(values, diff, tam)
-    indices = ids[np.isin(values, result)]
-    return indices
-
-
-@jit(nopython=False)
-def subset_sum_iter(numbers, target, num_elements):
-    # Initialize solutions list
-    final = typed.List.empty_list(types.int64)
-    for step in range(1, num_elements+1):
-        # Build first index by taking the first num_elements from the numbers
-        indices = list(range(step))
-        while True:
-            for i in range(step):
-                if indices[i] != i + len(numbers) - step:
-                    break
-            else:
-                # No combinations left
-                break
-            # Increase current index and all its following ones
-            indices[i] += 1
-            for j in range(i + 1, step):
-                indices[j] = indices[j - 1] + 1
-            # Check current solution
-            solution = typed.List.empty_list(types.int64)
-            for i in indices:
-                solution.append(numbers[i])
-            if round(sum(solution), ROUND_DECIMAL) == target:
-                final = solution
-                break
-        if len(final) > 0:
-            break
-    return final
+DESCRIPTOR = {
+    "application_name": "css_cuscatlan",
+    "emr_version": "emr-7.0.0",
+    "emr_started": True,
+    "terminated_app": False,
+    "job_role_arn": "arn:aws:iam::000026703603:role/emr-serverless-job-role",
+    "bucket_base": "bucket-emr-example",
+    "app_args": {
+        "initial_capacity": {
+            "DRIVER": {
+                "workerCount": 1,
+                "workerConfiguration": {
+                    "cpu": "16vCPU",
+                    "memory": "32GB"
+                }
+            },
+            "EXECUTOR": {
+                "workerCount": 12,
+                "workerConfiguration": {
+                    "cpu": "16vCPU",
+                    "memory": "32GB"
+                }
+            }
+        },
+        "maximun_capacity": {
+            "cpu": "208vCPU",
+            "memory": "416GB",
+            "disk": "1000GB"
+        },
+        "imageConfiguration": {
+            "imageUri": "000026703603.dkr.ecr.us-east-1.amazonaws.com/css_spark_custom:0.0.5"
+        },
+        "networkConfiguration": {
+            "subnetIds": [
+                "subnet-0f86499848ec99861", "subnet-078fe716da8b53818", "subnet-0a7d0a8bc3b623474"
+            ],
+            "securityGroupIds": [
+                "sg-02154713a3639f7ce"
+            ]
+        }
+    },
+    "job": {
+        "script_location": "s3://bucket-emr-example/css_cusca/endpoint/",
+        "sparkArgs": {
+            "driver-cores": 16,
+            "driver-memory": "14g",
+            "executor-cores": 16,
+            "executor-memory": "14g",
+            "executor-instances": 12,
+            "others": "--jars s3://bucket-emr-example/bcom_spark/jars/mysql-connector-java-8.0.30.jar"
+        }
+    }
+}