Commit 805f468e authored by Cristian Aguirre

fix bug commission

parent a78f4f94
from enum import Enum


class DatabaseTypeEnum(Enum):
    MONGODB = "mongodb"
    MYSQL = "mysql"
    REDSHIFT = "redshift"
    STARROKS = "starroks"
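Review note, not part of the commit: the same enum could also centralize the JDBC driver/URL choice that the read and write branches below repeat. A minimal sketch with a hypothetical helper `jdbc_options` (the driver strings are the ones used elsewhere in this diff):

```python
from typing import Dict

from Enum.DatabaseTypeEnum import DatabaseTypeEnum


# Hypothetical helper (not in this commit): map each database type to the JDBC
# driver class and URL already used by the read/write branches in this diff.
def jdbc_options(db_type: DatabaseTypeEnum, redshift_url: str = "", mysql_url: str = "") -> Dict[str, str]:
    options = {
        DatabaseTypeEnum.REDSHIFT: {"driver": "com.amazon.redshift.jdbc42.Driver", "url": redshift_url},
        DatabaseTypeEnum.MYSQL: {"driver": "com.mysql.cj.jdbc.Driver", "url": mysql_url},
    }
    if db_type not in options:
        # STARROKS/MONGODB use their own connectors, not a plain JDBC read.
        raise ValueError(f"No JDBC options defined for {db_type}")
    return options[db_type]
```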
@@ -3,12 +3,11 @@ from typing import Dict, Any, List
 from prefect import task
 import time
 from pyspark.sql import DataFrame
-from pyspark.sql.functions import when, lit, sum as py_sum, count, coalesce
+from pyspark.sql.functions import when, lit, sum as py_sum, count, coalesce, expr
 from graphframes import GraphFrame
-from pyspark.sql import functions as F
-from pyspark.sql.window import Window
 from Utils.SparkUtils import createSession, find_related_vertices
+from Enum.DatabaseTypeEnum import DatabaseTypeEnum

 logger = logging.getLogger()
@@ -25,17 +24,42 @@ class CommissionProcess:
         self.inter_results = {}

     def init(self, spark_jars: Dict[str, str]) -> None:
-        self.session = createSession(self.identifier, spark_jars)
+        self.session = createSession(self.identifier, spark_jars, "")

     @task
-    def get_inputs(self, starroks_jdbc: str, starroks_fe: str) -> None:
+    def get_inputs(self, db_type: DatabaseTypeEnum, starroks_jdbc: str, starroks_fe: str, redshift_url: str = "",
+                   mysql_url: str = "") -> None:
         try:
             inputs = self.conf["inputs"]
             database = starroks_jdbc[starroks_jdbc.rfind("/") + 1:]
-            starroks_user = self.conf["starroks"]["user"]
-            starroks_pass = self.conf["starroks"]["password"]
             for input_obj in inputs["data"]:
                 identifier = input_obj["identifier"]
-                df_input = self.session.read.format("starrocks"). \
-                    option("starrocks.table.identifier", database+"."+identifier). \
-                    option("starrocks.fe.http.url", starroks_fe). \
+                if db_type == DatabaseTypeEnum.REDSHIFT:
+                    redshift_user = self.conf["redshift"]["user"]
+                    redshift_pass = self.conf["redshift"]["password"]
+                    df_input = self.session.read \
+                        .format("jdbc") \
+                        .option("driver", "com.amazon.redshift.jdbc42.Driver") \
+                        .option("url", redshift_url) \
+                        .option("dbtable", identifier) \
+                        .option("user", redshift_user) \
+                        .option("password", redshift_pass) \
+                        .load()
+                elif db_type == DatabaseTypeEnum.MYSQL:
+                    mysql_user = self.conf["mysql"]["user"]
+                    mysql_pass = self.conf["mysql"]["password"]
+                    df_input = self.session.read \
+                        .format("jdbc") \
+                        .option("driver", "com.mysql.cj.jdbc.Driver") \
+                        .option("url", mysql_url) \
+                        .option("dbtable", identifier) \
+                        .option("user", mysql_user) \
+                        .option("password", mysql_pass) \
+                        .load()
+                else:
+                    starroks_user = self.conf["starroks"]["user"]
+                    starroks_pass = self.conf["starroks"]["password"]
+                    df_input = self.session.read.format("starrocks"). \
+                        option("starrocks.table.identifier", database+"."+identifier). \
+                        option("starrocks.fe.http.url", starroks_fe). \
@@ -247,45 +271,24 @@ class CommissionProcess:
         try:
             start_process = time.time()
             graph_related = find_related_vertices(graph)
-            graph_related = {k: v for k, v in graph_related.items() if len(v) != 0}

             merged = df_goals.join(df_executes, "AGENTE_COMISIONA", 'left')
             merged = merged.join(df_base, "AGENTE_COMISIONA", 'left')

-            # for vertex in graph_related.keys():
-            #     value = merged.filter(merged["AGENTE_COMISIONA"].isin(graph_related[vertex])).agg(py_sum("EJECUTADO_1"),
-            #                                                                                       py_sum("EJECUTADO_2"),
-            #                                                                                       py_sum("EJECUTADO_3")).collect()[0]
-            #     value_1, value_2, value_3 = value[0], value[1], value[2]
-            #     merged = merged.withColumn("EJECUTADO_1", when(merged["AGENTE_COMISIONA"] == vertex, value_1).otherwise(merged["EJECUTADO_1"]))
-            #     merged = merged.withColumn("EJECUTADO_2", when(merged["AGENTE_COMISIONA"] == vertex, value_2).otherwise(merged["EJECUTADO_2"]))
-            #     merged = merged.withColumn("EJECUTADO_3", when(merged["AGENTE_COMISIONA"] == vertex, value_3).otherwise(merged["EJECUTADO_3"]))
-            graph_related_df = self.session.createDataFrame([(k, v) for k, v in graph_related.items()],
-                                                            ["vertex", "related_agents"])
+            graph_related_df = self.session.createDataFrame([(k, v) for k, v in graph_related.items()], ["vertex", "related_agents"])

             # Agregar los valores ejecutados por cada agente
-            agg_df = merged.join(graph_related_df, F.expr("array_contains(related_agents, AGENTE_COMISIONA)"), "inner") \
+            agg_df = merged.join(graph_related_df, expr("array_contains(related_agents, AGENTE_COMISIONA)"), "inner") \
                 .groupBy("vertex") \
-                .agg(F.sum("EJECUTADO_1").alias("sum_EJECUTADO_1"),
-                     F.sum("EJECUTADO_2").alias("sum_EJECUTADO_2"),
-                     F.sum("EJECUTADO_3").alias("sum_EJECUTADO_3"))
+                .agg(py_sum("EJECUTADO_1").alias("sum_EJECUTADO_1"),
+                     py_sum("EJECUTADO_2").alias("sum_EJECUTADO_2"),
+                     py_sum("EJECUTADO_3").alias("sum_EJECUTADO_3"))

-            # Crear ventanas para particionar por "AGENTE_COMISIONA" y ordenar por alguna columna (aquí suponemos "order_column")
-            windowSpec1 = Window.partitionBy("AGENTE_COMISIONA").orderBy("AGENTE_COMISIONA")
-
-            # Utilizar la función lag() para obtener el valor ejecutado anteriormente por cada agente
-            for i in range(1, 4):
-                merged = merged.withColumn(f"EJECUTADO_{i}",
-                                           F.coalesce(F.lag(f"EJECUTADO_{i}", 1).over(windowSpec1), F.lit(0)))
-
-            # Realizar una join con agg_df para actualizar los valores ejecutados
-            for i in range(1, 4):
-                merged = merged.join(agg_df, merged["AGENTE_COMISIONA"] == agg_df["vertex"], "left") \
-                    .withColumn(f"EJECUTADO_{i}", F.when(F.col("vertex").isNull(), merged[f"EJECUTADO_{i}"])
-                                .otherwise(F.col(f"sum_EJECUTADO_{i}"))) \
-                    .drop("vertex", "related_agents", f"sum_EJECUTADO_{i}")
-            merged.show()
+            merged = merged.join(agg_df, merged["AGENTE_COMISIONA"] == agg_df["vertex"], 'left')
+            merged = merged.withColumn("EJECUTADO_1", when(merged["vertex"].isNotNull(), merged["sum_EJECUTADO_1"]).otherwise(merged["EJECUTADO_1"]))
+            merged = merged.withColumn("EJECUTADO_2", when(merged["vertex"].isNotNull(), merged["sum_EJECUTADO_2"]).otherwise(merged["EJECUTADO_2"]))
+            merged = merged.withColumn("EJECUTADO_3", when(merged["vertex"].isNotNull(), merged["sum_EJECUTADO_3"]).otherwise(merged["EJECUTADO_3"]))
+            merged = merged.drop("vertex", "sum_EJECUTADO_1", "sum_EJECUTADO_2", "sum_EJECUTADO_3")

             print(f"Duración de creación de dataframes con grafos (jerarquía): {time.time() - start_process}")
         except Exception as e:
             logger.error(f"Error actualizando ejecutados por jerarquía. {e}")
@@ -405,8 +408,30 @@ class CommissionProcess:
         return merged

     @task
-    def write_result(self, df: DataFrame, collection_name: str, starroks_jdbc: str, starroks_fe: str) -> None:
+    def write_result(self, df: DataFrame, table_name: str, db_type: DatabaseTypeEnum, starroks_jdbc: str, starroks_fe: str,
+                     redshift_url: str = "", mysql_url: str = "") -> None:
         try:
-            database = starroks_jdbc[starroks_jdbc.rfind("/") + 1:]
-            starroks_user = self.conf["starroks"]["user"]
-            starroks_pass = self.conf["starroks"]["password"]
+            if db_type == DatabaseTypeEnum.REDSHIFT:
+                df.write \
+                    .format("jdbc") \
+                    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
+                    .option("url", redshift_url) \
+                    .option("dbtable", table_name) \
+                    .option("user", "awsuser") \
+                    .option("password", "Awsuser123") \
+                    .mode("append") \
+                    .save()
+            elif db_type == DatabaseTypeEnum.MYSQL:
+                df.write \
+                    .format("jdbc") \
+                    .option("driver", "com.mysql.cj.jdbc.Driver") \
+                    .option("url", mysql_url) \
+                    .option("dbtable", table_name) \
+                    .option("user", "root") \
+                    .option("password", "root") \
+                    .mode("overwrite") \
+                    .save()
+            else:
+                database = starroks_jdbc[starroks_jdbc.rfind("/") + 1:]
+                starroks_user = self.conf["starroks"]["user"]
+                starroks_pass = self.conf["starroks"]["password"]
@@ -414,7 +439,7 @@ class CommissionProcess:
-            df.write.format("starrocks") \
-                .option("starrocks.fe.http.url", starroks_fe) \
-                .option("starrocks.fe.jdbc.url", starroks_jdbc) \
-                .option("starrocks.table.identifier", database+"."+collection_name) \
-                .option("starrocks.user", starroks_user) \
-                .option("starrocks.password", starroks_pass) \
-                .mode("append") \
+                df.write.format("starrocks") \
+                    .option("starrocks.fe.http.url", starroks_fe) \
+                    .option("starrocks.fe.jdbc.url", starroks_jdbc) \
+                    .option("starrocks.table.identifier", database+"."+table_name) \
+                    .option("starrocks.user", starroks_user) \
+                    .option("starrocks.password", starroks_pass) \
+                    .mode("append") \
...
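Review note on `write_result`: the Redshift and MySQL branches hard-code the user and password, while `get_inputs` and the updated config files read them from the new `redshift` / `mysql` blocks. A hedged sketch of the config-driven variant (hypothetical function name, same write options as above):

```python
from typing import Any, Dict

from pyspark.sql import DataFrame


# Sketch only: the same Redshift write, but with credentials taken from the job
# config (the "redshift" block added to the config files in this commit) instead
# of literal values.
def write_redshift(df: DataFrame, conf: Dict[str, Any], redshift_url: str, table_name: str) -> None:
    df.write \
        .format("jdbc") \
        .option("driver", "com.amazon.redshift.jdbc42.Driver") \
        .option("url", redshift_url) \
        .option("dbtable", table_name) \
        .option("user", conf["redshift"]["user"]) \
        .option("password", conf["redshift"]["password"]) \
        .mode("append") \
        .save()
```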
 from typing import Dict, Any
 import logging
 from pyspark.sql.functions import col, when, lit, to_date, date_format, date_add
-from pyspark.sql.types import StructType, StructField, StringType
 from prefect import task
 from Enum.DataTypeEnum import DataTypeEnum
+from Enum.DatabaseTypeEnum import DatabaseTypeEnum
+from Enum.InputTypeEnum import InputTypeEnum
 from Utils.SparkUtils import createSession
 from Input.Source import Input
@@ -20,8 +21,8 @@ class ETLProcess:
         self.inputs = {}

-    def init(self, spark_jars: Dict[str, str]) -> None:
-        self.session = createSession(self.identifier, spark_jars)
+    def init(self, spark_jars: Dict[str, str], source_type: InputTypeEnum = InputTypeEnum.BUCKET) -> None:
+        self.session = createSession(self.identifier, spark_jars, source_type)

     @task
     def reader(self) -> None:
@@ -124,8 +125,30 @@ class ETLProcess:
         return success

     @task
-    def write(self, identifier: str, starroks_jdbc: str, starroks_fe: str, prev_status: bool = True) -> None:
+    def write(self, identifier: str, starroks_jdbc: str, starroks_fe: str, prev_status: bool = True,
+              db_type: DatabaseTypeEnum = DatabaseTypeEnum.REDSHIFT, redshift_url: str = "", mysql_url: str = "") -> None:
         try:
-            database = starroks_jdbc[starroks_jdbc.rfind("/")+1:]
-            starroks_user = self.conf["starroks"]["user"]
-            starroks_pass = self.conf["starroks"]["password"]
+            if db_type == DatabaseTypeEnum.REDSHIFT:
+                self.inputs[identifier].coalesce(45).write \
+                    .format("jdbc") \
+                    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
+                    .option("url", redshift_url) \
+                    .option("dbtable", identifier) \
+                    .option("user", "awsuser") \
+                    .option("password", "Awsuser123") \
+                    .mode("append") \
+                    .save()
+            elif db_type == DatabaseTypeEnum.MYSQL:
+                self.inputs[identifier].write \
+                    .format("jdbc") \
+                    .option("driver", "com.mysql.cj.jdbc.Driver") \
+                    .option("url", mysql_url) \
+                    .option("dbtable", identifier) \
+                    .option("user", "root") \
+                    .option("password", "root") \
+                    .mode("append") \
+                    .save()
+            else:
+                database = starroks_jdbc[starroks_jdbc.rfind("/")+1:]
+                starroks_user = self.conf["starroks"]["user"]
+                starroks_pass = self.conf["starroks"]["password"]
...
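Side note on the Redshift branch of `write`: `coalesce(45)` caps the number of partitions, and therefore the number of concurrent JDBC connections, before the write. A small sketch of that idea in isolation (hypothetical function name; 45 is the tuning value taken from the diff, not a fixed rule):

```python
from pyspark.sql import DataFrame


# Sketch: limit concurrent JDBC connections by reducing partitions before writing.
def write_with_capped_connections(df: DataFrame, url: str, table: str, user: str, password: str,
                                  max_partitions: int = 45) -> None:
    df.coalesce(max_partitions).write \
        .format("jdbc") \
        .option("driver", "com.amazon.redshift.jdbc42.Driver") \
        .option("url", url) \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .mode("append") \
        .save()
```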
 from typing import Dict
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col, udf
-from pyspark.sql.types import ArrayType, StringType
 import logging
+from Enum.InputTypeEnum import InputTypeEnum

 logger = logging.getLogger()


-def createSession(name: str, spark_jars: Dict[str, str]) -> SparkSession:
+def createSession(name: str, spark_jars: Dict[str, str], source_type: InputTypeEnum) -> SparkSession:
     session = None
     try:
         jars = list(spark_jars.values())
@@ -18,12 +18,15 @@ def createSession(name: str, spark_jars: Dict[str, str]) -> SparkSession:
             .appName(name) \
             .config("spark.jars", jars) \
             .config("spark.jars.packages", "graphframes:graphframes:0.8.3-spark3.4-s_2.12") \
+            .config("spark.hadoop.fs.s3a.aws.credentials.provider",
+                    "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
             .config("spark.executor.extraClassPath", jars) \
             .config("spark.driver.extraClassPath", jars) \
             .config("spark.starrocks.driver", "com.starroks.jdbc.Driver") \
             .config("spark.sql.catalogImplementation", "in-memory") \
             .getOrCreate()
         session._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
-        session._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://192.168.21.47:9000")
-        session._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
-        session._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
+        if source_type == InputTypeEnum.LOCAL:
+            session._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://192.168.21.47:9000")
+            session._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
+            session._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
@@ -70,5 +73,5 @@ def find_related_vertices(graph):
             dfs(vertex_id, related_vertices)

         # Agregar los vértices relacionados al diccionario
         related_vertices_dict[vertex_id] = list(related_vertices)
+        related_vertices_dict[vertex_id].remove(vertex_id)
     return related_vertices_dict
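The added `.remove(vertex_id)` changes the contract of `find_related_vertices`: each vertex now maps to its related vertices excluding itself. A self-contained illustration of that behaviour on a plain adjacency dict (the real function walks GraphFrame edges; the names here are illustrative only):

```python
from typing import Dict, List, Set


def related_excluding_self(edges: Dict[str, List[str]]) -> Dict[str, List[str]]:
    # Depth-first walk from every vertex, then drop the starting vertex itself,
    # mirroring the related_vertices_dict[vertex_id].remove(vertex_id) change.
    def dfs(node: str, seen: Set[str]) -> None:
        if node in seen:
            return
        seen.add(node)
        for child in edges.get(node, []):
            dfs(child, seen)

    result = {}
    for vertex in edges:
        seen: Set[str] = set()
        dfs(vertex, seen)
        seen.remove(vertex)
        result[vertex] = sorted(seen)
    return result


# Example: A supervises B and C, B supervises C.
print(related_excluding_self({"A": ["B", "C"], "B": ["C"], "C": []}))
# {'A': ['B', 'C'], 'B': ['C'], 'C': []}
```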
@@ -5,6 +5,7 @@ from typing import Any, Dict
 from prefect import flow, get_run_logger

 from Pipeline.CommissionProcess import CommissionProcess
+from Enum.DatabaseTypeEnum import DatabaseTypeEnum

 SPARK_JARS = {
     "STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
@@ -14,6 +15,10 @@ SPARK_JARS = {
 STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
 STARROK_FE_NODE = "192.168.1.37:8030"

+REDSHIFT_JDBC = "jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=prueba_ca"
+
+DB_TYPE = DatabaseTypeEnum.REDSHIFT
+

 @flow()
 def run_commission(config: Dict[str, Any]) -> None:
@@ -29,7 +34,7 @@ def run_commission(config: Dict[str, Any]) -> None:

     # Primer task - Extraer la data - RECORDAR: SPARK ES LAZY!!!
     start_reader = time.time()
-    commission_process.get_inputs(commission_process, STARROK_JDBC, STARROK_FE_NODE)
+    commission_process.get_inputs(commission_process, DB_TYPE, STARROK_JDBC, STARROK_FE_NODE, REDSHIFT_JDBC)
     logger.info(f"Duración de extracción de datos desde la BD: {time.time() - start_reader}")

     # Tercer task - Obtener metas
@@ -37,7 +42,7 @@ def run_commission(config: Dict[str, Any]) -> None:
     goals = commission_process.get_goals(commission_process, "VENTAS", "GOALS")

     # Quinto task - Obtener ejecutados - ¿Aplicar tmb filtro de FLAG_COMISIONABLE y ACTIVE_USER_TRAFFIC?
-    executes = commission_process.get_executed(commission_process, "VENTAS", "TEAMS")
+    executes = commission_process.get_executed(commission_process, "VENTAS", "DEVICES")

     # Sexo task - Obtener monto origen
     base = commission_process.get_source_value(commission_process, "VENTAS", "COMERCIAL_BASE")
...
@@ -5,6 +5,7 @@ from typing import Any, Dict
 from prefect import flow, get_run_logger

 from Pipeline.CommissionProcess import CommissionProcess
+from Enum.DatabaseTypeEnum import DatabaseTypeEnum

 SPARK_JARS = {
     "STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
@@ -14,6 +15,12 @@ SPARK_JARS = {
 STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
 STARROK_FE_NODE = "192.168.1.37:8030"

+REDSHIFT_JDBC = "jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=prueba_ca"
+MYSQL_JDBC = "jdbc:mysql://localhost:13306/bcom_spark"
+
+DB_TYPE = DatabaseTypeEnum.MYSQL
+

 @flow()
 def run_commission(config: Dict[str, Any]) -> None:
@@ -29,7 +36,8 @@ def run_commission(config: Dict[str, Any]) -> None:

     # Primer task - Extraer la data - RECORDAR: SPARK ES LAZY!!!
     start_reader = time.time()
-    commission_process.get_inputs(commission_process, STARROK_JDBC, STARROK_FE_NODE)
+    commission_process.get_inputs(commission_process, DB_TYPE, STARROK_JDBC, STARROK_FE_NODE, REDSHIFT_JDBC,
+                                  MYSQL_JDBC)
     logger.info(f"Duración de extracción de datos desde la BD: {time.time() - start_reader}")

     # Tercer task - Obtener metas
@@ -37,20 +45,15 @@ def run_commission(config: Dict[str, Any]) -> None:
     goals = commission_process.get_goals_2(commission_process, "GOALS", "ESTRUCTURA_ORGANIZACIONAL")

     # Quinto task - Obtener ejecutados - ¿Aplicar tmb filtro de FLAG_COMISIONABLE y ACTIVE_USER_TRAFFIC?
-    executes = commission_process.get_executed_2(commission_process, "ESTRUCTURA_ORGANIZACIONAL", "TEAMS", "VENTAS")
-    #
+    executes = commission_process.get_executed_2(commission_process, "ESTRUCTURA_ORGANIZACIONAL", "DEVICES", "VENTAS")

     # Sexo task - Obtener monto origen
     base = commission_process.get_source_value_2(commission_process, "ESTRUCTURA_ORGANIZACIONAL", "COMERCIAL_BASE")

     # Segundo task - Crear jerarquía
-    start_process = time.time()
     # ["AGENTES", "ESTRUCTURA", "UO", "OGRANIZACIONES"]
     identifiers = ["INDIVIDUOS", "ESTRUCTURA_ORGANIZACIONAL", "UNIDAD", "ORGANIZACION"]
     jerarquia_graph = commission_process.create_jerarquia(commission_process, identifiers, goals, executes, base)
-    logger.info(f"Duración de creación de dataframes con grafos (jerarquía): {time.time() - start_process}")

     result = commission_process.update_executes(commission_process, jerarquia_graph, goals, executes, base)
     result = commission_process.get_commission_per_agent_2(commission_process, result)
@@ -58,7 +61,8 @@ def run_commission(config: Dict[str, Any]) -> None:

     # Task de escritura
     start_load = time.time()
-    _ = commission_process.write_result(commission_process, result, "REPORT_SUMMARY", STARROK_JDBC, STARROK_FE_NODE)
+    _ = commission_process.write_result(commission_process, result, "REPORT_SUMMARY", DB_TYPE, STARROK_JDBC,
+                                        STARROK_FE_NODE, REDSHIFT_JDBC, MYSQL_JDBC)
     logger.info(f"Duración de carga del reporte a la BD: {time.time() - start_load}")
     logger.info(f"Duración de ejecución del proceso de comision: {time.time() - start_time}")
...
@@ -9,7 +9,7 @@
     "data": [
       {
         "identifier": "VENTAS",
-        "path": "s3a://prueba-id/inputs_spark/gross_202311.txt",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/gross_202311.txt",
         "input_type": "txt",
         "separator": "|",
         "schema": {
@@ -29,8 +29,8 @@
         }
       },
       {
-        "identifier": "TEAMS",
-        "path": "s3a://prueba-id/inputs_spark/equipos_202311.txt",
+        "identifier": "DEVICES",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/equipos_202311.txt",
         "input_type": "txt",
         "separator": "|",
         "schema": {
@@ -45,7 +45,7 @@
       },
       {
         "identifier": "GOALS",
-        "path": "s3a://prueba-id/inputs_spark/metas_202311.csv",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/metas_202311.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -58,7 +58,7 @@
       },
       {
         "identifier": "COMERCIAL_BASE",
-        "path": "s3a://prueba-id/inputs_spark/planta_comercial_202311.csv",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/planta_comercial_202311.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -70,7 +70,7 @@
       },
       {
         "identifier": "INDIVIDUOS",
-        "path": "s3a://prueba-id/inputs_spark/individuos_2023111813.csv",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/individuos_2023111813.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -89,7 +89,7 @@
       },
       {
         "identifier": "ROLES",
-        "path": "s3a://prueba-id/inputs_spark/roles_2023111812.csv",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/roles_2023111812.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -99,7 +99,7 @@
       },
       {
         "identifier": "ORGANIZACION",
-        "path": "s3a://prueba-id/inputs_spark/organizaciones_2023111813.csv",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/organizaciones_2023111813.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -121,7 +121,7 @@
       },
       {
         "identifier": "UNIDAD",
-        "path": "s3a://prueba-id/inputs_spark/unidades_organizacionales_2023111812.csv",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/unidades_organizacionales_2023111812.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -139,7 +139,7 @@
       },
       {
         "identifier": "ESTRUCTURA_ORGANIZACIONAL",
-        "path": "s3a://prueba-id/inputs_spark/estructura_organizacional_2023111812.csv",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/estructura_organizacional_2023111812.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -154,5 +154,13 @@
   "starroks": {
     "user": "root",
     "password": ""
+  },
+  "redshift": {
+    "user": "awsuser",
+    "password": "Awsuser123"
+  },
+  "mysql": {
+    "user": "root",
+    "password": "root"
   }
 }
\ No newline at end of file
@@ -8,7 +8,7 @@
     "data": [
       {
         "identifier": "FACTURACION",
-        "path": "s3a://prueba-id/bcom-tests/inputs/Facturacion_20240320.csv",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/Facturacion_20240320.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -24,7 +24,7 @@
       },
       {
         "identifier": "ENDING",
-        "path": "s3a://prueba-id/bcom-tests/inputs/Ending_20240320.csv",
+        "path": "s3a://prueba-id2/bcom-tests/inputs/Ending_20240320.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -42,5 +42,13 @@
   "starroks": {
     "user": "root",
     "password": ""
+  },
+  "redshift": {
+    "user": "awsuser",
+    "password": "Awsuser123"
+  },
+  "mysql": {
+    "user": "admin",
+    "password": "awsuser123"
   }
 }
\ No newline at end of file
@@ -2,6 +2,8 @@ import time
 import json
 from typing import Any, Dict
 from prefect import flow, get_run_logger
+from Enum.DatabaseTypeEnum import DatabaseTypeEnum
+from Enum.InputTypeEnum import InputTypeEnum

 from Pipeline.ETLProcess import ETLProcess

@@ -12,12 +14,21 @@ SPARK_JARS = {
     "COMMON": "/opt/spark-jars/hadoop-common-3.3.4.jar",
     "AWS_CLIENT": "/opt/spark-jars/hadoop-client-3.3.4.jar",
     "STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
-    "MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar"
+    "MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar",
+    "REDSHIFT": "/opt/spark-jars/redshift-jdbc42-2.1.0.12.jar"
 }

 STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
 STARROK_FE_NODE = "192.168.1.37:8030"

+REDSHIFT_JDBC = "jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=prueba_ca"
+MYSQL_JDBC = "jdbc:mysql://localhost:13306/bcom_spark"
+
+DB_TYPE = DatabaseTypeEnum.MYSQL
+SOURCE_TYPE = InputTypeEnum.BUCKET
+

 @flow
 def run_etl(config: Dict[str, Any]) -> None:
@@ -29,7 +40,7 @@ def run_etl(config: Dict[str, Any]) -> None:

     # Conexion a Spark (LocalMode, StandAlone or Clúster)
     start_init = time.time()
-    etl_process.init(SPARK_JARS)
+    etl_process.init(SPARK_JARS, SOURCE_TYPE)
     logger.info(f"Duración de creación de sesión Spark: {time.time() - start_init}")

     # Primer task - (Reader) - Extraer los ficheros
@@ -42,31 +53,40 @@ def run_etl(config: Dict[str, Any]) -> None:
     etl_process.set_schema(etl_process)

     # Process - Insumo Gross (Ventas)
-    ventas_flag = etl_process.process_gross.submit(etl_process, "VENTAS")
+    ventas_flag = etl_process.process_gross(etl_process, "VENTAS")
     # Process - Insumo Team (Equipos)
-    teams_flag = etl_process.process_teams.submit(etl_process, "TEAMS")
+    teams_flag = etl_process.process_teams(etl_process, "DEVICES")

     logger.info(f"Duración de transformación y limpieza de datos: {time.time() - start_transform}")

     # Write - Insumo GROSS
     start_load = time.time()
-    etl_process.write.submit(etl_process, "VENTAS", STARROK_JDBC, STARROK_FE_NODE, ventas_flag)
-    # Write - Insumo TEAMS
-    etl_process.write.submit(etl_process, "TEAMS", STARROK_JDBC, STARROK_FE_NODE, teams_flag)
+    etl_process.write(etl_process, "VENTAS", STARROK_JDBC, STARROK_FE_NODE, ventas_flag, DB_TYPE,
+                      REDSHIFT_JDBC, MYSQL_JDBC)
+    # Write - Insumo DEVICES
+    etl_process.write(etl_process, "DEVICES", STARROK_JDBC, STARROK_FE_NODE, teams_flag, DB_TYPE,
+                      REDSHIFT_JDBC, MYSQL_JDBC)
     # Write - Insumo GOALS
-    etl_process.write.submit(etl_process, "GOALS", STARROK_JDBC, STARROK_FE_NODE)
+    etl_process.write(etl_process, "GOALS", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
+                      redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
     # Write - Insumo PLANTA
-    etl_process.write.submit(etl_process, "COMERCIAL_BASE", STARROK_JDBC, STARROK_FE_NODE)
+    etl_process.write(etl_process, "COMERCIAL_BASE", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
+                      redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
     # Write - Insumo INDIVIDUOS
-    etl_process.write.submit(etl_process, "INDIVIDUOS", STARROK_JDBC, STARROK_FE_NODE)
+    etl_process.write(etl_process, "INDIVIDUOS", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
+                      redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
     # Write - Insumo ROLES
-    etl_process.write.submit(etl_process, "ROLES", STARROK_JDBC, STARROK_FE_NODE)
+    etl_process.write(etl_process, "ROLES", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
+                      redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
     # Write - Insumo ORGANIZACION
-    etl_process.write.submit(etl_process, "ORGANIZACION", STARROK_JDBC, STARROK_FE_NODE)
+    etl_process.write(etl_process, "ORGANIZACION", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
+                      redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
     # Write - Insumo UNIDADES
-    etl_process.write.submit(etl_process, "UNIDAD", STARROK_JDBC, STARROK_FE_NODE)
+    etl_process.write(etl_process, "UNIDAD", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
+                      redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
     # Write - Insumo ESTRUCTURA
-    etl_process.write.submit(etl_process, "ESTRUCTURA_ORGANIZACIONAL", STARROK_JDBC, STARROK_FE_NODE)
+    etl_process.write(etl_process, "ESTRUCTURA_ORGANIZACIONAL", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
+                      redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)

     logger.info(f"Duración de carga de datos a la BD: {time.time() - start_load}")
     logger.info(f"Duración de ejecución del proceso ETL General: {time.time() - start_time}")
...
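Review note, not part of the commit: the nine `etl_process.write(...)` calls differ only in the identifier and, for VENTAS and DEVICES, the flag argument, so they could be folded into a loop using the names already defined in this flow:

```python
# Sketch: write the flag-gated inputs first, then loop over the remaining identifiers.
flagged_writes = [("VENTAS", ventas_flag), ("DEVICES", teams_flag)]
for identifier, flag in flagged_writes:
    etl_process.write(etl_process, identifier, STARROK_JDBC, STARROK_FE_NODE, flag, DB_TYPE,
                      REDSHIFT_JDBC, MYSQL_JDBC)

for identifier in ["GOALS", "COMERCIAL_BASE", "INDIVIDUOS", "ROLES", "ORGANIZACION",
                   "UNIDAD", "ESTRUCTURA_ORGANIZACIONAL"]:
    etl_process.write(etl_process, identifier, STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
                      redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
```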
@@ -4,6 +4,8 @@ from typing import Any, Dict
 from prefect import flow, get_run_logger

 from Pipeline.ETLProcess import ETLProcess
+from Enum.DatabaseTypeEnum import DatabaseTypeEnum
+from Enum.InputTypeEnum import InputTypeEnum


 SPARK_JARS = {
@@ -12,12 +14,21 @@ SPARK_JARS = {
     "COMMON": "/opt/spark-jars/hadoop-common-3.3.4.jar",
     "AWS_CLIENT": "/opt/spark-jars/hadoop-client-3.3.4.jar",
     "STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
-    "MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar"
+    "MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar",
+    "REDSHIFT": "/opt/spark-jars/redshift-jdbc42-2.1.0.12.jar"
 }

 STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
 STARROK_FE_NODE = "192.168.1.37:8030"

+REDSHIFT_JDBC = "jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=prueba_ca"
+MYSQL_JDBC = "jdbc:mysql://localhost:13306/bcom_spark"
+
+DB_TYPE = DatabaseTypeEnum.MYSQL
+SOURCE_TYPE = InputTypeEnum.BUCKET
+

 @flow
 def run_etl(config: Dict[str, Any]) -> None:
@@ -29,7 +40,7 @@ def run_etl(config: Dict[str, Any]) -> None:

     # Conexion a Spark (LocalMode, StandAlone or Clúster)
     start_init = time.time()
-    etl_process.init(SPARK_JARS)
+    etl_process.init(SPARK_JARS, SOURCE_TYPE)
     logger.info(f"Duración de creación de sesión Spark: {time.time() - start_init}")

     # Primer task - (Reader) - Extraer los ficheros
@@ -46,10 +57,12 @@ def run_etl(config: Dict[str, Any]) -> None:
     logger.info(f"Duración de transformación y limpieza de datos: {time.time() - start_transform}")

     start_load = time.time()
-    # Write - Insumo TEAMS
-    etl_process.write(etl_process, "FACTURACION", STARROK_JDBC, STARROK_FE_NODE, teams_fact)
+    # Write - Insumo DEVICES
+    etl_process.write(etl_process, "FACTURACION", STARROK_JDBC, STARROK_FE_NODE, teams_fact, DB_TYPE,
+                      REDSHIFT_JDBC, MYSQL_JDBC)
     # Write - Insumo GOALS
-    etl_process.write(etl_process, "ENDING", STARROK_JDBC, STARROK_FE_NODE)
+    etl_process.write(etl_process, "ENDING", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
+                      redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)

     logger.info(f"Duración de carga de datos a la BD: {time.time() - start_load}")
     logger.info(f"Duración de ejecución del proceso ETL General: {time.time() - start_time}")
...