Commit a78f4f94 authored by Cristian Aguirre

Add starroks.py

parent 9c72a0a6
-from typing import Dict, Any
+from typing import Dict, Any, List
 import logging
 from pyspark.sql import SparkSession
@@ -18,14 +18,23 @@ class BucketAwsInput:
         self.schema = params["schema"]
         self.data = None

-    def get_data(self) -> None:
+    def get_data(self, replace: bool, replace_space_str: str) -> None:
         try:
+            def replace_delimiters(line):
+                line = line.replace(replace_space_str, " ")
+                return line
+
             file_type = FileTypeEnum(self.input_type)
             if not self.input_path.startswith("s3://") and not self.input_path.startswith("s3a://"):
                 raise Exception(f"Error getting descriptor from S3. Path should start with s3://")
             final_path = self.input_path
             if file_type == FileTypeEnum.CSV or file_type == FileTypeEnum.TXT:
-                self.data = self.session.read.csv(final_path, header=True, sep=self.separator, inferSchema=True)
+                if replace:
+                    lines_rdd = self.session.sparkContext.textFile(final_path)
+                    cleaned = lines_rdd.map(replace_delimiters)
+                    self.data = self.session.read.csv(cleaned, header=True, sep=self.separator)
+                else:
+                    self.data = self.session.read.csv(final_path, header=True, sep=self.separator)
             elif file_type == FileTypeEnum.PARQUET:
                 self.data = self.session.read.parquet(final_path, header=True)
             else:
...
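The new replace branch above relies on spark.read.csv accepting an RDD of CSV-row strings. A minimal sketch of that path on made-up sample data (the rows and the "|" separator below are illustrative, not taken from the commit):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("replace-delimiters-sketch").getOrCreate()
# Toy rows: "|" is the field separator, and one field contains a tab that must become a space
raw = spark.sparkContext.parallelize(["COL_A|COL_B", "x\ty|1", "z|2"])
cleaned = raw.map(lambda line: line.replace("\t", " "))  # same idea as replace_delimiters
df = spark.read.csv(cleaned, header=True, sep="|")       # read.csv also accepts an RDD of strings
df.show()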
@@ -17,7 +17,7 @@ class Input:
         self.data = None

-    def get_data(self) -> None:
-        self.factory.get_data()
+    def get_data(self, replace: bool = False, replace_space_str: str = "\t") -> None:
+        self.factory.get_data(replace, replace_space_str)
         self.data = self.factory.data
 import logging
-from typing import Dict, Any
+from typing import Dict, Any, List
-from prefect import flow, task
+from prefect import task
+import time
 from pyspark.sql import DataFrame
-from pyspark.sql.functions import when, lit, sum as py_sum, col, count
+from pyspark.sql.functions import when, lit, sum as py_sum, count, coalesce
+from graphframes import GraphFrame
+from pyspark.sql import functions as F
+from pyspark.sql.window import Window
-from Utils.SparkUtils import createSession, get_goal_by_kpi, get_execute_by_service
+from Utils.SparkUtils import createSession, find_related_vertices
 logger = logging.getLogger()
@@ -14,43 +18,34 @@ class CommissionProcess:
     def __init__(self, config: Dict[str, Any]) -> None:
         self.conf = config
         self.identifier = self.conf["identifier"]
-        self.period = self.conf["period"]
         self.session = None
+        self.period = self.conf["period"]
-        self.agents = []
-        self.periods = []
         self.inputs = {}
         self.inter_results = {}

-    def init(self, spark_jars: Dict[str, str], mongodb_uri: str = "") -> None:
-        self.session = createSession(self.identifier, spark_jars, mongodb_uri)
+    def init(self, spark_jars: Dict[str, str]) -> None:
+        self.session = createSession(self.identifier, spark_jars)

     @task
-    def get_inputs(self) -> None:
+    def get_inputs(self, starroks_jdbc: str, starroks_fe: str) -> None:
         try:
             inputs = self.conf["inputs"]
+            database = starroks_jdbc[starroks_jdbc.rfind("/") + 1:]
+            starroks_user = self.conf["starroks"]["user"]
+            starroks_pass = self.conf["starroks"]["password"]
             for input_obj in inputs["data"]:
                 identifier = input_obj["identifier"]
-                df_input = self.session.read.format("com.mongodb.spark.sql.DefaultSource"). \
-                    option("collection", identifier).load()
+                df_input = self.session.read.format("starrocks"). \
+                    option("starrocks.table.identifier", database+"."+identifier). \
+                    option("starrocks.fe.http.url", starroks_fe). \
+                    option("starrocks.fe.jdbc.url", starroks_jdbc). \
+                    option("starrocks.user", starroks_user). \
+                    option("starrocks.password", starroks_pass).load()
                 self.inputs.update({identifier: df_input})
         except Exception as e:
             raise AssertionError(f"Error in function 'get_inputs'. {e}")

-    @task
-    def get_agents_and_period(self, identifier: str) -> None:
-        try:
-            # Get the agents
-            agents_df = self.inputs[identifier].groupBy("AGENTE_COMISIONA").count()
-            self.agents = [row[0] for row in agents_df.select("AGENTE_COMISIONA").collect() if row[0]]
-            # Get the periods
-            period_df = self.inputs[identifier].groupBy("PERIODO_PROCESO_CODIGO").count()
-            self.periods = [row[0] for row in period_df.select("PERIODO_PROCESO_CODIGO").collect()]
-        except Exception as e:
-            raise AssertionError(f"Error obteniendo agentes. {e}")

     @task
     def filter_data(self, identifier: str) -> None:
         try:
@@ -63,7 +58,42 @@ class CommissionProcess:
             raise AssertionError(f"Error filtrando datos. {e}")
     @task
-    def get_goals_second_way(self, gross_name: str, goals_name: str) -> DataFrame:
+    def create_jerarquia(self, identifiers: List[str], df_goals: DataFrame, df_executes: DataFrame, df_base: DataFrame):
+        graph = None
+        try:
+            merged = df_goals.join(df_executes, ["AGENTE_COMISIONA"], 'left')
+            merged = merged.join(df_base, ["AGENTE_COMISIONA"], 'left')
+            agent_df = self.inputs[identifiers[0]]
+            structure_df = self.inputs[identifiers[1]]
+            uo_df = self.inputs[identifiers[2]]
+            org_df = self.inputs[identifiers[3]]
+            agent_1 = agent_df.join(structure_df, agent_df["PIIN_IDENT"] == structure_df["PIOS_INDID"], 'leftsemi')
+            agent_1 = agent_1.withColumnRenamed("PIIN_IDENT", "id")
+            agent_1 = agent_1.join(merged, agent_1["id"] == merged["AGENTE_COMISIONA"], "left")
+            estructura_1 = structure_df.join(uo_df.select("PIOU_ORGID", "PIOU_RESPO"),
+                                             structure_df["PIOS_ORGID"] == uo_df["PIOU_ORGID"], 'left')
+            estructura_1 = estructura_1.withColumn("PIOS_SUPER",
+                                                   coalesce(estructura_1["PIOS_SUPER"], estructura_1["PIOU_RESPO"]))
+            estructura_2 = estructura_1.join(org_df.select("PIOR_ORGID", "PIOR_RESPO"),
+                                             estructura_1["PIOS_ORGID"] == org_df["PIOR_ORGID"], 'left')
+            estructura_2 = estructura_2.withColumn("PIOS_SUPER",
+                                                   coalesce(estructura_2["PIOS_SUPER"], estructura_2["PIOR_RESPO"]))
+            estructura_2 = estructura_2.drop(*["PIOU_ORGID", "PIOU_RESPO", "PIOR_ORGID", "PIOR_RESPO"])
+            estructura_2 = estructura_2.withColumn("PIOS_RELATION", lit("responsable"))
+            estructura_2 = estructura_2.select("PIOS_SUPER", "PIOS_INDID", "PIOS_RELATION")
+            estructura_2 = estructura_2.withColumnRenamed("PIOS_SUPER", "src").withColumnRenamed("PIOS_INDID", "dst")
+            graph = GraphFrame(agent_1, estructura_2)
+        except Exception as e:
+            logger.error(f"Error creando dataframe de jerarquia. {e}")
+        finally:
+            return graph
+
+    @task
+    def get_goals(self, gross_name: str, goals_name: str) -> DataFrame:
         result = None
         try:
             goal_df = self.inputs[goals_name]
@@ -91,7 +121,34 @@ class CommissionProcess:
             return result

     @task
-    def get_executed_second_way(self, gross_name: str, team_name: str) -> DataFrame:
+    def get_goals_2(self, goals_name: str, structure_name: str) -> DataFrame:
+        result = None
+        try:
+            goal_df = self.inputs[goals_name]
+            structure_df = self.inputs[structure_name]
+            goal_df = goal_df[goal_df["PERIODO_PROCESO_CODIGO"] == self.period]
+            goal_df = goal_df.withColumn("META_1", when(goal_df["KPI"] == "EQUIPOS", goal_df["META_FINAL"]).otherwise(lit(0)))
+            goal_df = goal_df.withColumn("META_2", when(goal_df["KPI"] == "GROSS B2B", goal_df["META_FINAL"]).otherwise(lit(0)))
+            goal_df = goal_df.withColumn("META_3", when(goal_df["KPI"] == "GROSS POSPAGO", goal_df["META_FINAL"]).otherwise(lit(0)))
+            sums = [py_sum(column).alias(column) for column in ["META_1", "META_2", "META_3"]]
+            goal_group = goal_df.groupBy("CEDULA").agg(*sums)
+            condition = [structure_df["PIOS_INDID"] == goal_group["CEDULA"]]
+            result = structure_df.select("PIOS_INDID").join(goal_group, condition, how='left')
+            result = result.na.fill(value=0, subset=["META_1", "META_2", "META_3"])
+            result = result.drop(*["CEDULA"])
+            result = result.withColumn("PERIODO_PROCESO_CODIGO", lit(self.period))
+            result = result.withColumnRenamed("PIOS_INDID", "AGENTE_COMISIONA")
+        except Exception as e:
+            raise AssertionError(f"Error obteniendo metas. Segunda forma. {e}")
+        finally:
+            return result
+
+    @task
+    def get_executed(self, gross_name: str, team_name: str) -> DataFrame:
         result = None
         try:
             team_df = self.inputs[team_name]
@@ -140,6 +197,101 @@ class CommissionProcess:
         finally:
             return result
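get_goals_2 above pivots the KPI rows of GOALS into one META_* column per KPI with a when()/sum() conditional aggregation. A short sketch of the same pattern on toy data (the agent ids and values below are invented for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, sum as py_sum

spark = SparkSession.builder.appName("goal-pivot-sketch").getOrCreate()
goals = spark.createDataFrame(
    [("123", "EQUIPOS", 10.0), ("123", "GROSS B2B", 5.0), ("456", "GROSS POSPAGO", 7.0)],
    ["CEDULA", "KPI", "META_FINAL"])
# One conditional column per KPI, then a single groupBy/sum collapses them to one row per agent
goals = goals.withColumn("META_1", when(goals["KPI"] == "EQUIPOS", goals["META_FINAL"]).otherwise(lit(0)))
goals = goals.withColumn("META_2", when(goals["KPI"] == "GROSS B2B", goals["META_FINAL"]).otherwise(lit(0)))
goals.groupBy("CEDULA").agg(*[py_sum(c).alias(c) for c in ["META_1", "META_2"]]).show()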
+    @task
+    def get_executed_2(self, structure_name: str, team_name: str, gross_name: str) -> DataFrame:
+        result = None
+        try:
+            team_df = self.inputs[team_name]
+            structure_df = self.inputs[structure_name]
+            gross_df = self.inputs[gross_name]
+            team_df = team_df[team_df["PERIODO_PROCESO_CODIGO"] == self.period]
+            gross_df = gross_df[gross_df["PERIODO_PROCESO_CODIGO"] == self.period]
+            executed1 = team_df[team_df["MODELO_TIPO"] == "SMARTPHONE"]
+            executed1 = executed1.groupBy("CONSULTOR_DOCUMENTO").agg(count("MODELO_TIPO").alias("EJECUTADO_1"))
+            condition = [structure_df["PIOS_INDID"] == executed1["CONSULTOR_DOCUMENTO"]]
+            result_1 = structure_df.select("PIOS_INDID").join(executed1, condition, how='left')
+            result_1 = result_1.na.fill(value=0, subset=["EJECUTADO_1"])
+            result_1 = result_1.drop(*["CONSULTOR_DOCUMENTO"])
+            gross_b2b = gross_df[(gross_df["SEGMENTO"] == "B2B") & (gross_df["SERVICIO"] == "Postpaid") &
+                                 (gross_df["FLAG_COMISIONABLE"] == "1") & (gross_df["ACTIVE_USER_TRAFFIC"] == "1")]
+            gross_b2b = gross_b2b.groupBy("AGENTE_COMISIONA").agg(count("SERVICIO").alias("EJECUTADO_2"))
+            condition = [structure_df["PIOS_INDID"] == gross_b2b["AGENTE_COMISIONA"]]
+            result_2 = structure_df.select("PIOS_INDID").join(gross_b2b, condition, how='left')
+            result_2 = result_2.na.fill(value=0, subset=["EJECUTADO_2"])
+            result_2 = result_2.drop(*["AGENTE_COMISIONA"])
+            gross_b2c = gross_df[(gross_df["SEGMENTO"] == "B2C") & (gross_df["SERVICIO"] == "Postpaid") &
+                                 (gross_df["FLAG_COMISIONABLE"] == "1") & (gross_df["ACTIVE_USER_TRAFFIC"] == "1")]
+            gross_b2c = gross_b2c.groupBy("AGENTE_COMISIONA").agg(count("SERVICIO").alias("EJECUTADO_3"))
+            condition = [structure_df["PIOS_INDID"] == gross_b2c["AGENTE_COMISIONA"]]
+            result_3 = structure_df.select("PIOS_INDID").join(gross_b2c, condition, how='left')
+            result_3 = result_3.na.fill(value=0, subset=["EJECUTADO_3"])
+            result_3 = result_3.drop(*["AGENTE_COMISIONA"])
+            result = result_1.join(result_2, ["PIOS_INDID"], 'left')
+            result = result.join(result_3, ["PIOS_INDID"])
+            result = result.withColumnRenamed("PIOS_INDID", "AGENTE_COMISIONA")
+        except Exception as e:
+            logger.error(f"Error obteniendo ejecutados. Segunda forma. {e}")
+        finally:
+            return result
+
+    @task
+    def update_executes(self, graph: GraphFrame, df_goals: DataFrame, df_executes: DataFrame, df_base: DataFrame):
+        merged = None
+        try:
+            start_process = time.time()
+            graph_related = find_related_vertices(graph)
+            graph_related = {k: v for k, v in graph_related.items() if len(v) != 0}
+            merged = df_goals.join(df_executes, "AGENTE_COMISIONA", 'left')
+            merged = merged.join(df_base, "AGENTE_COMISIONA", 'left')
+            # for vertex in graph_related.keys():
+            #     value = merged.filter(merged["AGENTE_COMISIONA"].isin(graph_related[vertex])).agg(py_sum("EJECUTADO_1"),
+            #                                                                                       py_sum("EJECUTADO_2"),
+            #                                                                                       py_sum("EJECUTADO_3")).collect()[0]
+            #     value_1, value_2, value_3 = value[0], value[1], value[2]
+            #     merged = merged.withColumn("EJECUTADO_1", when(merged["AGENTE_COMISIONA"] == vertex, value_1).otherwise(merged["EJECUTADO_1"]))
+            #     merged = merged.withColumn("EJECUTADO_2", when(merged["AGENTE_COMISIONA"] == vertex, value_2).otherwise(merged["EJECUTADO_2"]))
+            #     merged = merged.withColumn("EJECUTADO_3", when(merged["AGENTE_COMISIONA"] == vertex, value_3).otherwise(merged["EJECUTADO_3"]))
+            graph_related_df = self.session.createDataFrame([(k, v) for k, v in graph_related.items()],
+                                                            ["vertex", "related_agents"])
+            # Add up the executed values of the agents related to each vertex
+            agg_df = merged.join(graph_related_df, F.expr("array_contains(related_agents, AGENTE_COMISIONA)"), "inner") \
+                .groupBy("vertex") \
+                .agg(F.sum("EJECUTADO_1").alias("sum_EJECUTADO_1"),
+                     F.sum("EJECUTADO_2").alias("sum_EJECUTADO_2"),
+                     F.sum("EJECUTADO_3").alias("sum_EJECUTADO_3"))
+            # Create a window partitioned by "AGENTE_COMISIONA" and ordered by some column (here we assume "order_column")
+            windowSpec1 = Window.partitionBy("AGENTE_COMISIONA").orderBy("AGENTE_COMISIONA")
+            # Use lag() to get the previously executed value for each agent
+            for i in range(1, 4):
+                merged = merged.withColumn(f"EJECUTADO_{i}",
+                                           F.coalesce(F.lag(f"EJECUTADO_{i}", 1).over(windowSpec1), F.lit(0)))
+            # Join with agg_df to update the executed values
+            for i in range(1, 4):
+                merged = merged.join(agg_df, merged["AGENTE_COMISIONA"] == agg_df["vertex"], "left") \
+                    .withColumn(f"EJECUTADO_{i}", F.when(F.col("vertex").isNull(), merged[f"EJECUTADO_{i}"])
+                                .otherwise(F.col(f"sum_EJECUTADO_{i}"))) \
+                    .drop("vertex", "related_agents", f"sum_EJECUTADO_{i}")
+            merged.show()
+            print(f"Duración de creación de dataframes con grafos (jerarquía): {time.time() - start_process}")
+        except Exception as e:
+            logger.error(f"Error actualizando ejecutados por jerarquía. {e}")
+        finally:
+            return merged
     @task
     def get_source_value(self, gross_name: str, base_name: str) -> DataFrame:
         result = None
@@ -163,9 +315,29 @@ class CommissionProcess:
         finally:
             return result

+    @task
+    def get_source_value_2(self, structure_name: str, base_name: str) -> DataFrame:
+        result = None
+        try:
+            base_df = self.inputs[base_name]
+            structure_df = self.inputs[structure_name]
+            base_df = base_df[base_df["PERIODO_PROCESO_CODIGO"] == self.period]
+            base_df = base_df.select("CEDULA", "VARIABLE_COMISION")
+            base_df = base_df.dropDuplicates()
+            condition = [structure_df["PIOS_INDID"] == base_df["CEDULA"]]
+            result = structure_df.select("PIOS_INDID").join(base_df, condition, how='left')
+            result = result.na.fill(value=0, subset=["VARIABLE_COMISION"])
+            result = result.drop(*["CEDULA"])
+            result = result.withColumnRenamed("PIOS_INDID", "AGENTE_COMISIONA")
+        except Exception as e:
+            logger.error(f"Error obteniendo valor de monto de origen. {e}")
+        finally:
+            return result
+
     @task
     def get_commission_per_agent(self, df_goals: DataFrame, df_executes: DataFrame, df_base: DataFrame) -> DataFrame:
+        merged = None
         try:
             merged = df_goals.join(df_executes, ["AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO"], 'left')
             merged = merged.join(df_base, ["AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO"], 'left')
@@ -187,6 +359,39 @@ class CommissionProcess:
                                                when(merged["PONDERADO TOTAL"] >= 2, lit(2)))
             merged = merged.withColumn("COMISION", merged["FACTOR"]*merged["VARIABLE_COMISION"])
+            merged = merged.withColumnRenamed("EJECUTADO_2", "EJECUTADO B2B")
+            merged = merged.withColumnRenamed("EJECUTADO_3", "EJECUTADO B2C")
+            merged = merged.withColumnRenamed("EJECUTADO_1", "EJECUTADO Cant Smart")
+            merged = merged.withColumnRenamed("META_1", "META EQUIPOS")
+            merged = merged.withColumnRenamed("META_2", "META GROSS B2B")
+            merged = merged.withColumnRenamed("META_3", "META GROSS POSPAGO")
+            merged = merged.withColumnRenamed("VARIABLE_COMISION", "VARIABLE COMISION (monto de origen)")
+        except Exception as e:
+            logger.error(f"Error operando la comisión. {e}")
+        finally:
+            return
+
+    @task
+    def get_commission_per_agent_2(self, merged: DataFrame) -> DataFrame:
+        try:
+            merged = merged.withColumn("CUMPLIMIENTO EQUIPOS",
+                                       when(merged["META_1"] != 0, merged["EJECUTADO_1"]/merged["META_1"]).
+                                       otherwise(lit(0)))
+            merged = merged.withColumn("PONDERADO EQUIPOS", merged["CUMPLIMIENTO EQUIPOS"] * 0.25)
+            merged = merged.withColumn("CUMPLIMIENTO B2B",
+                                       when(merged["META_2"] != 0, merged["EJECUTADO_2"]/merged["META_2"]).
+                                       otherwise(lit(0)))
+            merged = merged.withColumn("PONDERADO B2B", merged["CUMPLIMIENTO B2B"] * 0.1)
+            merged = merged.withColumn("CUMPLIMIENTO B2C",
+                                       when(merged["META_3"] != 0, merged["EJECUTADO_3"]/merged["META_3"]).
+                                       otherwise(lit(0)))
+            merged = merged.withColumn("PONDERADO B2C", merged["CUMPLIMIENTO B2C"] * 0.65)
+            merged = merged.withColumn("PONDERADO TOTAL", merged["PONDERADO EQUIPOS"] + merged["PONDERADO B2B"] + merged["PONDERADO B2C"])
+            merged = merged.withColumn("FACTOR", when(merged["PONDERADO TOTAL"] <= 0.79, lit(0)).
+                                       when((merged["PONDERADO TOTAL"] >= 0.8) & (merged["PONDERADO TOTAL"] < 2), merged["PONDERADO TOTAL"]).
+                                       when(merged["PONDERADO TOTAL"] >= 2, lit(2)))
+            merged = merged.withColumn("COMISION", merged["FACTOR"]*merged["VARIABLE_COMISION"])
             merged = merged.withColumnRenamed("EJECUTADO_2", "EJECUTADO B2B")
             merged = merged.withColumnRenamed("EJECUTADO_3", "EJECUTADO B2C")
             merged = merged.withColumnRenamed("EJECUTADO_1", "EJECUTADO Cant Smart")
@@ -200,9 +405,19 @@ class CommissionProcess:
         return merged

     @task
-    def write_result(self, df: DataFrame, collection_name: str) -> None:
+    def write_result(self, df: DataFrame, collection_name: str, starroks_jdbc: str, starroks_fe: str) -> None:
         try:
-            df.write.format("com.mongodb.spark.sql.DefaultSource").option("collection", collection_name). \
-                mode("append").save()
+            database = starroks_jdbc[starroks_jdbc.rfind("/") + 1:]
+            starroks_user = self.conf["starroks"]["user"]
+            starroks_pass = self.conf["starroks"]["password"]
+            df.write.format("starrocks") \
+                .option("starrocks.fe.http.url", starroks_fe) \
+                .option("starrocks.fe.jdbc.url", starroks_jdbc) \
+                .option("starrocks.table.identifier", database+"."+collection_name) \
+                .option("starrocks.user", starroks_user) \
+                .option("starrocks.password", starroks_pass) \
+                .mode("append") \
+                .save()
         except Exception as e:
             logger.error(f"Error guardando datos en BD. {e}")
 from typing import Dict, Any
 import logging
-from pyspark.sql.functions import col, when, lit
+from pyspark.sql.functions import col, when, lit, to_date, date_format, date_add
+from pyspark.sql.types import StructType, StructField, StringType
 from prefect import task
 from Enum.DataTypeEnum import DataTypeEnum
@@ -19,8 +20,8 @@ class ETLProcess:
         self.inputs = {}

-    def init(self, spark_jars: Dict[str, str], mongodb_uri: str = "", starrok_uri: str = "") -> None:
-        self.session = createSession(self.identifier, spark_jars, mongodb_uri, starrok_uri)
+    def init(self, spark_jars: Dict[str, str]) -> None:
+        self.session = createSession(self.identifier, spark_jars)

     @task
     def reader(self) -> None:
@@ -33,7 +34,11 @@ class ETLProcess:
                 params = {"identifier": identifier, "path": input_obj["path"], "type": input_obj["input_type"],
                           "separator": input_obj["separator"], "schema": input_obj["schema"]}
                 current_input = Input(input_type, self.session, params, provider)
-                current_input.get_data()
+                # Special case: replace "\t" with " "
+                if identifier == "FACTURACION":
+                    current_input.get_data(True)
+                else:
+                    current_input.get_data()
                 self.inputs.update({identifier: current_input.data})
         except Exception as e:
             raise AssertionError(f"Error in function extrayendo data. Reader. {e}")
@@ -98,17 +103,40 @@ class ETLProcess:
             return success

     @task
-    def write(self, identifier: str, prev_status: bool = True) -> None:
+    def process_facturacion(self, identifier: str) -> bool:
+        success = False
+        try:
+            df = self.inputs[identifier]
+            df = df.withColumn("fecha_vencimiento_fact", to_date(df["FECHA_VENCIMIENTO"], "dd/MM/yy"))
+            df = df.withColumn("fecha_periodo_fact",
+                               to_date(date_format(col("PERIODO_PROCESO_CODIGO"), "yyyyMM") + "01", "yyyyMMdd"))
+            df = df.withColumn("FACTURA_VENCIDA",
+                               when(date_add(col("fecha_periodo_fact"), 5) < col("fecha_vencimiento_fact"), lit("SI"))
+                               .otherwise(lit("NO")))
+            self.inputs[identifier] = df
+            success = True
+        except Exception as e:
+            logger.error(f"Error transformando archivo de facturacion. {e}")
+        finally:
+            return success
+
+    @task
+    def write(self, identifier: str, starroks_jdbc: str, starroks_fe: str, prev_status: bool = True) -> None:
         try:
-            # self.inputs[identifier].write.format("starrocks"). \
-            #     option("dbtable", identifier).mode("overwrite").save()
+            database = starroks_jdbc[starroks_jdbc.rfind("/")+1:]
+            starroks_user = self.conf["starroks"]["user"]
+            starroks_pass = self.conf["starroks"]["password"]
             self.inputs[identifier].write.format("starrocks") \
-                .option("starrocks.fe.http.url", "ec2-34-231-243-52.compute-1.amazonaws.com:8030") \
-                .option("starrocks.fe.jdbc.url", "jdbc:mysql://ec2-34-231-243-52.compute-1.amazonaws.com:9030/bcom_spark") \
-                .option("starrocks.table.identifier", "bcom_spark."+identifier) \
-                .option("starrocks.user", "root") \
-                .option("starrocks.password", "") \
+                .option("starrocks.fe.http.url", starroks_fe) \
+                .option("starrocks.fe.jdbc.url", starroks_jdbc) \
+                .option("starrocks.table.identifier", database+"."+identifier) \
+                .option("starrocks.user", starroks_user) \
+                .option("starrocks.password", starroks_pass) \
                 .mode("append") \
                 .save()
         except Exception as e:
-            logger.error(f"Erro guardando resultados. {e}")
+            logger.error(f"Error guardando resultados. {e}")
 from typing import Dict
-from pyspark.sql import SparkSession, DataFrame
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import col, udf
+from pyspark.sql.types import ArrayType, StringType
 import logging

 logger = logging.getLogger()

-def createSession(name: str, spark_jars: Dict[str, str], mongodb_uri: str, starrok_uri: str) -> SparkSession:
+def createSession(name: str, spark_jars: Dict[str, str]) -> SparkSession:
     session = None
     try:
         jars = list(spark_jars.values())
         jars = ",".join(jars)
         session = SparkSession.builder \
             .appName(name) \
+            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
+            .config("spark.hadoop.fs.s3a.aws.credentials.provider",
+                    "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
             .config("spark.jars", jars) \
+            .config("spark.jars.packages", "graphframes:graphframes:0.8.3-spark3.4-s_2.12") \
             .config("spark.executor.extraClassPath", jars) \
             .config("spark.driver.extraClassPath", jars) \
-            .config("spark.mongodb.input.uri", mongodb_uri) \
-            .config("spark.mongodb.output.uri", mongodb_uri) \
+            .config("spark.starrocks.driver", "com.starroks.jdbc.Driver") \
+            .config("spark.sql.catalogImplementation", "in-memory") \
             .getOrCreate()
-        # .config("spark.starrocks.url", starrok_uri) \
-        # .config("spark.starrocks.driver", "com.starroks.jdbc.Driver") \
-        # .config("spark.sql.catalogImplementation", "in-memory") \
-        # .getOrCreate()
         session._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+        session._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://192.168.21.47:9000")
+        session._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
+        session._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
+        session._jsc.hadoopConfiguration().set("fs.s3a.access.key", "minioadmin")
+        session._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "minioadmin")
     except Exception as e:
         logger.error(f"Error creando sesion. {e}")
     finally:
         return session
-def get_goal_by_kpi(df: DataFrame, agent: str, period: str, kpi: str) -> float:
-    result = 0.0
-    try:
-        df = df.filter((df["CEDULA"] == agent) & (df["PERIODO_PROCESO_CODIGO"] == period) & (df["KPI"] == kpi)). \
-            select("META_FINAL")
-        if df.count() != 0:
-            results = [row[0] for row in df.select("META_FINAL").collect()]
-            result = results[0]
-    except Exception as e:
-        logger.error(f"Error obteniendo meta por kpi. {e}")
-    finally:
-        return result
-
-def get_execute_by_service(df: DataFrame, agent: str, period: str, segment: str) -> int:
-    result = 0
-    try:
-        df = df.filter((df["AGENTE_COMISIONA"] == agent) & (df["PERIODO_PROCESO_CODIGO"] == period) &
-                       (df["SEGMENTO"] == segment))
-        result = df.count()
-    except Exception as e:
-        logger.error(f"Error obteniendo meta por segmento. {e}")
-    finally:
-        return result
+def find_related_vertices(graph):
+    # Get the graph's vertices and edges
+    vertices = graph.vertices
+    edges = graph.edges
+
+    # Dictionary to store the related vertices of each vertex
+    related_vertices_dict = {}
+
+    # Depth-first search (DFS) helper
+    def dfs(vertex_id, related_vertices):
+        # Add the current vertex to the set of related vertices
+        related_vertices.add(vertex_id)
+
+        # Find the vertices directly related to the current vertex
+        direct_related = edges.filter(edges.src == vertex_id).select("dst").collect()
+
+        # Explore each directly related vertex
+        for row in direct_related:
+            related_vertex_id = row["dst"]
+            # If the related vertex has not been visited yet, run DFS on it
+            if related_vertex_id not in related_vertices:
+                dfs(related_vertex_id, related_vertices)
+
+    # Get the distinct vertex ids
+    unique_vertices = vertices.select("id").distinct().collect()
+
+    # Iterate over the distinct vertices
+    for i, row in enumerate(unique_vertices):
+        vertex_id = row["id"]
+        # Initialize a set to store the related vertices
+        related_vertices = set()
+        # Run DFS to find every vertex reachable from the current one
+        dfs(vertex_id, related_vertices)
+        # Add the related vertices to the dictionary (excluding the vertex itself)
+        related_vertices_dict[vertex_id] = list(related_vertices)
+        related_vertices_dict[vertex_id].remove(vertex_id)
+
+    return related_vertices_dict
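A usage sketch for find_related_vertices on a toy hierarchy, assuming the GraphFrame conventions the function relies on: a vertex DataFrame with an "id" column and an edge DataFrame with "src"/"dst" columns, as built in create_jerarquia.

from pyspark.sql import SparkSession
from graphframes import GraphFrame
from Utils.SparkUtils import find_related_vertices

spark = SparkSession.builder.appName("related-vertices-sketch").getOrCreate()
vertices = spark.createDataFrame([("SUP1",), ("A",), ("B",)], ["id"])
edges = spark.createDataFrame([("SUP1", "A"), ("A", "B")], ["src", "dst"])
graph = GraphFrame(vertices, edges)

# Depth-first expansion per vertex; note that each dfs step runs edges.filter(...).collect()
related = find_related_vertices(graph)
print(related)  # e.g. {"SUP1": ["A", "B"], "A": ["B"], "B": []}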
@@ -7,13 +7,12 @@ from prefect import flow, get_run_logger
 from Pipeline.CommissionProcess import CommissionProcess

 SPARK_JARS = {
-    "MONGO_CORE": "/opt/spark-jars/mongodb-driver-core-4.0.4.jar",
-    "MONGO_CLIENT": "/opt/spark-jars/mongodb-driver-sync-4.0.4.jar",
-    "MONGODB": "/opt/spark-jars/mongo-spark-connector_2.12-3.0.1.jar",
-    "BSON": "/opt/spark-jars/bson-4.0.4.jar"
+    "STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
+    "MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar"
 }

-MONGODB_URI = "mongodb://bcom_spark_user:root@192.168.1.37:50001/bcom_spark"
+STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
+STARROK_FE_NODE = "192.168.1.37:8030"

 @flow()
@@ -25,20 +24,20 @@ def run_commission(config: Dict[str, Any]) -> None:
     # Connect to Spark (LocalMode, StandAlone or Cluster)
     start_init = time.time()
-    commission_process.init(SPARK_JARS, MONGODB_URI)
+    commission_process.init(SPARK_JARS)
     logger.info(f"Duración de creación de sesión Spark: {time.time() - start_init}")

     # First task - Extract the data - REMEMBER: SPARK IS LAZY!!!
     start_reader = time.time()
-    commission_process.get_inputs(commission_process)
+    commission_process.get_inputs(commission_process, STARROK_JDBC, STARROK_FE_NODE)
     logger.info(f"Duración de extracción de datos desde la BD: {time.time() - start_reader}")

     # Third task - Get the goals
     start_process = time.time()
-    goals = commission_process.get_goals_second_way(commission_process, "VENTAS", "GOALS")
+    goals = commission_process.get_goals(commission_process, "VENTAS", "GOALS")

     # Fifth task - Get the executed amounts - Also apply the FLAG_COMISIONABLE and ACTIVE_USER_TRAFFIC filters?
-    executes = commission_process.get_executed_second_way(commission_process, "VENTAS", "TEAMS")
+    executes = commission_process.get_executed(commission_process, "VENTAS", "TEAMS")

     # Sixth task - Get the source amount
     base = commission_process.get_source_value(commission_process, "VENTAS", "COMERCIAL_BASE")
@@ -48,10 +47,10 @@ def run_commission(config: Dict[str, Any]) -> None:
     # Write task
     start_load = time.time()
-    _ = commission_process.write_result(commission_process, result, "REPORT_SUMMARY")
+    _ = commission_process.write_result(commission_process, result, "REPORT_SUMMARY", STARROK_JDBC, STARROK_FE_NODE)
     logger.info(f"Duración de carga del reporte a la BD: {time.time() - start_load}")

-    logger.info(f"Duración de ejecución del proceso de etl: {time.time() - start_time}")
+    logger.info(f"Duración de ejecución del proceso de comision: {time.time() - start_time}")

 if __name__ == "__main__":
...
import time
import json
from typing import Any, Dict
from prefect import flow, get_run_logger
from Pipeline.CommissionProcess import CommissionProcess

SPARK_JARS = {
    "STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
    "MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar"
}

STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
STARROK_FE_NODE = "192.168.1.37:8030"


@flow()
def run_commission(config: Dict[str, Any]) -> None:
    logger = get_run_logger()
    start_time = time.time()
    commission_process = CommissionProcess(config)

    # Connect to Spark (LocalMode, StandAlone or Cluster)
    start_init = time.time()
    commission_process.init(SPARK_JARS)
    logger.info(f"Duración de creación de sesión Spark: {time.time() - start_init}")

    # First task - Extract the data - REMEMBER: SPARK IS LAZY!!!
    start_reader = time.time()
    commission_process.get_inputs(commission_process, STARROK_JDBC, STARROK_FE_NODE)
    logger.info(f"Duración de extracción de datos desde la BD: {time.time() - start_reader}")

    # Third task - Get the goals
    start_process = time.time()
    goals = commission_process.get_goals_2(commission_process, "GOALS", "ESTRUCTURA_ORGANIZACIONAL")

    # Fifth task - Get the executed amounts - Also apply the FLAG_COMISIONABLE and ACTIVE_USER_TRAFFIC filters?
    executes = commission_process.get_executed_2(commission_process, "ESTRUCTURA_ORGANIZACIONAL", "TEAMS", "VENTAS")

    # Sixth task - Get the source amount
    base = commission_process.get_source_value_2(commission_process, "ESTRUCTURA_ORGANIZACIONAL", "COMERCIAL_BASE")

    # Second task - Build the hierarchy
    start_process = time.time()
    # ["AGENTES", "ESTRUCTURA", "UO", "OGRANIZACIONES"]
    identifiers = ["INDIVIDUOS", "ESTRUCTURA_ORGANIZACIONAL", "UNIDAD", "ORGANIZACION"]
    jerarquia_graph = commission_process.create_jerarquia(commission_process, identifiers, goals, executes, base)
    logger.info(f"Duración de creación de dataframes con grafos (jerarquía): {time.time() - start_process}")

    result = commission_process.update_executes(commission_process, jerarquia_graph, goals, executes, base)
    result = commission_process.get_commission_per_agent_2(commission_process, result)
    logger.info(f"Duración de procesamiento en memoria: {time.time() - start_process}")

    # Write task
    start_load = time.time()
    _ = commission_process.write_result(commission_process, result, "REPORT_SUMMARY", STARROK_JDBC, STARROK_FE_NODE)
    logger.info(f"Duración de carga del reporte a la BD: {time.time() - start_load}")

    logger.info(f"Duración de ejecución del proceso de comision: {time.time() - start_time}")


if __name__ == "__main__":
    conf_path = "config.json"
    with open(conf_path) as f:
        conf = json.load(f)

    # Run Commission
    run_commission(conf)
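Both flows call the @task-decorated methods with the instance passed explicitly (for example commission_process.get_inputs(commission_process, ...)), because @task replaces the method with a Task object and attribute access no longer binds self. A hedged, self-contained sketch of that calling convention; the Pipeline class below is hypothetical and not part of the commit:

from prefect import flow, task

class Pipeline:
    def __init__(self, name: str) -> None:
        self.name = name

    @task
    def greet(self, who: str) -> str:
        # `self` arrives as an ordinary first argument, mirroring the flows above
        return f"{self.name} -> {who}"

@flow
def run() -> None:
    p = Pipeline("demo")
    print(p.greet(p, "world"))  # instance passed explicitly

if __name__ == "__main__":
    run()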
@@ -9,7 +9,7 @@
     "data": [
       {
         "identifier": "VENTAS",
-        "path": "s3a://prueba-id/bcom-tests/inputs/gross_202311.txt",
+        "path": "s3a://prueba-id/inputs_spark/gross_202311.txt",
         "input_type": "txt",
         "separator": "|",
         "schema": {
@@ -30,7 +30,7 @@
       },
       {
         "identifier": "TEAMS",
-        "path": "s3a://prueba-id/bcom-tests/inputs/equipos_202311.txt",
+        "path": "s3a://prueba-id/inputs_spark/equipos_202311.txt",
         "input_type": "txt",
         "separator": "|",
         "schema": {
@@ -45,7 +45,7 @@
       },
       {
         "identifier": "GOALS",
-        "path": "s3a://prueba-id/bcom-tests/inputs/metas_202311.csv",
+        "path": "s3a://prueba-id/inputs_spark/metas_202311.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -58,7 +58,7 @@
       },
       {
         "identifier": "COMERCIAL_BASE",
-        "path": "s3a://prueba-id/bcom-tests/inputs/planta_comercial_202311.csv",
+        "path": "s3a://prueba-id/inputs_spark/planta_comercial_202311.csv",
         "input_type": "csv",
         "separator": ";",
         "schema": {
@@ -67,14 +67,92 @@
           "ESTADO": "TEXT",
           "VARIABLE_COMISION": "DECIMAL"
         }
+      },
+      {
+        "identifier": "INDIVIDUOS",
+        "path": "s3a://prueba-id/inputs_spark/individuos_2023111813.csv",
+        "input_type": "csv",
+        "separator": ";",
+        "schema": {
+          "PIIN_NAMES": "TEXT",
+          "PIIN_LASTN": "TEXT",
+          "PIIN_IDENT": "TEXT",
+          "PIIN_TDOCU": "TEXT",
+          "PIIN_SLIQU": "TEXT",
+          "PIIN_CURRE": "TEXT",
+          "PIIN_BASAL": "DECIMAL",
+          "PIIN_CPERS": "TEXT",
+          "PIIN_CPHON": "TEXT",
+          "PIIN_CEMAI": "TEXT",
+          "UBIG_IDENT": "TEXT"
+        }
+      },
+      {
+        "identifier": "ROLES",
+        "path": "s3a://prueba-id/inputs_spark/roles_2023111812.csv",
+        "input_type": "csv",
+        "separator": ";",
+        "schema": {
+          "PIRO_IDENT": "TEXT",
+          "PIRO_NAME": "TEXT"
+        }
+      },
+      {
+        "identifier": "ORGANIZACION",
+        "path": "s3a://prueba-id/inputs_spark/organizaciones_2023111813.csv",
+        "input_type": "csv",
+        "separator": ";",
+        "schema": {
+          "PIOR_ORGID": "TEXT",
+          "PIOR_NAME": "TEXT",
+          "PIOR_IDENT": "TEXT",
+          "PIOR_SLIQU": "TEXT",
+          "PIOR_TCHAN": "TEXT",
+          "PIOR_CCHAN": "TEXT",
+          "PIOR_CPERS": "TEXT",
+          "PIOR_CPHON": "TEXT",
+          "PIOR_CEMAI": "TEXT",
+          "PIOR_RESPO": "TEXT",
+          "PIOR_REPRE": "TEXT",
+          "UBIG_IDENT": "TEXT",
+          "PIOR_LIQIN": "TEXT"
+        }
+      },
+      {
+        "identifier": "UNIDAD",
+        "path": "s3a://prueba-id/inputs_spark/unidades_organizacionales_2023111812.csv",
+        "input_type": "csv",
+        "separator": ";",
+        "schema": {
+          "PIOU_ORGID": "TEXT",
+          "PIOU_NAME": "TEXT",
+          "PIOU_UOTYP": "TEXT",
+          "PIOU_BEORG": "TEXT",
+          "PIOU_CPERS": "TEXT",
+          "PIOU_CPHON": "TEXT",
+          "PIOU_CEMAI": "TEXT",
+          "PIOU_RESPO": "TEXT",
+          "PIOU_SEGME": "TEXT",
+          "UBIG_IDENT": "TEXT"
+        }
+      },
+      {
+        "identifier": "ESTRUCTURA_ORGANIZACIONAL",
+        "path": "s3a://prueba-id/inputs_spark/estructura_organizacional_2023111812.csv",
+        "input_type": "csv",
+        "separator": ";",
+        "schema": {
+          "PIOS_ORGID": "TEXT",
+          "PIOS_INDID": "TEXT",
+          "PIOS_ROLID": "TEXT",
+          "PIOS_SUPER": "TEXT"
+        }
       }
     ]
   },
-  "output": {
-    "type": "bucket",
-    "params": {
-      "provider": "aws",
-      "bucket": "prueba-id"
-    }
+  "starroks": {
+    "user": "root",
+    "password": ""
   }
 }
\ No newline at end of file
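Each "schema" block maps column names to logical types that the ETL applies after reading. A minimal sketch of how such a mapping can be applied by casting columns; the TYPE_MAP here is hypothetical, since the commit's DataTypeEnum is not shown in this diff:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

TYPE_MAP = {"TEXT": "string", "DECIMAL": "decimal(18,2)"}  # hypothetical mapping

def apply_schema(df: DataFrame, schema: dict) -> DataFrame:
    # Cast every configured column to its Spark type, leaving other columns untouched
    for column, logical_type in schema.items():
        df = df.withColumn(column, col(column).cast(TYPE_MAP[logical_type]))
    return df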
{
  "identifier": "BCOM-SPARK-TESTS2",
  "inputs": {
    "type": "bucket",
    "params": {
      "provider": "aws"
    },
    "data": [
      {
        "identifier": "FACTURACION",
        "path": "s3a://prueba-id/bcom-tests/inputs/Facturacion_20240320.csv",
        "input_type": "csv",
        "separator": ";",
        "schema": {
          "PERIODO_PROCESO_CODIGO": "TEXT",
          "NOMBRE_CLIENTE": "TEXT",
          "NUM_FACTURA": "TEXT",
          "DOCUMENTO": "TEXT",
          "CIUDAD": "TEXT",
          "FECHA_VENCIMIENTO": "TEXT",
          "SUBS_ID": "TEXT",
          "ESTADO_FACTURA": "TEXT"
        }
      },
      {
        "identifier": "ENDING",
        "path": "s3a://prueba-id/bcom-tests/inputs/Ending_20240320.csv",
        "input_type": "csv",
        "separator": ";",
        "schema": {
          "PERIODO_PROCESO_CODIGO": "TEXT",
          "SUBSCRIBER_ID": "TEXT",
          "SERVICIO": "TEXT",
          "ESTADO": "TEXT",
          "MOVIMIENTO_NOMBRE": "TEXT",
          "OPERADOR_PORTA_DESTINO": "TEXT",
          "REVENUE": "DECIMAL"
        }
      }
    ]
  },
  "starroks": {
    "user": "root",
    "password": ""
  }
}
\ No newline at end of file
@@ -11,17 +11,12 @@ SPARK_JARS = {
     "BUNDLE": "/opt/spark-jars/aws-java-sdk-bundle-1.12.431.jar",
     "COMMON": "/opt/spark-jars/hadoop-common-3.3.4.jar",
     "AWS_CLIENT": "/opt/spark-jars/hadoop-client-3.3.4.jar",
-    "MONGO_CORE": "/opt/spark-jars/mongodb-driver-core-4.0.4.jar",
-    "MONGO_CLIENT": "/opt/spark-jars/mongodb-driver-sync-4.0.4.jar",
-    "MONGODB": "/opt/spark-jars/mongo-spark-connector_2.12-3.0.1.jar",
-    "BSON": "/opt/spark-jars/bson-4.0.4.jar",
-    "STARROK": "/opt/spark-jars/starrocks-spark-connector-3.4_2.12-1.1.2.jar",
+    "STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
     "MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar"
 }

-MONGODB_URI = "mongodb://bcom_spark_user:root@192.168.1.37:50001/bcom_spark"
-STARROK_URI = "jdbc:starroks://root:@ec2-3-237-32-62.compute-1.amazonaws.com:9030/bcom_spark"
+STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
+STARROK_FE_NODE = "192.168.1.37:8030"

 @flow
@@ -34,7 +29,7 @@ def run_etl(config: Dict[str, Any]) -> None:
     # Connect to Spark (LocalMode, StandAlone or Cluster)
     start_init = time.time()
-    etl_process.init(SPARK_JARS, starrok_uri=STARROK_URI)
+    etl_process.init(SPARK_JARS)
     logger.info(f"Duración de creación de sesión Spark: {time.time() - start_init}")

     # First task - (Reader) - Extract the files
@@ -55,13 +50,23 @@ def run_etl(config: Dict[str, Any]) -> None:
     # Write - GROSS input
     start_load = time.time()
-    etl_process.write.submit(etl_process, "VENTAS", ventas_flag)
+    etl_process.write.submit(etl_process, "VENTAS", STARROK_JDBC, STARROK_FE_NODE, ventas_flag)
     # Write - TEAMS input
-    etl_process.write.submit(etl_process, "TEAMS", teams_flag)
+    etl_process.write.submit(etl_process, "TEAMS", STARROK_JDBC, STARROK_FE_NODE, teams_flag)
     # Write - GOALS input
-    etl_process.write.submit(etl_process, "GOALS")
+    etl_process.write.submit(etl_process, "GOALS", STARROK_JDBC, STARROK_FE_NODE)
     # Write - PLANTA input
-    etl_process.write.submit(etl_process, "COMERCIAL_BASE")
+    etl_process.write.submit(etl_process, "COMERCIAL_BASE", STARROK_JDBC, STARROK_FE_NODE)
+    # Write - INDIVIDUOS input
+    etl_process.write.submit(etl_process, "INDIVIDUOS", STARROK_JDBC, STARROK_FE_NODE)
+    # Write - ROLES input
+    etl_process.write.submit(etl_process, "ROLES", STARROK_JDBC, STARROK_FE_NODE)
+    # Write - ORGANIZACION input
+    etl_process.write.submit(etl_process, "ORGANIZACION", STARROK_JDBC, STARROK_FE_NODE)
+    # Write - UNIDADES input
+    etl_process.write.submit(etl_process, "UNIDAD", STARROK_JDBC, STARROK_FE_NODE)
+    # Write - ESTRUCTURA input
+    etl_process.write.submit(etl_process, "ESTRUCTURA_ORGANIZACIONAL", STARROK_JDBC, STARROK_FE_NODE)
     logger.info(f"Duración de carga de datos a la BD: {time.time() - start_load}")

     logger.info(f"Duración de ejecución del proceso ETL General: {time.time() - start_time}")
...
import time
import json
from typing import Any, Dict
from prefect import flow, get_run_logger
from Pipeline.ETLProcess import ETLProcess

SPARK_JARS = {
    "AWS_CORE": "/opt/spark-jars/hadoop-aws-3.3.4.jar",
    "BUNDLE": "/opt/spark-jars/aws-java-sdk-bundle-1.12.431.jar",
    "COMMON": "/opt/spark-jars/hadoop-common-3.3.4.jar",
    "AWS_CLIENT": "/opt/spark-jars/hadoop-client-3.3.4.jar",
    "STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
    "MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar"
}

STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
STARROK_FE_NODE = "192.168.1.37:8030"


@flow
def run_etl(config: Dict[str, Any]) -> None:
    logger = get_run_logger()
    start_time = time.time()
    etl_process = ETLProcess(config)

    # Connect to Spark (LocalMode, StandAlone or Cluster)
    start_init = time.time()
    etl_process.init(SPARK_JARS)
    logger.info(f"Duración de creación de sesión Spark: {time.time() - start_init}")

    # First task - (Reader) - Extract the files
    start_reader = time.time()
    etl_process.reader(etl_process)
    logger.info(f"Duración de extracción de ficheros desde S3: {time.time() - start_reader}")

    # Second task - Apply the schema to the tables
    start_transform = time.time()
    etl_process.set_schema(etl_process)

    # Process - FACTURACION input
    teams_fact = etl_process.process_facturacion(etl_process, "FACTURACION")
    logger.info(f"Duración de transformación y limpieza de datos: {time.time() - start_transform}")

    start_load = time.time()
    # Write - FACTURACION input
    etl_process.write(etl_process, "FACTURACION", STARROK_JDBC, STARROK_FE_NODE, teams_fact)
    # Write - ENDING input
    etl_process.write(etl_process, "ENDING", STARROK_JDBC, STARROK_FE_NODE)
    logger.info(f"Duración de carga de datos a la BD: {time.time() - start_load}")

    logger.info(f"Duración de ejecución del proceso ETL General: {time.time() - start_time}")


if __name__ == "__main__":
    conf_path = "config2.json"
    with open(conf_path) as f:
        conf = json.load(f)

    # Run ETL
    run_etl(conf)
aiosqlite==0.20.0
alembic==1.13.1
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.4
asgi-lifespan==2.1.0
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
coolname==2.2.0
croniter==2.0.3
cryptography==42.0.5
dateparser==1.2.0
dnspython==2.6.1
docker==6.1.3
email_validator==2.1.1
exceptiongroup==1.2.0
fsspec==2024.3.1
google-auth==2.28.2
graphviz==0.20.2
greenlet==3.0.3
griffe==0.42.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.4
httpx==0.27.0
hyperframe==6.0.1
idna==3.6
importlib_resources==6.1.3
itsdangerous==2.1.2
Jinja2==3.1.3
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.21.1
jsonschema-specifications==2023.12.1
kubernetes==29.0.0
Mako==1.3.2
Markdown==3.6
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
oauthlib==3.2.2
orjson==3.9.15
packaging==24.0
pathspec==0.12.1
pendulum==2.1.2
prefect==2.16.4
py4j==0.10.9.7
pyasn1==0.5.1
pyasn1-modules==0.3.0
pycparser==2.21
pydantic==2.6.4
pydantic_core==2.16.3
Pygments==2.17.2
pyspark==3.4.0
python-dateutil==2.9.0.post0
python-multipart==0.0.9
python-slugify==8.0.4
pytz==2024.1
pytzdata==2020.1
PyYAML==6.0.1
readchar==4.0.6
referencing==0.34.0
regex==2023.12.25
requests==2.31.0
requests-oauthlib==1.4.0
rfc3339-validator==0.1.4
rich==13.7.1
rpds-py==0.18.0
rsa==4.9
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
six==1.16.0
sniffio==1.3.1
SQLAlchemy==2.0.28
text-unidecode==1.3
toml==0.10.2
typer==0.9.0
typing_extensions==4.10.0
tzlocal==5.2
ujson==5.9.0
urllib3==2.2.1
uvicorn==0.28.1
websocket-client==1.7.0
websockets==12.0