Commit 53b1c503 authored by Cristian Aguirre

Merge branch 'developer_ca' into 'developer'

Update Bcom Spark Components

See merge request !1
parents 5fca2ea8 6c63987b
from enum import Enum
class CloudProviderEnum(Enum):
AWS = "aws"
GOOGLE = "gcp"
from enum import Enum
from pyspark.sql.types import StringType, IntegerType, DecimalType, DateType
class DataTypeEnum(Enum):
INTEGER = IntegerType
TEXT = StringType
DECIMAL = DecimalType
DATE = DateType
from enum import Enum
class DatabaseTypeEnum(Enum):
MONGODB = "mongodb"
MYSQL = "mysql"
REDSHIFT = "redshift"
STARROKS = "starroks"
from enum import Enum
class FileTypeEnum(Enum):
CSV = "csv"
EXCEL = "excel"
TXT = "txt"
PARQUET = "parquet"
from enum import Enum
class InputTypeEnum(Enum):
LOCAL = "local"
DATABASE = "db"
BUCKET = "bucket"
from typing import Dict, Any, List
import logging
from pyspark.sql import SparkSession
from Enum.FileTypeEnum import FileTypeEnum
logger = logging.getLogger()
class BucketAwsInput:
def __init__(self, name: str, session: SparkSession, params: Dict[str, Any]) -> None:
self.name = name
self.session = session
self.input_path = params["path"]
self.input_type = params["type"]
self.separator = params["separator"]
self.schema = params["schema"]
self.data = None
def get_data(self, replace: bool, replace_space_str: str) -> None:
try:
def replace_delimiters(line):
line = line.replace(replace_space_str, " ")
return line
file_type = FileTypeEnum(self.input_type)
if not self.input_path.startswith("s3://") and not self.input_path.startswith("s3a://"):
raise Exception(f"Error getting descriptor from S3. Path should start with s3://")
final_path = self.input_path
if file_type == FileTypeEnum.CSV or file_type == FileTypeEnum.TXT:
if replace:
lines_rdd = self.session.sparkContext.textFile(final_path)
cleaned = lines_rdd.map(replace_delimiters)
self.data = self.session.read.csv(cleaned, header=True, sep=self.separator)
else:
self.data = self.session.read.csv(final_path, header=True, sep=self.separator)
elif file_type == FileTypeEnum.PARQUET:
self.data = self.session.read.parquet(final_path)
else:
logger.info(f"Formato de archivo no soportado: {self.input_type}")
except Exception as e:
logger.error(f"Error obteniendo data de insumo. método 'get_data'. {e}")
from typing import Any, Dict
from pyspark.sql import SparkSession
from Enum.InputTypeEnum import InputTypeEnum
from Enum.CloudProviderEnum import CloudProviderEnum
from Input.BucketAwsSource import BucketAwsInput
class Input:
def __init__(self, input_type: str, session: SparkSession, params: Dict[str, Any], provider=None) -> None:
self.input_type = input_type
if not provider:
provider = CloudProviderEnum.AWS.value
if input_type == InputTypeEnum.BUCKET.value and provider == CloudProviderEnum.AWS.value:
self.factory = BucketAwsInput(params["identifier"], session, params)
self.data = None
def get_data(self, replace: bool = False, replace_space_str: str = "\t") -> None:
self.factory.get_data(replace, replace_space_str)
self.data = self.factory.data
import logging
from typing import Dict, Any, List
from prefect import task
import time
from pyspark.sql import DataFrame
from pyspark.sql.functions import when, lit, sum as py_sum, count, coalesce, expr
from graphframes import GraphFrame
from Utils.SparkUtils import createSession, find_related_vertices
from Enum.DatabaseTypeEnum import DatabaseTypeEnum
logger = logging.getLogger()
class CommissionProcess:
def __init__(self, config: Dict[str, Any]) -> None:
self.conf = config
self.identifier = self.conf["identifier"]
self.session = None
self.period = self.conf["period"]
self.inputs = {}
self.inter_results = {}
def init(self, spark_jars: Dict[str, str]) -> None:
self.session = createSession(self.identifier, spark_jars, "")
@task
def get_inputs(self, db_type: DatabaseTypeEnum, starroks_jdbc: str, starroks_fe: str, redshift_url: str = "",
mysql_url: str = "") -> None:
try:
inputs = self.conf["inputs"]
database = starroks_jdbc[starroks_jdbc.rfind("/") + 1:]
for input_obj in inputs["data"]:
identifier = input_obj["identifier"]
if db_type == DatabaseTypeEnum.REDSHIFT:
redshift_user = self.conf["redshift"]["user"]
redshift_pass = self.conf["redshift"]["password"]
df_input = self.session.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", redshift_url) \
.option("dbtable", identifier) \
.option("user", redshift_user) \
.option("password", redshift_pass) \
.load()
elif db_type == DatabaseTypeEnum.MYSQL:
mysql_user = self.conf["mysql"]["user"]
mysql_pass = self.conf["mysql"]["password"]
df_input = self.session.read \
.format("jdbc") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("url", mysql_url) \
.option("dbtable", identifier) \
.option("user", mysql_user) \
.option("password", mysql_pass) \
.load()
else:
starroks_user = self.conf["starroks"]["user"]
starroks_pass = self.conf["starroks"]["password"]
df_input = self.session.read.format("starrocks"). \
option("starrocks.table.identifier", database+"."+identifier). \
option("starrocks.fe.http.url", starroks_fe). \
option("starrocks.fe.jdbc.url", starroks_jdbc). \
option("starrocks.user", starroks_user). \
option("starrocks.password", starroks_pass).load()
self.inputs.update({identifier: df_input})
except Exception as e:
raise AssertionError(f"Error in function 'get_inputs'. {e}")
@task
def filter_data(self, identifier: str) -> None:
try:
this_df = self.inputs[identifier]
this_df = this_df.filter(this_df["SERVICIO"] == "Postpaid")
this_df = this_df.filter(this_df["FLAG_COMISIONABLE"] == "1")
this_df = this_df.filter(this_df["ACTIVE_USER_TRAFFIC"] == "1")
self.inputs[identifier] = this_df
except Exception as e:
raise AssertionError(f"Error filtrando datos. {e}")
@task
def create_jerarquia(self, identifiers: List[str], df_goals: DataFrame, df_executes: DataFrame, df_base: DataFrame):
graph = None
try:
merged = df_goals.join(df_executes, ["AGENTE_COMISIONA"], 'left')
merged = merged.join(df_base, ["AGENTE_COMISIONA"], 'left')
agent_df = self.inputs[identifiers[0]]
structure_df = self.inputs[identifiers[1]]
uo_df = self.inputs[identifiers[2]]
org_df = self.inputs[identifiers[3]]
agent_1 = agent_df.join(structure_df, agent_df["PIIN_IDENT"] == structure_df["PIOS_INDID"], 'leftsemi')
agent_1 = agent_1.withColumnRenamed("PIIN_IDENT", "id")
agent_1 = agent_1.join(merged, agent_1["id"] == merged["AGENTE_COMISIONA"], "left")
estructura_1 = structure_df.join(uo_df.select("PIOU_ORGID", "PIOU_RESPO"),
structure_df["PIOS_ORGID"] == uo_df["PIOU_ORGID"], 'left')
estructura_1 = estructura_1.withColumn("PIOS_SUPER",
coalesce(estructura_1["PIOS_SUPER"], estructura_1["PIOU_RESPO"]))
estructura_2 = estructura_1.join(org_df.select("PIOR_ORGID", "PIOR_RESPO"),
estructura_1["PIOS_ORGID"] == org_df["PIOR_ORGID"], 'left')
estructura_2 = estructura_2.withColumn("PIOS_SUPER",
coalesce(estructura_2["PIOS_SUPER"], estructura_2["PIOR_RESPO"]))
estructura_2 = estructura_2.drop(*["PIOU_ORGID", "PIOU_RESPO", "PIOR_ORGID", "PIOR_RESPO"])
estructura_2 = estructura_2.withColumn("PIOS_RELATION", lit("responsable"))
estructura_2 = estructura_2.select("PIOS_SUPER", "PIOS_INDID", "PIOS_RELATION")
estructura_2 = estructura_2.withColumnRenamed("PIOS_SUPER", "src").withColumnRenamed("PIOS_INDID", "dst")
graph = GraphFrame(agent_1, estructura_2)
except Exception as e:
logger.error(f"Error creando dataframe de jerarquia. {e}")
finally:
return graph
@task
def get_goals(self, gross_name: str, goals_name: str) -> DataFrame:
result = None
try:
goal_df = self.inputs[goals_name]
gross_df = self.inputs[gross_name]
goal_df = goal_df.withColumn("META_1", when(goal_df["KPI"] == "EQUIPOS", goal_df["META_FINAL"]).otherwise(lit(0)))
goal_df = goal_df.withColumn("META_2", when(goal_df["KPI"] == "GROSS B2B", goal_df["META_FINAL"]).otherwise(lit(0)))
goal_df = goal_df.withColumn("META_3", when(goal_df["KPI"] == "GROSS POSPAGO", goal_df["META_FINAL"]).otherwise(lit(0)))
sums = [py_sum(column).alias(column) for column in ["META_1", "META_2", "META_3"]]
goal_group = goal_df.groupBy("CEDULA", "PERIODO_PROCESO_CODIGO").agg(*sums)
gross_df_agent = gross_df.groupBy("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO").count(). \
select("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO")
goal_group = goal_group.withColumnRenamed("PERIODO_PROCESO_CODIGO", "PERIODO_PROCESO_CODIGO_2")
condition = [gross_df_agent["AGENTE_COMISIONA"] == goal_group["CEDULA"],
gross_df_agent["PERIODO_PROCESO_CODIGO"] == goal_group["PERIODO_PROCESO_CODIGO_2"]]
result = gross_df_agent.join(goal_group, condition, how='left')
result = result.na.fill(value=0, subset=["META_1", "META_2", "META_3"])
result = result.drop(*["CEDULA", "PERIODO_PROCESO_CODIGO_2"])
except Exception as e:
raise AssertionError(f"Error obteniendo metas. Segunda forma. {e}")
finally:
return result
@task
def get_goals_2(self, goals_name: str, structure_name: str) -> DataFrame:
result = None
try:
goal_df = self.inputs[goals_name]
structure_df = self.inputs[structure_name]
goal_df = goal_df[goal_df["PERIODO_PROCESO_CODIGO"] == self.period]
goal_df = goal_df.withColumn("META_1", when(goal_df["KPI"] == "EQUIPOS", goal_df["META_FINAL"]).otherwise(lit(0)))
goal_df = goal_df.withColumn("META_2", when(goal_df["KPI"] == "GROSS B2B", goal_df["META_FINAL"]).otherwise(lit(0)))
goal_df = goal_df.withColumn("META_3", when(goal_df["KPI"] == "GROSS POSPAGO", goal_df["META_FINAL"]).otherwise(lit(0)))
sums = [py_sum(column).alias(column) for column in ["META_1", "META_2", "META_3"]]
goal_group = goal_df.groupBy("CEDULA").agg(*sums)
condition = [structure_df["PIOS_INDID"] == goal_group["CEDULA"]]
result = structure_df.select("PIOS_INDID").join(goal_group, condition, how='left')
result = result.na.fill(value=0, subset=["META_1", "META_2", "META_3"])
result = result.drop(*["CEDULA"])
result = result.withColumn("PERIODO_PROCESO_CODIGO", lit(self.period))
result = result.withColumnRenamed("PIOS_INDID", "AGENTE_COMISIONA")
except Exception as e:
raise AssertionError(f"Error obteniendo metas. Segunda forma. {e}")
finally:
return result
@task
def get_executed(self, gross_name: str, team_name: str) -> DataFrame:
result = None
try:
team_df = self.inputs[team_name]
gross_df = self.inputs[gross_name]
executed1 = team_df[team_df["MODELO_TIPO"] == "SMARTPHONE"]
executed1 = executed1.groupBy("CONSULTOR_DOCUMENTO", "PERIODO_PROCESO_CODIGO").agg(count("MODELO_TIPO").
alias("EJECUTADO_1"))
gross_df_agent = gross_df.groupBy("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO").count(). \
select("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO")
executed1 = executed1.withColumnRenamed("PERIODO_PROCESO_CODIGO", "PERIODO_PROCESO_CODIGO_2")
condition = [gross_df_agent["AGENTE_COMISIONA"] == executed1["CONSULTOR_DOCUMENTO"],
gross_df_agent["PERIODO_PROCESO_CODIGO"] == executed1["PERIODO_PROCESO_CODIGO_2"]]
result_1 = gross_df_agent.join(executed1, condition, how='left')
result_1 = result_1.na.fill(value=0, subset=["EJECUTADO_1"])
result_1 = result_1.drop(*["CONSULTOR_DOCUMENTO", "PERIODO_PROCESO_CODIGO_2"])
gross_b2b = gross_df[(gross_df["SEGMENTO"] == "B2B") & (gross_df["SERVICIO"] == "Postpaid") &
(gross_df["FLAG_COMISIONABLE"] == "1") & (gross_df["ACTIVE_USER_TRAFFIC"] == "1")]
gross_b2b = gross_b2b.groupBy("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO").agg(count("SERVICIO").
alias("EJECUTADO_2"))
gross_b2b = gross_b2b.withColumnRenamed("PERIODO_PROCESO_CODIGO", "PERIODO_PROCESO_CODIGO_2")
gross_b2b = gross_b2b.withColumnRenamed("AGENTE_COMISIONA", "AGENTE_COMISIONA_2")
condition = [gross_df_agent["AGENTE_COMISIONA"] == gross_b2b["AGENTE_COMISIONA_2"],
gross_df_agent["PERIODO_PROCESO_CODIGO"] == gross_b2b["PERIODO_PROCESO_CODIGO_2"]]
result_2 = gross_df_agent.join(gross_b2b, condition, how='left')
result_2 = result_2.na.fill(value=0, subset=["EJECUTADO_2"])
result_2 = result_2.drop(*["AGENTE_COMISIONA_2", "PERIODO_PROCESO_CODIGO_2"])
gross_b2c = gross_df[(gross_df["SEGMENTO"] == "B2C") & (gross_df["SERVICIO"] == "Postpaid") &
(gross_df["FLAG_COMISIONABLE"] == "1") & (gross_df["ACTIVE_USER_TRAFFIC"] == "1")]
gross_b2c = gross_b2c.groupBy("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO").agg(count("SERVICIO").
alias("EJECUTADO_3"))
gross_b2c = gross_b2c.withColumnRenamed("PERIODO_PROCESO_CODIGO", "PERIODO_PROCESO_CODIGO_2")
gross_b2c = gross_b2c.withColumnRenamed("AGENTE_COMISIONA", "AGENTE_COMISIONA_2")
condition = [gross_df_agent["AGENTE_COMISIONA"] == gross_b2c["AGENTE_COMISIONA_2"],
gross_df_agent["PERIODO_PROCESO_CODIGO"] == gross_b2c["PERIODO_PROCESO_CODIGO_2"]]
result_3 = gross_df_agent.join(gross_b2c, condition, how='left')
result_3 = result_3.na.fill(value=0, subset=["EJECUTADO_3"])
result_3 = result_3.drop(*["AGENTE_COMISIONA_2", "PERIODO_PROCESO_CODIGO_2"])
result = result_1.join(result_2, ["AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO"], 'left')
result = result.join(result_3, ["AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO"])
except Exception as e:
logger.error(f"Error obteniendo ejecutados. Segunda forma. {e}")
finally:
return result
@task
def get_executed_2(self, structure_name: str, team_name: str, gross_name: str) -> DataFrame:
result = None
try:
team_df = self.inputs[team_name]
structure_df = self.inputs[structure_name]
gross_df = self.inputs[gross_name]
team_df = team_df[team_df["PERIODO_PROCESO_CODIGO"] == self.period]
gross_df = gross_df[gross_df["PERIODO_PROCESO_CODIGO"] == self.period]
executed1 = team_df[team_df["MODELO_TIPO"] == "SMARTPHONE"]
executed1 = executed1.groupBy("CONSULTOR_DOCUMENTO").agg(count("MODELO_TIPO").alias("EJECUTADO_1"))
condition = [structure_df["PIOS_INDID"] == executed1["CONSULTOR_DOCUMENTO"]]
result_1 = structure_df.select("PIOS_INDID").join(executed1, condition, how='left')
result_1 = result_1.na.fill(value=0, subset=["EJECUTADO_1"])
result_1 = result_1.drop(*["CONSULTOR_DOCUMENTO"])
gross_b2b = gross_df[(gross_df["SEGMENTO"] == "B2B") & (gross_df["SERVICIO"] == "Postpaid") &
(gross_df["FLAG_COMISIONABLE"] == "1") & (gross_df["ACTIVE_USER_TRAFFIC"] == "1")]
gross_b2b = gross_b2b.groupBy("AGENTE_COMISIONA").agg(count("SERVICIO").alias("EJECUTADO_2"))
condition = [structure_df["PIOS_INDID"] == gross_b2b["AGENTE_COMISIONA"]]
result_2 = structure_df.select("PIOS_INDID").join(gross_b2b, condition, how='left')
result_2 = result_2.na.fill(value=0, subset=["EJECUTADO_2"])
result_2 = result_2.drop(*["AGENTE_COMISIONA"])
gross_b2c = gross_df[(gross_df["SEGMENTO"] == "B2C") & (gross_df["SERVICIO"] == "Postpaid") &
(gross_df["FLAG_COMISIONABLE"] == "1") & (gross_df["ACTIVE_USER_TRAFFIC"] == "1")]
gross_b2c = gross_b2c.groupBy("AGENTE_COMISIONA").agg(count("SERVICIO").alias("EJECUTADO_3"))
# CONVERTED TO SPARK SQL:
# gross_df.createOrReplaceTempView("gross_df")
#
# # Write the SQL query
# query = """
# SELECT AGENTE_COMISIONA, COUNT(SERVICIO) AS EJECUTADO_3
# FROM gross_df
# WHERE SEGMENTO = 'B2C' AND SERVICIO = 'Postpaid' AND FLAG_COMISIONABLE = '1' AND ACTIVE_USER_TRAFFIC = '1'
# GROUP BY AGENTE_COMISIONA
# """
#
# # Execute the SQL query
# gross_b2c = spark.sql(query)
condition = [structure_df["PIOS_INDID"] == gross_b2c["AGENTE_COMISIONA"]]
result_3 = structure_df.select("PIOS_INDID").join(gross_b2c, condition, how='left')
result_3 = result_3.na.fill(value=0, subset=["EJECUTADO_3"])
result_3 = result_3.drop(*["AGENTE_COMISIONA"])
result = result_1.join(result_2, ["PIOS_INDID"], 'left')
result = result.join(result_3, ["PIOS_INDID"])
result = result.withColumnRenamed("PIOS_INDID", "AGENTE_COMISIONA")
except Exception as e:
logger.error(f"Error obteniendo ejecutados. Segunda forma. {e}")
finally:
return result
@task
def update_executes(self, graph: GraphFrame, df_goals: DataFrame, df_executes: DataFrame, df_base: DataFrame):
merged = None
try:
start_process = time.time()
graph_related = find_related_vertices(graph)
merged = df_goals.join(df_executes, "AGENTE_COMISIONA", 'left')
merged = merged.join(df_base, "AGENTE_COMISIONA", 'left')
graph_related_df = self.session.createDataFrame([(k, v) for k, v in graph_related.items()], ["vertex", "related_agents"])
# Aggregate the executed values for each agent
agg_df = merged.join(graph_related_df, expr("array_contains(related_agents, AGENTE_COMISIONA)"), "inner") \
.groupBy("vertex") \
.agg(py_sum("EJECUTADO_1").alias("sum_EJECUTADO_1"),
py_sum("EJECUTADO_2").alias("sum_EJECUTADO_2"),
py_sum("EJECUTADO_3").alias("sum_EJECUTADO_3"))
merged = merged.join(agg_df, merged["AGENTE_COMISIONA"] == agg_df["vertex"], 'left')
merged = merged.withColumn("EJECUTADO_1", when(merged["vertex"].isNotNull(), merged["sum_EJECUTADO_1"]).otherwise(merged["EJECUTADO_1"]))
merged = merged.withColumn("EJECUTADO_2", when(merged["vertex"].isNotNull(), merged["sum_EJECUTADO_2"]).otherwise(merged["EJECUTADO_2"]))
merged = merged.withColumn("EJECUTADO_3", when(merged["vertex"].isNotNull(), merged["sum_EJECUTADO_3"]).otherwise(merged["EJECUTADO_3"]))
merged = merged.drop("vertex", "sum_EJECUTADO_1", "sum_EJECUTADO_2", "sum_EJECUTADO_3")
print(f"Duración de creación de dataframes con grafos (jerarquía): {time.time() - start_process}")
except Exception as e:
logger.error(f"Error actualizando ejecutados por jerarquía. {e}")
finally:
return merged
@task
def get_source_value(self, gross_name: str, base_name: str) -> DataFrame:
result = None
try:
base_df = self.inputs[base_name]
gross_df = self.inputs[gross_name]
base_df = base_df.select("PERIODO_PROCESO_CODIGO", "CEDULA", "VARIABLE_COMISION")
base_df = base_df.dropDuplicates()
gross_df_agent = gross_df.groupBy("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO").count(). \
select("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO")
base_df = base_df.withColumnRenamed("PERIODO_PROCESO_CODIGO", "PERIODO_PROCESO_CODIGO_2")
condition = [gross_df_agent["AGENTE_COMISIONA"] == base_df["CEDULA"],
gross_df_agent["PERIODO_PROCESO_CODIGO"] == base_df["PERIODO_PROCESO_CODIGO_2"]]
result = gross_df_agent.join(base_df, condition, how='left')
result = result.na.fill(value=0, subset=["VARIABLE_COMISION"])
result = result.drop(*["CEDULA", "PERIODO_PROCESO_CODIGO_2"])
except Exception as e:
logger.error(f"Error obteniendo valor de monto de origen. {e}")
finally:
return result
@task
def get_source_value_2(self, structure_name: str, base_name: str) -> DataFrame:
result = None
try:
base_df = self.inputs[base_name]
structure_df = self.inputs[structure_name]
base_df = base_df[base_df["PERIODO_PROCESO_CODIGO"] == self.period]
base_df = base_df.select("CEDULA", "VARIABLE_COMISION")
base_df = base_df.dropDuplicates()
condition = [structure_df["PIOS_INDID"] == base_df["CEDULA"]]
result = structure_df.select("PIOS_INDID").join(base_df, condition, how='left')
result = result.na.fill(value=0, subset=["VARIABLE_COMISION"])
result = result.drop(*["CEDULA"])
# Converted to SQL
# base_df.createOrReplaceTempView("base_df")
# structure_df.createOrReplaceTempView("structure_df")
#
# query = """
# SELECT DISTINCT b.CEDULA, b.VARIABLE_COMISION
# FROM base_df b
# JOIN structure_df s ON s.PIOS_INDID = b.CEDULA
# WHERE b.PERIODO_PROCESO_CODIGO = '{}'
# """.format(self.period)
#
# result = spark.sql(query)
# result = result.na.fill(value=0, subset=["VARIABLE_COMISION"])
# result = result.drop("CEDULA")
result = result.withColumnRenamed("PIOS_INDID", "AGENTE_COMISIONA")
except Exception as e:
logger.error(f"Error obteniendo valor de monto de origen. {e}")
finally:
return result
@task
def get_commission_per_agent(self, df_goals: DataFrame, df_executes: DataFrame, df_base: DataFrame) -> DataFrame:
try:
merged = df_goals.join(df_executes, ["AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO"], 'left')
merged = merged.join(df_base, ["AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO"], 'left')
merged = merged.withColumn("CUMPLIMIENTO EQUIPOS",
when(merged["META_1"] != 0, merged["EJECUTADO_1"]/merged["META_1"]).
otherwise(lit(0)))
merged = merged.withColumn("PONDERADO EQUIPOS", merged["CUMPLIMIENTO EQUIPOS"] * 0.25)
merged = merged.withColumn("CUMPLIMIENTO B2B",
when(merged["META_2"] != 0, merged["EJECUTADO_2"]/merged["META_2"]).
otherwise(lit(0)))
merged = merged.withColumn("PONDERADO B2B", merged["CUMPLIMIENTO B2B"] * 0.1)
merged = merged.withColumn("CUMPLIMIENTO B2C",
when(merged["META_3"] != 0, merged["EJECUTADO_3"]/merged["META_3"]).
otherwise(lit(0)))
merged = merged.withColumn("PONDERADO B2C", merged["CUMPLIMIENTO B2C"] * 0.65)
merged = merged.withColumn("PONDERADO TOTAL", merged["PONDERADO EQUIPOS"] + merged["PONDERADO B2B"] + merged["PONDERADO B2C"])
merged = merged.withColumn("FACTOR", when(merged["PONDERADO TOTAL"] <= 0.79, lit(0)).
when((merged["PONDERADO TOTAL"] >= 0.8) & (merged["PONDERADO TOTAL"] < 2), merged["PONDERADO TOTAL"]).
when(merged["PONDERADO TOTAL"] >= 2, lit(2)))
merged = merged.withColumn("COMISION", merged["FACTOR"]*merged["VARIABLE_COMISION"])
merged = merged.withColumnRenamed("EJECUTADO_2", "EJECUTADO B2B")
merged = merged.withColumnRenamed("EJECUTADO_3", "EJECUTADO B2C")
merged = merged.withColumnRenamed("EJECUTADO_1", "EJECUTADO Cant Smart")
merged = merged.withColumnRenamed("META_1", "META EQUIPOS")
merged = merged.withColumnRenamed("META_2", "META GROSS B2B")
merged = merged.withColumnRenamed("META_3", "META GROSS POSPAGO")
merged = merged.withColumnRenamed("VARIABLE_COMISION", "VARIABLE COMISION (monto de origen)")
except Exception as e:
logger.error(f"Error operando la comisión. {e}")
finally:
return merged
@task
def get_commission_per_agent_2(self, merged: DataFrame) -> DataFrame:
try:
merged = merged.withColumn("CUMPLIMIENTO EQUIPOS",
when(merged["META_1"] != 0, merged["EJECUTADO_1"]/merged["META_1"]).
otherwise(lit(0)))
merged = merged.withColumn("PONDERADO EQUIPOS", merged["CUMPLIMIENTO EQUIPOS"] * 0.25)
merged = merged.withColumn("CUMPLIMIENTO B2B",
when(merged["META_2"] != 0, merged["EJECUTADO_2"]/merged["META_2"]).
otherwise(lit(0)))
merged = merged.withColumn("PONDERADO B2B", merged["CUMPLIMIENTO B2B"] * 0.1)
merged = merged.withColumn("CUMPLIMIENTO B2C",
when(merged["META_3"] != 0, merged["EJECUTADO_3"]/merged["META_3"]).
otherwise(lit(0)))
merged = merged.withColumn("PONDERADO B2C", merged["CUMPLIMIENTO B2C"] * 0.65)
merged = merged.withColumn("PONDERADO TOTAL", merged["PONDERADO EQUIPOS"] + merged["PONDERADO B2B"] + merged["PONDERADO B2C"])
merged = merged.withColumn("FACTOR", when(merged["PONDERADO TOTAL"] <= 0.79, lit(0)).
when((merged["PONDERADO TOTAL"] >= 0.8) & (merged["PONDERADO TOTAL"] < 2), merged["PONDERADO TOTAL"]).
when(merged["PONDERADO TOTAL"] >= 2, lit(2)))
merged = merged.withColumn("COMISION", merged["FACTOR"]*merged["VARIABLE_COMISION"])
merged = merged.withColumnRenamed("EJECUTADO_2", "EJECUTADO B2B")
merged = merged.withColumnRenamed("EJECUTADO_3", "EJECUTADO B2C")
merged = merged.withColumnRenamed("EJECUTADO_1", "EJECUTADO Cant Smart")
merged = merged.withColumnRenamed("META_1", "META EQUIPOS")
merged = merged.withColumnRenamed("META_2", "META GROSS B2B")
merged = merged.withColumnRenamed("META_3", "META GROSS POSPAGO")
merged = merged.withColumnRenamed("VARIABLE_COMISION", "VARIABLE COMISION (monto de origen)")
except Exception as e:
logger.error(f"Error operando la comisión. {e}")
finally:
return merged
@task
def write_result(self, df: DataFrame, table_name: str, db_type: DatabaseTypeEnum, starroks_jdbc: str, starroks_fe: str,
redshift_url: str = "", mysql_url: str = "") -> None:
try:
if db_type == DatabaseTypeEnum.REDSHIFT:
df.write \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", redshift_url) \
.option("dbtable", table_name) \
.option("user", "awsuser") \
.option("password", "Awsuser123") \
.mode("append") \
.save()
elif db_type == DatabaseTypeEnum.MYSQL:
mysql_user = self.conf["mysql"]["user"]
mysql_pass = self.conf["mysql"]["password"]
df.write \
.format("jdbc") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("url", mysql_url) \
.option("dbtable", table_name) \
.option("user", mysql_user) \
.option("password", mysql_pass) \
.mode("overwrite") \
.save()
else:
database = starroks_jdbc[starroks_jdbc.rfind("/") + 1:]
starroks_user = self.conf["starroks"]["user"]
starroks_pass = self.conf["starroks"]["password"]
df.write.format("starrocks") \
.option("starrocks.fe.http.url", starroks_fe) \
.option("starrocks.fe.jdbc.url", starroks_jdbc) \
.option("starrocks.table.identifier", database+"."+table_name) \
.option("starrocks.user", starroks_user) \
.option("starrocks.password", starroks_pass) \
.mode("append") \
.save()
except Exception as e:
logger.error(f"Error guardando datos en BD. {e}")
from typing import Dict, Any
import logging
from pyspark.sql.functions import col, when, lit, to_date, date_format, date_add, concat
from prefect import task
from Enum.DataTypeEnum import DataTypeEnum
from Enum.DatabaseTypeEnum import DatabaseTypeEnum
from Enum.InputTypeEnum import InputTypeEnum
from Utils.SparkUtils import createSession
from Input.Source import Input
logger = logging.getLogger()
class ETLProcess:
def __init__(self, config: Dict[str, Any]) -> None:
self.conf = config
self.identifier = self.conf["identifier"]
self.session = None
self.inputs = {}
def init(self, spark_jars: Dict[str, str], source_type: InputTypeEnum = InputTypeEnum.BUCKET) -> None:
self.session = createSession(self.identifier, spark_jars, source_type)
@task
def reader(self) -> None:
try:
inputs = self.conf["inputs"]
input_type = inputs["type"]
provider = inputs["params"]["provider"] if "provider" in inputs["params"].keys() else None
for input_obj in inputs["data"]:
identifier = input_obj["identifier"]
params = {"identifier": identifier, "path": input_obj["path"], "type": input_obj["input_type"],
"separator": input_obj["separator"], "schema": input_obj["schema"]}
current_input = Input(input_type, self.session, params, provider)
# Special case: replace "\t" with " "
if identifier == "FACTURACION":
current_input.get_data(True)
else:
current_input.get_data()
self.inputs.update({identifier: current_input.data})
except Exception as e:
raise AssertionError(f"Error in function extrayendo data. Reader. {e}")
@task
def set_schema(self) -> None:
try:
inputs = self.conf["inputs"]
for input_obj in inputs["data"]:
identifier = input_obj["identifier"]
schema = input_obj["schema"]
input_schema = self.create_schema(schema)
columns, schema = input_schema["columns"], input_schema["schema"]
self.inputs[identifier] = self.inputs[identifier].select(*columns)
for column, datatype in schema:
self.inputs[identifier] = self.inputs[identifier].withColumn(column, col(column).cast(datatype))
except Exception as e:
raise AssertionError(f"Error procesando información. Process. {e}")
def create_schema(self, schema: Dict[str, str]) -> Dict[str, Any]:
response = {}
try:
columns = list(schema.keys())
structure = []
for column in columns:
field = (column, DataTypeEnum[schema[column]].value())
structure.append(field)
response.update({"columns": columns, "schema": structure})
except Exception as e:
logger.error(f"Error leyendo esquema para el insumo {self.identifier}. {e}")
finally:
return response
@task
def process_gross(self, identifier: str) -> bool:
success = False
try:
self.inputs[identifier] = self.inputs[identifier].withColumn("AGENTE_COMISIONA", col("CONSULTOR_NK"))
self.inputs[identifier] = self.inputs[identifier].withColumn('SEGMENTO',
when((col('PLAN_NOMBRE').contains('Inter')) & (col('CLIENTE_NATURALEZA') == 'Persona Juridica'), 'B2B')
.when(col('PLAN_NOMBRE').contains('Neg'), 'B2B')
.when(col('SERVICIO').contains('Prepaid'), 'PREP').otherwise('B2C'))
self.inputs[identifier] = self.inputs[identifier].withColumn("TIPO_CANAL", lit("DIRECT"))
success = True
except Exception as e:
logger.error(f"Error transformando archivo gross. {e}")
finally:
return success
@task
def process_teams(self, identifier: str) -> bool:
success = False
try:
self.inputs[identifier] = self.inputs[identifier].withColumn('SEGMENTO',
when(col('CLIENTE_NATURALEZA') == 'Persona Juridica', 'B2B')
.otherwise('B2C'))
self.inputs[identifier] = self.inputs[identifier].withColumn("TIPO_CANAL", lit("DIRECT"))
success = True
except Exception as e:
raise AssertionError(f"Error transformando archivo equipo. {e}")
finally:
return success
@task
def process_facturacion(self, identifier: str) -> bool:
success = False
try:
df = self.inputs[identifier]
df = df.withColumn("fecha_vencimiento_fact", to_date(df["FECHA_VENCIMIENTO"], "dd/MM/yy"))
df = df.withColumn("fecha_periodo_fact",
to_date(date_format(col("PERIODO_PROCESO_CODIGO"), "yyyyMM") + "01", "yyyyMMdd"))
df = df.withColumn("FACTURA_VENCIDA",
when(date_add(col("fecha_periodo_fact"), 5) < col("fecha_vencimiento_fact"), lit("SI"))
.otherwise(lit("NO")))
self.inputs[identifier] = df
success = True
except Exception as e:
logger.error(f"Error transformando archivo de facturacion. {e}")
finally:
return success
@task
def write(self, identifier: str, starroks_jdbc: str, starroks_fe: str, prev_status: bool = True,
db_type: DatabaseTypeEnum = DatabaseTypeEnum.REDSHIFT, redshift_url: str = "", mysql_url: str = "") -> None:
try:
if db_type == DatabaseTypeEnum.REDSHIFT:
self.inputs[identifier].coalesce(45).write \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", redshift_url) \
.option("dbtable", identifier) \
.option("user", "awsuser") \
.option("password", "Awsuser123") \
.mode("append") \
.save()
elif db_type == DatabaseTypeEnum.MYSQL:
mysql_user = self.conf["mysql"]["user"]
mysql_pass = self.conf["mysql"]["password"]
self.inputs[identifier].write \
.format("jdbc") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("url", mysql_url) \
.option("dbtable", identifier) \
.option("user", mysql_user) \
.option("password", mysql_pass) \
.mode("append") \
.save()
else:
database = starroks_jdbc[starroks_jdbc.rfind("/")+1:]
starroks_user = self.conf["starroks"]["user"]
starroks_pass = self.conf["starroks"]["password"]
self.inputs[identifier].write.format("starrocks") \
.option("starrocks.fe.http.url", starroks_fe) \
.option("starrocks.fe.jdbc.url", starroks_jdbc) \
.option("starrocks.table.identifier", database+"."+identifier) \
.option("starrocks.user", starroks_user) \
.option("starrocks.password", starroks_pass) \
.mode("append") \
.save()
except Exception as e:
logger.error(f"Error guardando resultados. {e}")
# BCOM-Components-Innovation-Tests
BCOM tests of technologies (Python 3.10, Spark 3.4.0, Prefect 2.16.4)
Execution scripts:
1.- etl.py: extracts and stores data from 9 files (described in config.json)
2.- etl_2.py: extracts and stores data from large files - millions of records (described in config2.json)
3.- commission_2.py: runs the commission logic (with hierarchy) using graphs.
Execution:
1. Create a Python 3.10 environment, activate it, and install all the libraries from requirements.txt (see the sketch after this list).
2. Check the configuration files (inputs, credentials) and the constants in the scripts.
3. Run, for example: python etl.py
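A minimal sketch of steps 1 and 3, assuming a Unix shell with a python3.10 interpreter available and requirements.txt / config.json at the repository root:
```shell
# Step 1: create and activate a Python 3.10 virtual environment, then install dependencies
python3.10 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

# Step 3: run one of the scripts (each one reads its config file, e.g. config.json)
python etl.py
```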
For AWS EMR:
1. Place all the code (the project) on the master instance.
2. Check that all the jars and inputs are in the corresponding bucket (remember that EMR uses S3
as its distributed filesystem). Also remember that on AWS the S3 jars are not needed, since EMR
ships them by default.
3. Run the command (it varies according to the driver and executor requirements):
```shell
spark-submit \
--jars s3://bucket-emr-example/bcom_spark/jars/hadoop-lzo-0.4.3.jar,s3://bucket-emr-example/bcom_spark/jars/mysql-connector-java-8.0.30.jar \
--conf spark.driver.extraClassPath=s3://bucket-emr-example/bcom_spark/jars/hadoop-lzo-0.4.3.jar,s3://bucket-emr-example/bcom_spark/jars/mysql-connector-java-8.0.30.jar \
--conf spark.executor.extraClassPath=s3://bucket-emr-example/bcom_spark/jars/hadoop-lzo-0.4.3.jar,s3://bucket-emr-example/bcom_spark/jars/mysql-connector-java-8.0.30.jar \
--master yarn --deploy-mode cluster \
etl.py
```
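commission_2.py additionally needs the GraphFrames connector. createSession already resolves it through spark.jars.packages, but it can also be passed explicitly to spark-submit; a sketch reusing the coordinate configured in Utils/SparkUtils.py:
```shell
spark-submit \
--packages graphframes:graphframes:0.8.3-spark3.4-s_2.12 \
--master yarn --deploy-mode cluster \
commission_2.py
```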
from typing import Dict
from pyspark.sql import SparkSession
import logging
from Enum.InputTypeEnum import InputTypeEnum
logger = logging.getLogger()
def createSession(name: str, spark_jars: Dict[str, str], source_type: InputTypeEnum) -> SparkSession:
session = None
try:
jars = list(spark_jars.values())
jars = ",".join(jars)
session = SparkSession.builder \
.appName(name) \
.config("spark.jars", jars) \
.config("spark.jars.packages", "graphframes:graphframes:0.8.3-spark3.4-s_2.12") \
.config("spark.executor.extraClassPath", jars) \
.config("spark.driver.extraClassPath", jars) \
.config("spark.starrocks.driver", "com.starroks.jdbc.Driver") \
.getOrCreate()
session._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
if source_type == InputTypeEnum.LOCAL:
session._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://192.168.21.47:9000")
session._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
session._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
session._jsc.hadoopConfiguration().set("fs.s3a.access.key", "minioadmin")
session._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "minioadmin")
else:
session.conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
except Exception as e:
logger.error(f"Error creando sesion. {e}")
finally:
return session
def find_related_vertices(graph):
# Get the graph's vertices and edges
vertices = graph.vertices
edges = graph.edges
# Dictionary holding the related vertices of each vertex
related_vertices_dict = {}
# Depth-first search (DFS)
def dfs(vertex_id, related_vertices):
# Add the current vertex to the related set
related_vertices.add(vertex_id)
# Find the vertices directly related to the current vertex
direct_related = edges.filter(edges.src == vertex_id).select("dst").collect()
# Explore each directly related vertex
for row in direct_related:
related_vertex_id = row["dst"]
# If the related vertex has not been visited yet, run DFS on it
if related_vertex_id not in related_vertices:
dfs(related_vertex_id, related_vertices)
# Get the distinct vertex ids
unique_vertices = vertices.select("id").distinct().collect()
# Iterate over the distinct vertices
for i, row in enumerate(unique_vertices):
vertex_id = row["id"]
# Initialize a set to store the related vertices
related_vertices = set()
# Run DFS to find all the relations of the current vertex
dfs(vertex_id, related_vertices)
# Add the related vertices to the dictionary
related_vertices_dict[vertex_id] = list(related_vertices)
return related_vertices_dict
import time
import json
from typing import Any, Dict
from prefect import flow, get_run_logger
from Pipeline.CommissionProcess import CommissionProcess
from Enum.DatabaseTypeEnum import DatabaseTypeEnum
SPARK_JARS = {
"STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
"MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar"
}
STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
STARROK_FE_NODE = "192.168.1.37:8030"
REDSHIFT_JDBC = "jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=prueba_ca"
DB_TYPE = DatabaseTypeEnum.REDSHIFT
@flow()
def run_commission(config: Dict[str, Any]) -> None:
logger = get_run_logger()
start_time = time.time()
commission_process = CommissionProcess(config)
# Connect to Spark (local mode, standalone, or cluster)
start_init = time.time()
commission_process.init(SPARK_JARS)
logger.info(f"Duración de creación de sesión Spark: {time.time() - start_init}")
# First task - extract the data - REMEMBER: SPARK IS LAZY!
start_reader = time.time()
commission_process.get_inputs(commission_process, DB_TYPE, STARROK_JDBC, STARROK_FE_NODE, REDSHIFT_JDBC)
logger.info(f"Duración de extracción de datos desde la BD: {time.time() - start_reader}")
# Third task - get the goals
start_process = time.time()
goals = commission_process.get_goals(commission_process, "VENTAS", "GOALS")
# Fifth task - get the executed amounts - should the FLAG_COMISIONABLE and ACTIVE_USER_TRAFFIC filters also be applied?
executes = commission_process.get_executed(commission_process, "VENTAS", "DEVICES")
# Sixth task - get the source amount
base = commission_process.get_source_value(commission_process, "VENTAS", "COMERCIAL_BASE")
result = commission_process.get_commission_per_agent(commission_process, goals, executes, base)
logger.info(f"Duración de procesamiento en memoria: {time.time() - start_process}")
# Write task
start_load = time.time()
_ = commission_process.write_result(commission_process, result, "REPORT_SUMMARY", DB_TYPE, STARROK_JDBC, STARROK_FE_NODE, REDSHIFT_JDBC)
logger.info(f"Duración de carga del reporte a la BD: {time.time() - start_load}")
logger.info(f"Duración de ejecución del proceso de comision: {time.time() - start_time}")
if __name__ == "__main__":
conf_path = "config.json"
with open(conf_path) as f:
conf = json.load(f)
# Run Commission
run_commission(conf)
import time
import json
from typing import Any, Dict
from prefect import flow, get_run_logger
from Pipeline.CommissionProcess import CommissionProcess
from Enum.DatabaseTypeEnum import DatabaseTypeEnum
SPARK_JARS = {
"STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
"MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar"
}
STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
STARROK_FE_NODE = "192.168.1.37:8030"
REDSHIFT_JDBC = "jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=prueba_ca"
MYSQL_JDBC = "jdbc:mysql://localhost:13306/bcom_spark"
DB_TYPE = DatabaseTypeEnum.STARROKS
@flow()
def run_commission(config: Dict[str, Any]) -> None:
logger = get_run_logger()
start_time = time.time()
commission_process = CommissionProcess(config)
# Connect to Spark (local mode, standalone, or cluster)
start_init = time.time()
commission_process.init(SPARK_JARS)
logger.info(f"Duración de creación de sesión Spark: {time.time() - start_init}")
# First task - extract the data - REMEMBER: SPARK IS LAZY!
start_reader = time.time()
commission_process.get_inputs(commission_process, DB_TYPE, STARROK_JDBC, STARROK_FE_NODE, REDSHIFT_JDBC,
MYSQL_JDBC)
logger.info(f"Duración de extracción de datos desde la BD: {time.time() - start_reader}")
# Third task - get the goals
start_process = time.time()
goals = commission_process.get_goals_2(commission_process, "GOALS", "ESTRUCTURA_ORGANIZACIONAL")
# Fifth task - get the executed amounts - should the FLAG_COMISIONABLE and ACTIVE_USER_TRAFFIC filters also be applied?
executes = commission_process.get_executed_2(commission_process, "ESTRUCTURA_ORGANIZACIONAL", "DEVICES", "VENTAS")
# Sixth task - get the source amount
base = commission_process.get_source_value_2(commission_process, "ESTRUCTURA_ORGANIZACIONAL", "COMERCIAL_BASE")
# Second task - create the hierarchy
# ["AGENTES", "ESTRUCTURA", "UO", "ORGANIZACIONES"]
identifiers = ["INDIVIDUOS", "ESTRUCTURA_ORGANIZACIONAL", "UNIDAD", "ORGANIZACION"]
jerarquia_graph = commission_process.create_jerarquia(commission_process, identifiers, goals, executes, base)
result = commission_process.update_executes(commission_process, jerarquia_graph, goals, executes, base)
result = commission_process.get_commission_per_agent_2(commission_process, result)
logger.info(f"Duración de procesamiento en memoria: {time.time() - start_process}")
# Write task
start_load = time.time()
_ = commission_process.write_result(commission_process, result, "REPORT_SUMMARY", DB_TYPE, STARROK_JDBC,
STARROK_FE_NODE, REDSHIFT_JDBC, MYSQL_JDBC)
logger.info(f"Duración de carga del reporte a la BD: {time.time() - start_load}")
logger.info(f"Duración de ejecución del proceso de comision: {time.time() - start_time}")
if __name__ == "__main__":
conf_path = "config.json"
with open(conf_path) as f:
conf = json.load(f)
# Run Commission
run_commission(conf)
{
"identifier": "BCOM-SPARK-TESTS",
"period": "202311",
"inputs": {
"type": "bucket",
"params": {
"provider": "aws"
},
"data": [
{
"identifier": "VENTAS",
"path": "s3a://prueba-id2/bcom-tests/inputs/gross_202311.txt",
"input_type": "txt",
"separator": "|",
"schema": {
"PERIODO_PROCESO_CODIGO": "TEXT",
"SUBSCRIPTOR_ID": "TEXT",
"MOVIMIENTO_TIPO": "TEXT",
"FLAG_COMISIONABLE": "TEXT",
"CONSULTOR_NK": "TEXT",
"CLIENTE_ID": "TEXT",
"CLIENTE_NOMBRE": "TEXT",
"CLIENTE_NATURALEZA": "TEXT",
"SERVICIO": "TEXT",
"REVENUE": "DECIMAL",
"PLAN_CODIGIO_NK": "TEXT",
"PLAN_NOMBRE": "TEXT",
"ACTIVE_USER_TRAFFIC": "TEXT"
}
},
{
"identifier": "DEVICES",
"path": "s3a://prueba-id2/bcom-tests/inputs/equipos_202311.txt",
"input_type": "txt",
"separator": "|",
"schema": {
"PERIODO_PROCESO_CODIGO": "TEXT",
"MODELO_TIPO": "TEXT",
"SUBSCRIBER_ID": "TEXT",
"CONSULTOR_DOCUMENTO": "TEXT",
"CLIENTE_CODIGO_NK": "TEXT",
"SERVICIO": "TEXT",
"PRECIO_VENTA": "DECIMAL"
}
},
{
"identifier": "GOALS",
"path": "s3a://prueba-id2/bcom-tests/inputs/metas_202311.csv",
"input_type": "csv",
"separator": ";",
"schema": {
"PERIODO_PROCESO_CODIGO": "TEXT",
"CEDULA": "TEXT",
"KPI": "TEXT",
"META_INCIAL": "DECIMAL",
"META_FINAL": "DECIMAL"
}
},
{
"identifier": "COMERCIAL_BASE",
"path": "s3a://prueba-id2/bcom-tests/inputs/planta_comercial_202311.csv",
"input_type": "csv",
"separator": ";",
"schema": {
"PERIODO_PROCESO_CODIGO": "TEXT",
"CEDULA": "TEXT",
"ESTADO": "TEXT",
"VARIABLE_COMISION": "DECIMAL"
}
},
{
"identifier": "INDIVIDUOS",
"path": "s3a://prueba-id2/bcom-tests/inputs/individuos_2023111813.csv",
"input_type": "csv",
"separator": ";",
"schema": {
"PIIN_NAMES": "TEXT",
"PIIN_LASTN": "TEXT",
"PIIN_IDENT": "TEXT",
"PIIN_TDOCU": "TEXT",
"PIIN_SLIQU": "TEXT",
"PIIN_CURRE": "TEXT",
"PIIN_BASAL": "DECIMAL",
"PIIN_CPERS": "TEXT",
"PIIN_CPHON": "TEXT",
"PIIN_CEMAI": "TEXT",
"UBIG_IDENT": "TEXT"
}
},
{
"identifier": "ROLES",
"path": "s3a://prueba-id2/bcom-tests/inputs/roles_2023111812.csv",
"input_type": "csv",
"separator": ";",
"schema": {
"PIRO_IDENT": "TEXT",
"PIRO_NAME": "TEXT"
}
},
{
"identifier": "ORGANIZACION",
"path": "s3a://prueba-id2/bcom-tests/inputs/organizaciones_2023111813.csv",
"input_type": "csv",
"separator": ";",
"schema": {
"PIOR_ORGID": "TEXT",
"PIOR_NAME": "TEXT",
"PIOR_IDENT": "TEXT",
"PIOR_SLIQU": "TEXT",
"PIOR_TCHAN": "TEXT",
"PIOR_CCHAN": "TEXT",
"PIOR_CPERS": "TEXT",
"PIOR_CPHON": "TEXT",
"PIOR_CEMAI": "TEXT",
"PIOR_RESPO": "TEXT",
"PIOR_REPRE": "TEXT",
"UBIG_IDENT": "TEXT",
"PIOR_LIQIN": "TEXT"
}
},
{
"identifier": "UNIDAD",
"path": "s3a://prueba-id2/bcom-tests/inputs/unidades_organizacionales_2023111812.csv",
"input_type": "csv",
"separator": ";",
"schema": {
"PIOU_ORGID": "TEXT",
"PIOU_NAME": "TEXT",
"PIOU_UOTYP": "TEXT",
"PIOU_BEORG": "TEXT",
"PIOU_CPERS": "TEXT",
"PIOU_CPHON": "TEXT",
"PIOU_CEMAI": "TEXT",
"PIOU_RESPO": "TEXT",
"PIOU_SEGME": "TEXT",
"UBIG_IDENT": "TEXT"
}
},
{
"identifier": "ESTRUCTURA_ORGANIZACIONAL",
"path": "s3a://prueba-id2/bcom-tests/inputs/estructura_organizacional_2023111812.csv",
"input_type": "csv",
"separator": ";",
"schema": {
"PIOS_ORGID": "TEXT",
"PIOS_INDID": "TEXT",
"PIOS_ROLID": "TEXT",
"PIOS_SUPER": "TEXT"
}
}
]
},
"starroks": {
"user": "root",
"password": ""
},
"redshift": {
"user": "awsuser",
"password": "Awsuser123"
},
"mysql": {
"user": "root",
"password": "root"
}
}
{
"identifier": "BCOM-SPARK-TESTS2",
"inputs": {
"type": "bucket",
"params": {
"provider": "aws"
},
"data": [
{
"identifier": "ENDING",
"path": "s3a://prueba-id2/bcom-tests/inputs/Ending_20240320.csv",
"input_type": "csv",
"separator": ";",
"schema": {
"PERIODO_PROCESO_CODIGO": "TEXT",
"SUBSCRIBER_ID": "TEXT",
"SERVICIO": "TEXT",
"ESTADO": "TEXT",
"MOVIMIENTO_NOMBRE": "TEXT",
"OPERADOR_PORTA_DESTINO": "TEXT",
"REVENUE": "DECIMAL"
}
}
]
},
"starroks": {
"user": "root",
"password": ""
},
"redshift": {
"user": "awsuser",
"password": "Awsuser123"
},
"mysql": {
"user": "root",
"password": "root"
}
}
import time
import json
from typing import Any, Dict
from prefect import flow, get_run_logger
from Enum.DatabaseTypeEnum import DatabaseTypeEnum
from Enum.InputTypeEnum import InputTypeEnum
from Pipeline.ETLProcess import ETLProcess
SPARK_JARS = {
"AWS_CORE": "/opt/spark-jars/hadoop-aws-3.3.4.jar",
"BUNDLE": "/opt/spark-jars/aws-java-sdk-bundle-1.12.431.jar",
"COMMON": "/opt/spark-jars/hadoop-common-3.3.4.jar",
"AWS_CLIENT": "/opt/spark-jars/hadoop-client-3.3.4.jar",
"STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
"MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar",
"REDSHIFT": "/opt/spark-jars/redshift-jdbc42-2.1.0.12.jar"
}
STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
STARROK_FE_NODE = "192.168.1.37:8030"
REDSHIFT_JDBC = "jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=prueba_ca"
MYSQL_JDBC = "jdbc:mysql://localhost:13306/bcom_spark"
DB_TYPE = DatabaseTypeEnum.MYSQL
SOURCE_TYPE = InputTypeEnum.BUCKET
@flow
def run_etl(config: Dict[str, Any]) -> None:
logger = get_run_logger()
start_time = time.time()
etl_process = ETLProcess(config)
# Connect to Spark (local mode, standalone, or cluster)
start_init = time.time()
etl_process.init(SPARK_JARS, SOURCE_TYPE)
logger.info(f"Duración de creación de sesión Spark: {time.time() - start_init}")
# First task - (Reader) - extract the files
start_reader = time.time()
etl_process.reader(etl_process)
logger.info(f"Duración de extracción de ficheros desde S3: {time.time() - start_reader}")
# Second task - set the schema on the tables
start_transform = time.time()
etl_process.set_schema(etl_process)
# Process - Gross input (VENTAS)
ventas_flag = etl_process.process_gross(etl_process, "VENTAS")
# Process - Team input (DEVICES)
teams_flag = etl_process.process_teams(etl_process, "DEVICES")
logger.info(f"Duración de transformación y limpieza de datos: {time.time() - start_transform}")
# Write - GROSS (VENTAS) input
start_load = time.time()
etl_process.write(etl_process, "VENTAS", STARROK_JDBC, STARROK_FE_NODE, ventas_flag, DB_TYPE,
REDSHIFT_JDBC, MYSQL_JDBC)
# Write - DEVICES input
etl_process.write(etl_process, "DEVICES", STARROK_JDBC, STARROK_FE_NODE, teams_flag, DB_TYPE,
REDSHIFT_JDBC, MYSQL_JDBC)
# Write - GOALS input
etl_process.write(etl_process, "GOALS", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
# Write - COMERCIAL_BASE input
etl_process.write(etl_process, "COMERCIAL_BASE", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
# Write - INDIVIDUOS input
etl_process.write(etl_process, "INDIVIDUOS", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
# Write - ROLES input
etl_process.write(etl_process, "ROLES", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
# Write - ORGANIZACION input
etl_process.write(etl_process, "ORGANIZACION", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
# Write - UNIDAD input
etl_process.write(etl_process, "UNIDAD", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
# Write - ESTRUCTURA_ORGANIZACIONAL input
etl_process.write(etl_process, "ESTRUCTURA_ORGANIZACIONAL", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
logger.info(f"Duración de carga de datos a la BD: {time.time() - start_load}")
logger.info(f"Duración de ejecución del proceso ETL General: {time.time() - start_time}")
if __name__ == "__main__":
conf_path = "config.json"
with open(conf_path) as f:
conf = json.load(f)
# Run ETL
run_etl(conf)
import time
import json
from typing import Any, Dict
from prefect import flow, get_run_logger
from Pipeline.ETLProcess import ETLProcess
from Enum.DatabaseTypeEnum import DatabaseTypeEnum
from Enum.InputTypeEnum import InputTypeEnum
SPARK_JARS = {
"AWS_CORE": "/opt/spark-jars/hadoop-aws-3.3.4.jar",
"BUNDLE": "/opt/spark-jars/aws-java-sdk-bundle-1.12.431.jar",
"COMMON": "/opt/spark-jars/hadoop-common-3.3.4.jar",
"AWS_CLIENT": "/opt/spark-jars/hadoop-client-3.3.4.jar",
"STARROK": "/opt/spark-jars/starrocks-spark-connector-3.2_2.12-1.1.2.jar",
"MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar",
"REDSHIFT": "/opt/spark-jars/redshift-jdbc42-2.1.0.12.jar"
}
STARROK_JDBC = "jdbc:starrocks://192.168.1.37:9030/bcom_spark"
STARROK_FE_NODE = "192.168.1.37:8030"
REDSHIFT_JDBC = ("jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redshift.amazonaws.com:5439/dev"
"?currentSchema=prueba_ca")
MYSQL_JDBC = "jdbc:mysql://localhost:13306/bcom_spark"
DB_TYPE = DatabaseTypeEnum.STARROKS
SOURCE_TYPE = InputTypeEnum.BUCKET
@flow
def run_etl(config: Dict[str, Any]) -> None:
logger = get_run_logger()
start_time = time.time()
etl_process = ETLProcess(config)
# Connect to Spark (local mode, standalone, or cluster)
start_init = time.time()
etl_process.init(SPARK_JARS, SOURCE_TYPE)
logger.info(f"Duración de creación de sesión Spark: {time.time() - start_init}")
# First task - (Reader) - extract the files
start_reader = time.time()
etl_process.reader(etl_process)
logger.info(f"Duración de extracción de ficheros desde S3: {time.time() - start_reader}")
# Second task - transform the data
start_transform = time.time()
# Process - Facturacion input
teams_fact = etl_process.process_facturacion(etl_process, "FACTURACION")
logger.info(f"Duración de transformación y limpieza de datos: {time.time() - start_transform}")
start_load = time.time()
# Write - FACTURACION input
etl_process.write(etl_process, "FACTURACION", STARROK_JDBC, STARROK_FE_NODE, teams_fact, DB_TYPE,
REDSHIFT_JDBC, MYSQL_JDBC)
# Write - ENDING input
etl_process.write(etl_process, "ENDING", STARROK_JDBC, STARROK_FE_NODE, db_type=DB_TYPE,
redshift_url=REDSHIFT_JDBC, mysql_url=MYSQL_JDBC)
logger.info(f"Duración de carga de datos a la BD: {time.time() - start_load}")
logger.info(f"Duración de ejecución del proceso ETL General: {time.time() - start_time}")
if __name__ == "__main__":
conf_path = "config2.json"
with open(conf_path) as f:
conf = json.load(f)
# Run ETL
run_etl(conf)
aiosqlite==0.20.0
alembic==1.13.1
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.4
asgi-lifespan==2.1.0
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
bottle==0.12.25
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
coolname==2.2.0
croniter==2.0.3
cryptography==42.0.5
dateparser==1.2.0
dnspython==2.6.1
docker==6.1.3
email_validator==2.1.1
exceptiongroup==1.2.0
fsspec==2024.3.1
google-auth==2.28.2
graphframes==0.6
graphviz==0.20.2
greenlet==3.0.3
griffe==0.42.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.4
httpx==0.27.0
hyperframe==6.0.1
idna==3.6
importlib_resources==6.1.3
itsdangerous==2.1.2
Jinja2==3.1.3
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.21.1
jsonschema-specifications==2023.12.1
kubernetes==29.0.0
Mako==1.3.2
Markdown==3.6
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
nose==1.3.7
numpy==1.26.4
oauthlib==3.2.2
orjson==3.9.15
packaging==24.0
pathspec==0.12.1
pendulum==2.1.2
prefect==2.16.4
py4j==0.10.9.7
pyasn1==0.5.1
pyasn1-modules==0.3.0
pycparser==2.21
pydantic==2.6.4
pydantic_core==2.16.3
Pygments==2.17.2
pyspark==3.4.0
python-dateutil==2.9.0.post0
python-multipart==0.0.9
python-slugify==8.0.4
pytz==2024.1
pytzdata==2020.1
PyYAML==6.0.1
readchar==4.0.6
referencing==0.34.0
regex==2023.12.25
requests==2.31.0
requests-oauthlib==1.4.0
rfc3339-validator==0.1.4
rich==13.7.1
rpds-py==0.18.0
rsa==4.9
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
six==1.16.0
sniffio==1.3.1
SQLAlchemy==2.0.28
text-unidecode==1.3
toml==0.10.2
typer==0.9.0
typing_extensions==4.10.0
tzlocal==5.2
ujson==5.9.0
urllib3==2.2.1
uvicorn==0.28.1
websocket-client==1.7.0
websockets==12.0