Commit 9c72a0a6 authored by Cristian Aguirre

Add starroks.py

parent 15b44e0a
import logging
from typing import Dict, Any
from prefect import flow, task
from pyspark.sql import DataFrame
from pyspark.sql.functions import when, lit, sum as py_sum, col, count
from Utils.SparkUtils import createSession, get_goal_by_kpi, get_execute_by_service
logger = logging.getLogger()
class CommissionProcess:
    def __init__(self, config: Dict[str, Any]) -> None:
        self.conf = config
        self.identifier = self.conf["identifier"]
        self.period = self.conf["period"]
        self.session = None
        self.agents = []
        self.periods = []
        self.inputs = {}
        self.inter_results = {}
    def init(self, spark_jars: Dict[str, str], mongodb_uri: str = "") -> None:
        self.session = createSession(self.identifier, spark_jars, mongodb_uri)
    @task
    def get_inputs(self) -> None:
        try:
            inputs = self.conf["inputs"]
            for input_obj in inputs["data"]:
                identifier = input_obj["identifier"]
                df_input = self.session.read.format("com.mongodb.spark.sql.DefaultSource"). \
                    option("collection", identifier).load()
                self.inputs.update({identifier: df_input})
        except Exception as e:
            raise AssertionError(f"Error in function 'get_inputs'. {e}")
    @task
    def get_agents_and_period(self, identifier: str) -> None:
        try:
            # Get the agents
            agents_df = self.inputs[identifier].groupBy("AGENTE_COMISIONA").count()
            self.agents = [row[0] for row in agents_df.select("AGENTE_COMISIONA").collect() if row[0]]
            # Get the periods
            period_df = self.inputs[identifier].groupBy("PERIODO_PROCESO_CODIGO").count()
            self.periods = [row[0] for row in period_df.select("PERIODO_PROCESO_CODIGO").collect()]
        except Exception as e:
            raise AssertionError(f"Error getting agents and periods. {e}")
    @task
    def filter_data(self, identifier: str) -> None:
        try:
            this_df = self.inputs[identifier]
            this_df = this_df.filter(this_df["SERVICIO"] == "Postpaid")
            this_df = this_df.filter(this_df["FLAG_COMISIONABLE"] == "1")
            this_df = this_df.filter(this_df["ACTIVE_USER_TRAFFIC"] == "1")
            self.inputs[identifier] = this_df
        except Exception as e:
            raise AssertionError(f"Error filtering data. {e}")
    @task
    def get_goals_second_way(self, gross_name: str, goals_name: str) -> DataFrame:
        result = None
        try:
            goal_df = self.inputs[goals_name]
            gross_df = self.inputs[gross_name]
            goal_df = goal_df.withColumn("META_1", when(goal_df["KPI"] == "EQUIPOS", goal_df["META_FINAL"]).otherwise(lit(0)))
            goal_df = goal_df.withColumn("META_2", when(goal_df["KPI"] == "GROSS B2B", goal_df["META_FINAL"]).otherwise(lit(0)))
            goal_df = goal_df.withColumn("META_3", when(goal_df["KPI"] == "GROSS POSPAGO", goal_df["META_FINAL"]).otherwise(lit(0)))
            sums = [py_sum(column).alias(column) for column in ["META_1", "META_2", "META_3"]]
            goal_group = goal_df.groupBy("CEDULA", "PERIODO_PROCESO_CODIGO").agg(*sums)
            gross_df_agent = gross_df.groupBy("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO").count(). \
                select("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO")
            goal_group = goal_group.withColumnRenamed("PERIODO_PROCESO_CODIGO", "PERIODO_PROCESO_CODIGO_2")
            condition = [gross_df_agent["AGENTE_COMISIONA"] == goal_group["CEDULA"],
                         gross_df_agent["PERIODO_PROCESO_CODIGO"] == goal_group["PERIODO_PROCESO_CODIGO_2"]]
            result = gross_df_agent.join(goal_group, condition, how='left')
            result = result.na.fill(value=0, subset=["META_1", "META_2", "META_3"])
            result = result.drop(*["CEDULA", "PERIODO_PROCESO_CODIGO_2"])
        except Exception as e:
            raise AssertionError(f"Error getting goals (second approach). {e}")
        finally:
            return result
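    # Note (sketch, not part of this commit): the META_1/META_2/META_3 columns above are a manual
    # pivot of the KPI column. A roughly equivalent formulation with Spark's built-in pivot would be:
    #
    #   goal_group = (goal_df.groupBy("CEDULA", "PERIODO_PROCESO_CODIGO")
    #                        .pivot("KPI", ["EQUIPOS", "GROSS B2B", "GROSS POSPAGO"])
    #                        .sum("META_FINAL"))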
    @task
    def get_executed_second_way(self, gross_name: str, team_name: str) -> DataFrame:
        result = None
        try:
            team_df = self.inputs[team_name]
            gross_df = self.inputs[gross_name]
            executed1 = team_df[team_df["MODELO_TIPO"] == "SMARTPHONE"]
            executed1 = executed1.groupBy("CONSULTOR_DOCUMENTO", "PERIODO_PROCESO_CODIGO").agg(count("MODELO_TIPO").
                                                                                               alias("EJECUTADO_1"))
            gross_df_agent = gross_df.groupBy("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO").count(). \
                select("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO")
            executed1 = executed1.withColumnRenamed("PERIODO_PROCESO_CODIGO", "PERIODO_PROCESO_CODIGO_2")
            condition = [gross_df_agent["AGENTE_COMISIONA"] == executed1["CONSULTOR_DOCUMENTO"],
                         gross_df_agent["PERIODO_PROCESO_CODIGO"] == executed1["PERIODO_PROCESO_CODIGO_2"]]
            result_1 = gross_df_agent.join(executed1, condition, how='left')
            result_1 = result_1.na.fill(value=0, subset=["EJECUTADO_1"])
            result_1 = result_1.drop(*["CONSULTOR_DOCUMENTO", "PERIODO_PROCESO_CODIGO_2"])
            gross_b2b = gross_df[(gross_df["SEGMENTO"] == "B2B") & (gross_df["SERVICIO"] == "Postpaid") &
                                 (gross_df["FLAG_COMISIONABLE"] == "1") & (gross_df["ACTIVE_USER_TRAFFIC"] == "1")]
            gross_b2b = gross_b2b.groupBy("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO").agg(count("SERVICIO").
                                                                                            alias("EJECUTADO_2"))
            gross_b2b = gross_b2b.withColumnRenamed("PERIODO_PROCESO_CODIGO", "PERIODO_PROCESO_CODIGO_2")
            gross_b2b = gross_b2b.withColumnRenamed("AGENTE_COMISIONA", "AGENTE_COMISIONA_2")
            condition = [gross_df_agent["AGENTE_COMISIONA"] == gross_b2b["AGENTE_COMISIONA_2"],
                         gross_df_agent["PERIODO_PROCESO_CODIGO"] == gross_b2b["PERIODO_PROCESO_CODIGO_2"]]
            result_2 = gross_df_agent.join(gross_b2b, condition, how='left')
            result_2 = result_2.na.fill(value=0, subset=["EJECUTADO_2"])
            result_2 = result_2.drop(*["AGENTE_COMISIONA_2", "PERIODO_PROCESO_CODIGO_2"])
            gross_b2c = gross_df[(gross_df["SEGMENTO"] == "B2C") & (gross_df["SERVICIO"] == "Postpaid") &
                                 (gross_df["FLAG_COMISIONABLE"] == "1") & (gross_df["ACTIVE_USER_TRAFFIC"] == "1")]
            gross_b2c = gross_b2c.groupBy("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO").agg(count("SERVICIO").
                                                                                            alias("EJECUTADO_3"))
            gross_b2c = gross_b2c.withColumnRenamed("PERIODO_PROCESO_CODIGO", "PERIODO_PROCESO_CODIGO_2")
            gross_b2c = gross_b2c.withColumnRenamed("AGENTE_COMISIONA", "AGENTE_COMISIONA_2")
            condition = [gross_df_agent["AGENTE_COMISIONA"] == gross_b2c["AGENTE_COMISIONA_2"],
                         gross_df_agent["PERIODO_PROCESO_CODIGO"] == gross_b2c["PERIODO_PROCESO_CODIGO_2"]]
            result_3 = gross_df_agent.join(gross_b2c, condition, how='left')
            result_3 = result_3.na.fill(value=0, subset=["EJECUTADO_3"])
            result_3 = result_3.drop(*["AGENTE_COMISIONA_2", "PERIODO_PROCESO_CODIGO_2"])
            result = result_1.join(result_2, ["AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO"], 'left')
            result = result.join(result_3, ["AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO"], 'left')
        except Exception as e:
            logger.error(f"Error getting executed counts (second approach). {e}")
        finally:
            return result
    @task
    def get_source_value(self, gross_name: str, base_name: str) -> DataFrame:
        result = None
        try:
            base_df = self.inputs[base_name]
            gross_df = self.inputs[gross_name]
            base_df = base_df.select("PERIODO_PROCESO_CODIGO", "CEDULA", "VARIABLE_COMISION")
            base_df = base_df.dropDuplicates()
            gross_df_agent = gross_df.groupBy("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO").count(). \
                select("AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO")
            base_df = base_df.withColumnRenamed("PERIODO_PROCESO_CODIGO", "PERIODO_PROCESO_CODIGO_2")
            condition = [gross_df_agent["AGENTE_COMISIONA"] == base_df["CEDULA"],
                         gross_df_agent["PERIODO_PROCESO_CODIGO"] == base_df["PERIODO_PROCESO_CODIGO_2"]]
            result = gross_df_agent.join(base_df, condition, how='left')
            result = result.na.fill(value=0, subset=["VARIABLE_COMISION"])
            result = result.drop(*["CEDULA", "PERIODO_PROCESO_CODIGO_2"])
        except Exception as e:
            logger.error(f"Error getting the source amount value. {e}")
        finally:
            return result
    @task
    def get_commission_per_agent(self, df_goals: DataFrame, df_executes: DataFrame, df_base: DataFrame) -> DataFrame:
        merged = None
        try:
            merged = df_goals.join(df_executes, ["AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO"], 'left')
            merged = merged.join(df_base, ["AGENTE_COMISIONA", "PERIODO_PROCESO_CODIGO"], 'left')
            merged = merged.withColumn("CUMPLIMIENTO EQUIPOS",
                                       when(merged["META_1"] != 0, merged["EJECUTADO_1"]/merged["META_1"]).
                                       otherwise(lit(0)))
            merged = merged.withColumn("PONDERADO EQUIPOS", merged["CUMPLIMIENTO EQUIPOS"] * 0.25)
            merged = merged.withColumn("CUMPLIMIENTO B2B",
                                       when(merged["META_2"] != 0, merged["EJECUTADO_2"]/merged["META_2"]).
                                       otherwise(lit(0)))
            merged = merged.withColumn("PONDERADO B2B", merged["CUMPLIMIENTO B2B"] * 0.1)
            merged = merged.withColumn("CUMPLIMIENTO B2C",
                                       when(merged["META_3"] != 0, merged["EJECUTADO_3"]/merged["META_3"]).
                                       otherwise(lit(0)))
            merged = merged.withColumn("PONDERADO B2C", merged["CUMPLIMIENTO B2C"] * 0.65)
            merged = merged.withColumn("PONDERADO TOTAL", merged["PONDERADO EQUIPOS"] + merged["PONDERADO B2B"] + merged["PONDERADO B2C"])
            merged = merged.withColumn("FACTOR", when(merged["PONDERADO TOTAL"] <= 0.79, lit(0)).
                                       when((merged["PONDERADO TOTAL"] >= 0.8) & (merged["PONDERADO TOTAL"] < 2), merged["PONDERADO TOTAL"]).
                                       when(merged["PONDERADO TOTAL"] >= 2, lit(2)))
merged = merged.withColumn("COMISION", merged["FACTOR"]*merged["VARIABLE_COMISION"])
merged = merged.withColumnRenamed("EJECUTADO_2", "EJECUTADO B2B")
merged = merged.withColumnRenamed("EJECUTADO_3", "EJECUTADO B2C")
merged = merged.withColumnRenamed("EJECUTADO_1", "EJECUTADO Cant Smart")
merged = merged.withColumnRenamed("META_1", "META EQUIPOS")
merged = merged.withColumnRenamed("META_2", "META GROSS B2B")
merged = merged.withColumnRenamed("META_3", "META GROSS POSPAGO")
merged = merged.withColumnRenamed("VARIABLE_COMISION", "VARIABLE COMISION (monto de origen)")
except Exception as e:
logger.error(f"Error operando la comisión. {e}")
finally:
return merged
    @task
    def write_result(self, df: DataFrame, collection_name: str) -> None:
        try:
            df.write.format("com.mongodb.spark.sql.DefaultSource").option("collection", collection_name). \
                mode("append").save()
        except Exception as e:
            logger.error(f"Error saving data to the database. {e}")
......@@ -19,8 +19,8 @@ class ETLProcess:
        self.inputs = {}
    def init(self, spark_jars: Dict[str, str], mongodb_uri: str = "") -> None:
        self.session = createSession(self.identifier, spark_jars, mongodb_uri)
    def init(self, spark_jars: Dict[str, str], mongodb_uri: str = "", starrok_uri: str = "") -> None:
        self.session = createSession(self.identifier, spark_jars, mongodb_uri, starrok_uri)
    @task
    def reader(self) -> None:
......@@ -79,7 +79,7 @@ class ETLProcess:
            self.inputs[identifier] = self.inputs[identifier].withColumn("TIPO_CANAL", lit("DIRECT"))
            success = True
        except Exception as e:
            raise AssertionError(f"Error transforming the gross file. {e}")
            logger.error(f"Error transforming the gross file. {e}")
        finally:
            return success
......@@ -100,8 +100,15 @@ class ETLProcess:
    @task
    def write(self, identifier: str, prev_status: bool = True) -> None:
        try:
            self.inputs[identifier].printSchema()
            self.inputs[identifier].write.format("com.mongodb.spark.sql.DefaultSource"). \
                option("collection", identifier).mode("append").save()
            # self.inputs[identifier].write.format("starrocks"). \
            #     option("dbtable", identifier).mode("overwrite").save()
            self.inputs[identifier].write.format("starrocks") \
                .option("starrocks.fe.http.url", "ec2-34-231-243-52.compute-1.amazonaws.com:8030") \
                .option("starrocks.fe.jdbc.url", "jdbc:mysql://ec2-34-231-243-52.compute-1.amazonaws.com:9030/bcom_spark") \
                .option("starrocks.table.identifier", "bcom_spark."+identifier) \
                .option("starrocks.user", "root") \
                .option("starrocks.password", "") \
                .mode("append") \
                .save()
        except Exception as e:
            logger.error(f"Error saving results. {e}")
import logging
from typing import Dict, Any
from pyspark.sql import SparkSession, DataFrame
from prefect import flow, task
from Input.Source import Input
logger = logging.getLogger()
class Process:
    def __init__(self, config: Dict[str, Any]) -> None:
        self.conf = config
        self.identifier = self.conf["identifier"]
        self.session = None
        self.inputs = {}
    def init(self) -> None:
        self._createSession()
    def get_inputs(self) -> None:
        try:
            pass
        except Exception as e:
            raise AssertionError(f"Error in function 'get_inputs'. {e}")
    def run(self) -> None:
        # Get inputs
        self.get_inputs()
from typing import Dict
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, DataFrame
import logging
logger = logging.getLogger()
def createSession(name: str, spark_jars: Dict[str, str], mongodb_uri: str = "") -> SparkSession:
def createSession(name: str, spark_jars: Dict[str, str], mongodb_uri: str, starrok_uri: str) -> SparkSession:
    session = None
    try:
        jars = list(spark_jars.values())
        jars = ",".join(jars)
        print(jars)
        session = SparkSession.builder \
            .appName(name) \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
......@@ -20,10 +19,41 @@ def createSession(name: str, spark_jars: Dict[str, str], mongodb_uri: str = "")
.config("spark.jars", jars) \
.config("spark.executor.extraClassPath", jars) \
.config("spark.driver.extraClassPath", jars) \
.config("spark.mongodb.input.uri", mongodb_uri) \
.config("spark.mongodb.output.uri", mongodb_uri) \
.getOrCreate()
# .config("spark.starrocks.url", starrok_uri) \
# .config("spark.starrocks.driver", "com.starroks.jdbc.Driver") \
# .config("spark.sql.catalogImplementation", "in-memory") \
# .getOrCreate()
session._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
except Exception as e:
logger.error(f"Error creando sesion. {e}")
finally:
return session
def get_goal_by_kpi(df: DataFrame, agent: str, period: str, kpi: str) -> float:
    result = 0.0
    try:
        df = df.filter((df["CEDULA"] == agent) & (df["PERIODO_PROCESO_CODIGO"] == period) & (df["KPI"] == kpi)). \
            select("META_FINAL")
        if df.count() != 0:
            results = [row[0] for row in df.select("META_FINAL").collect()]
            result = results[0]
    except Exception as e:
        logger.error(f"Error getting the goal by KPI. {e}")
    finally:
        return result
def get_execute_by_service(df: DataFrame, agent: str, period: str, segment: str) -> int:
    result = 0
    try:
        df = df.filter((df["AGENTE_COMISIONA"] == agent) & (df["PERIODO_PROCESO_CODIGO"] == period) &
                       (df["SEGMENTO"] == segment))
        result = df.count()
    except Exception as e:
        logger.error(f"Error getting the executed count by segment. {e}")
    finally:
        return result
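# Illustrative usage of the helpers above (the agent, period, KPI, and segment values are
# hypothetical, not taken from this commit):
#
#   goal = get_goal_by_kpi(goals_df, agent="12345678", period="202311", kpi="EQUIPOS")
#   executed = get_execute_by_service(gross_df, agent="12345678", period="202311", segment="B2B")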
import time
import json
import logging
from typing import Any, Dict
from prefect import flow
from prefect import flow, get_run_logger
from Pipeline.CommissionProcess import CommissionProcess
logger = logging.getLogger()
SPARK_JARS = {
"MONGO_CORE": "/opt/spark-jars/mongodb-driver-core-4.0.4.jar",
"MONGO_CLIENT": "/opt/spark-jars/mongodb-driver-sync-4.0.4.jar",
"MONGODB": "/opt/spark-jars/mongo-spark-connector_2.12-3.0.1.jar",
"BSON": "/opt/spark-jars/bson-4.0.4.jar"
}
MONGODB_URI = "mongodb://bcom_spark_user:root@192.168.1.37:50001/bcom_spark"
@flow()
def run_commission(config: Dict[str, Any]) -> None:
    logger = get_run_logger()
    start_time = time.time()
    commission_process = CommissionProcess(config)
    # Connect to Spark (local mode, standalone, or cluster)
    start_init = time.time()
    commission_process.init(SPARK_JARS, MONGODB_URI)
    logger.info(f"Spark session creation time: {time.time() - start_init}")
    # First task - extract the data - REMEMBER: SPARK IS LAZY!!!
    start_reader = time.time()
    commission_process.get_inputs(commission_process)
    logger.info(f"Time extracting data from the DB: {time.time() - start_reader}")
    # Third task - get the goals
    start_process = time.time()
    goals = commission_process.get_goals_second_way(commission_process, "VENTAS", "GOALS")
    # Fifth task - get the executed counts - also apply the FLAG_COMISIONABLE and ACTIVE_USER_TRAFFIC filters?
    executes = commission_process.get_executed_second_way(commission_process, "VENTAS", "TEAMS")
    # Sixth task - get the source amount
    base = commission_process.get_source_value(commission_process, "VENTAS", "COMERCIAL_BASE")
    result = commission_process.get_commission_per_agent(commission_process, goals, executes, base)
    logger.info(f"In-memory processing time: {time.time() - start_process}")
    # Write task
    start_load = time.time()
    _ = commission_process.write_result(commission_process, result, "REPORT_SUMMARY")
    logger.info(f"Time loading the report into the DB: {time.time() - start_load}")
    logger.info(f"Total settlement process execution time: {time.time() - start_time}")
if __name__ == "__main__":
......
{
    "identifier": "BCOM-SPARK-TESTS",
    "period": "202311",
    "inputs": {
        "type": "bucket",
        "params": {
......@@ -19,6 +20,7 @@
"CONSULTOR_NK": "TEXT",
"CLIENTE_ID": "TEXT",
"CLIENTE_NOMBRE": "TEXT",
"CLIENTE_NATURALEZA": "TEXT",
"SERVICIO": "TEXT",
"REVENUE": "DECIMAL",
"PLAN_CODIGIO_NK": "TEXT",
......
import time
import json
import logging
from typing import Any, Dict
from prefect import flow
from prefect import flow, get_run_logger
from Pipeline.ETLProcess import ETLProcess
logger = logging.getLogger()
SPARK_JARS = {
"AWS_CORE": "/opt/spark-jars/hadoop-aws-3.3.4.jar",
"BUNDLE": "/opt/spark-jars/aws-java-sdk-bundle-1.12.431.jar",
......@@ -17,23 +14,36 @@ SPARK_JARS = {
"MONGO_CORE": "/opt/spark-jars/mongodb-driver-core-4.0.4.jar",
"MONGO_CLIENT": "/opt/spark-jars/mongodb-driver-sync-4.0.4.jar",
"MONGODB": "/opt/spark-jars/mongo-spark-connector_2.12-3.0.1.jar",
"BSON": "/opt/spark-jars/bson-4.0.4.jar"
"BSON": "/opt/spark-jars/bson-4.0.4.jar",
"STARROK": "/opt/spark-jars/starrocks-spark-connector-3.4_2.12-1.1.2.jar",
"MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar"
}
MONGODB_URI = "mongodb://bcom_spark_user:root@192.168.1.37:50001/bcom_spark"
STARROK_URI = "jdbc:starroks://root:@ec2-3-237-32-62.compute-1.amazonaws.com:9030/bcom_spark"
@flow
def run_etl(config: Dict[str, Any]) -> None:
    logger = get_run_logger()
    start_time = time.time()
    etl_process = ETLProcess(config)
    # Connect to Spark (local mode, standalone, or cluster)
    etl_process.init(SPARK_JARS, MONGODB_URI)
    start_init = time.time()
    etl_process.init(SPARK_JARS, starrok_uri=STARROK_URI)
    logger.info(f"Spark session creation time: {time.time() - start_init}")
    # First task - (Reader) - extract the files
    start_reader = time.time()
    etl_process.reader(etl_process)
    logger.info(f"Time extracting files from S3: {time.time() - start_reader}")
    # Second task - set the schema on the tables
    start_transform = time.time()
    etl_process.set_schema(etl_process)
    # Process - Gross input (Ventas)
......@@ -41,8 +51,10 @@ def run_etl(config: Dict[str, Any]) -> None:
    # Process - Team input (Equipos)
    teams_flag = etl_process.process_teams.submit(etl_process, "TEAMS")
    logger.info(f"Time transforming and cleaning data: {time.time() - start_transform}")
    # Write - GROSS input
    start_load = time.time()
    etl_process.write.submit(etl_process, "VENTAS", ventas_flag)
    # Write - TEAMS input
    etl_process.write.submit(etl_process, "TEAMS", teams_flag)
......@@ -50,8 +62,9 @@ def run_etl(config: Dict[str, Any]) -> None:
etl_process.write.submit(etl_process, "GOALS")
# Write - Insumo PLANTA
etl_process.write.submit(etl_process, "COMERCIAL_BASE")
logger.info(f"Duración de carga de datos a la BD: {time.time() - start_load}")
logger.info(f"Duración de ejecución del proceso ETL: {start_time - time.time()}")
logger.info(f"Duración de ejecución del proceso ETL General: {time.time() - start_time}")
if __name__ == "__main__":
......