Commit 6af4da44 authored by Cristian Aguirre's avatar Cristian Aguirre

Update match-and-exclude-records-actions_v1.py

parent 6977026b
......@@ -7,7 +7,7 @@ app:
port: 13306
user: root
password: root
db: css_engine
db: css_cuscatlan
dialect: 'mysql+pymysql'
# BD conexion configurations
# https://docs.sqlalchemy.org/en/14/core/pooling.html
......@@ -36,9 +36,3 @@ app:
loglevel: 'debug'
accesslog: '-'
capture_output: True
spark:
cores: '*'
memory: 16 # En gb y que sea par, de preferencia 2, 4, 6, 18, 32, etc.
jars_path:
mysql: 'jars/mysql-connector-java-8.0.30.jar'
\ No newline at end of file
......@@ -25,11 +25,6 @@ class Config(object):
# Max threads allowed
max_engine_threads = conf["max_engine_threads"]
# Spark config
spark_cores = conf["spark"]["cores"]
spark_mem = conf["spark"]["memory"]
spark_jars = conf["spark"]["jars_path"]
class ProductionConfig(Config):
DEBUG = False
......
......@@ -69,6 +69,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.ctp_params = ctp_params
def process(self, source_obj):
# Inicializar la sesion de Spark
session = self.createSession()
......@@ -77,8 +78,52 @@ class MatchAndExcludeRecordsAction(ActionInterface):
jdbc_conn = source_obj.create_spark_connection()
jdbc_url = jdbc_conn["url"]
jdbc_properties = jdbc_conn["properties"]
print(jdbc_url, jdbc_properties)
pivot_df = session.read.jdbc(url=jdbc_url, table=pivot_table, properties=jdbc_properties)
ctp_df = session.read.jdbc(url=jdbc_url, table=ctp_table, properties=jdbc_properties)
from pyspark.sql.functions import col, sum, collect_list, round
# Ejecutamos lógica de excluir registros
if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
# Caso: 1 - Muchos
elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
ctp_df = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), 2).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
ctp_df.show()
# Caso: Muchos - 1
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
pivot_df = pivot_df.groupby(self.pivot_params["columns-group"]).\
agg(round(sum(self.pivot_params["amount-column"]), 2).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
pivot_df.show()
# Caso: Muchos - Muchos
elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
pivot_df = pivot_df.groupby(self.pivot_params["columns-group"]). \
agg(round(sum(self.pivot_params["amount-column"]), 2).alias(self.pivot_params["amount-column"]),
collect_list(self.pivot_params["id-column"]).alias(self.pivot_params["id-column"]))
ctp_df = ctp_df.groupby(self.ctp_params["columns-group"]). \
agg(round(sum(self.ctp_params["amount-column"]), 2).alias(self.ctp_params["amount-column"]),
collect_list(self.ctp_params["id-column"]).alias(self.ctp_params["id-column"]))
pivot_cols = self.pivot_params["columns-transaction"].copy()
if self.pivot_params["amount-column"] in pivot_cols:
pivot_cols.remove(self.pivot_params["amount-column"])
ctp_cols = self.ctp_params["columns-transaction"].copy()
if self.ctp_params["amount-column"] in ctp_cols:
ctp_cols.remove(self.ctp_params["amount-column"])
pivot_df.show()
ctp_df.show()
condition = [pivot_df[col1] == ctp_df[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
merged = pivot_df.join(ctp_df, condition)
merged = merged.withColumn("DIFF", pivot_df[self.pivot_params["amount-column"]] - ctp_df[self.ctp_params["amount-column"]])
merged.show()
def response(self):
print("Llegue al response3")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment