Commit fddc6a2b authored by Cristian Aguirre

Update SparkUtils.py, ETLProcess.py

parent 805f468e
......@@ -252,6 +252,21 @@ class CommissionProcess:
(gross_df["FLAG_COMISIONABLE"] == "1") & (gross_df["ACTIVE_USER_TRAFFIC"] == "1")]
gross_b2c = gross_b2c.groupBy("AGENTE_COMISIONA").agg(count("SERVICIO").alias("EJECUTADO_3"))
# CONVERTED TO SPARK SQL:
# gross_df.createOrReplaceTempView("gross_df")
#
# # Write the SQL query
# query = """
# SELECT AGENTE_COMISIONA, COUNT(SERVICIO) AS EJECUTADO_3
# FROM gross_df
# WHERE SEGMENTO = 'B2C' AND SERVICIO = 'Postpaid' AND FLAG_COMISIONABLE = '1' AND ACTIVE_USER_TRAFFIC = '1'
# GROUP BY AGENTE_COMISIONA
# """
#
# # Run the SQL query
# gross_b2c = spark.sql(query)
condition = [structure_df["PIOS_INDID"] == gross_b2c["AGENTE_COMISIONA"]]
result_3 = structure_df.select("PIOS_INDID").join(gross_b2c, condition, how='left')
result_3 = result_3.na.fill(value=0, subset=["EJECUTADO_3"])
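As a self-contained illustration of the aggregate-then-left-join pattern in this hunk, the sketch below rebuilds it on tiny in-memory frames with the same column names; the sample rows and session setup are illustrative only, not part of the pipeline.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = SparkSession.builder.appName("sketch").getOrCreate()

gross_df = spark.createDataFrame(
    [("A1", "Postpaid", "B2C", "1", "1"), ("A2", "Postpaid", "B2C", "1", "0")],
    ["AGENTE_COMISIONA", "SERVICIO", "SEGMENTO", "FLAG_COMISIONABLE", "ACTIVE_USER_TRAFFIC"],
)
structure_df = spark.createDataFrame([("A1",), ("A3",)], ["PIOS_INDID"])

# Count commissionable B2C postpaid services per agent
gross_b2c = gross_df.filter(
    (gross_df["SEGMENTO"] == "B2C") & (gross_df["SERVICIO"] == "Postpaid")
    & (gross_df["FLAG_COMISIONABLE"] == "1") & (gross_df["ACTIVE_USER_TRAFFIC"] == "1")
).groupBy("AGENTE_COMISIONA").agg(count("SERVICIO").alias("EJECUTADO_3"))

# Left join keeps every agent in the structure; missing counts become 0
condition = [structure_df["PIOS_INDID"] == gross_b2c["AGENTE_COMISIONA"]]
result_3 = structure_df.select("PIOS_INDID").join(gross_b2c, condition, how="left")
result_3.na.fill(value=0, subset=["EJECUTADO_3"]).show()
```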
......@@ -333,6 +348,23 @@ class CommissionProcess:
result = structure_df.select("PIOS_INDID").join(base_df, condition, how='left')
result = result.na.fill(value=0, subset=["VARIABLE_COMISION"])
result = result.drop(*["CEDULA"])
# Converted to SQL
# base_df.createOrReplaceTempView("base_df")
# structure_df.createOrReplaceTempView("structure_df")
#
# query = """
# SELECT DISTINCT b.CEDULA, b.VARIABLE_COMISION
# FROM base_df b
# JOIN structure_df s ON s.PIOS_INDID = b.CEDULA
# WHERE b.PERIODO_PROCESO_CODIGO = '{}'
# """.format(self.period)
#
# result = spark.sql(query)
# result = result.na.fill(value=0, subset=["VARIABLE_COMISION"])
# result = result.drop("CEDULA")
result = result.withColumnRenamed("PIOS_INDID", "AGENTE_COMISIONA")
except Exception as e:
logger.error(f"Error obteniendo valor de monto de origen. {e}")
......@@ -422,13 +454,15 @@ class CommissionProcess:
.mode("append") \
.save()
elif db_type == DatabaseTypeEnum.MYSQL:
mysql_user = self.conf["mysql"]["user"]
mysql_pass = self.conf["mysql"]["password"]
df.write \
.format("jdbc") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("url", mysql_url) \
.option("dbtable", table_name) \
.option("user", "root") \
.option("password", "root") \
.option("user", mysql_user) \
.option("password", mysql_pass) \
.mode("overwrite") \
.save()
else:
......
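The same JDBC write chain now appears in both CommissionProcess and ETLProcess with credentials pulled from the config. A small helper like the sketch below could factor it out; the name write_jdbc and its signature are illustrative, not from the repo.

```python
from pyspark.sql import DataFrame

def write_jdbc(df: DataFrame, url: str, table: str, user: str,
               password: str, mode: str = "overwrite") -> None:
    # Requires com.mysql.cj.jdbc.Driver on the Spark classpath (spark.jars)
    df.write \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", url) \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .mode(mode) \
        .save()

# e.g. write_jdbc(df, mysql_url, table_name,
#                 self.conf["mysql"]["user"], self.conf["mysql"]["password"])
```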
......@@ -139,13 +139,15 @@ class ETLProcess:
.mode("append") \
.save()
elif db_type == DatabaseTypeEnum.MYSQL:
mysql_user = self.conf["mysql"]["user"]
mysql_pass = self.conf["mysql"]["password"]
self.inputs[identifier].write \
.format("jdbc") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("url", mysql_url) \
.option("dbtable", identifier) \
.option("user", "root") \
.option("password", "root") \
.option("user", mysql_user) \
.option("password", mysql_pass) \
.mode("append") \
.save()
else:
......
......@@ -18,12 +18,9 @@ def createSession(name: str, spark_jars: Dict[str, str], source_type: InputTypeE
.appName(name) \
.config("spark.jars", jars) \
.config("spark.jars.packages", "graphframes:graphframes:0.8.3-spark3.4-s_2.12") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
.config("spark.executor.extraClassPath", jars) \
.config("spark.driver.extraClassPath", jars) \
.config("spark.starrocks.driver", "com.starroks.jdbc.Driver") \
.config("spark.sql.catalogImplementation", "in-memory") \
.getOrCreate()
session._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
if source_type == InputTypeEnum.LOCAL:
......@@ -32,6 +29,8 @@ def createSession(name: str, spark_jars: Dict[str, str], source_type: InputTypeE
session._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
session._jsc.hadoopConfiguration().set("fs.s3a.access.key", "minioadmin")
session._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "minioadmin")
else:
session.conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
except Exception as e:
logger.error(f"Error creando sesion. {e}")
finally:
......
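For reference, a call site for this session factory might look like the sketch below; the module path and jar filename are assumptions, while the enum values and the MinIO/AWS credential behavior mirror this hunk.

```python
from Utils.SparkUtils import createSession  # hypothetical module path
from Enum.InputTypeEnum import InputTypeEnum

SPARK_JARS = {
    "MYSQL": "/opt/spark-jars/mysql-connector-java-8.0.30.jar",  # assumed jar
}

# LOCAL -> MinIO access/secret keys are set on the Hadoop configuration;
# anything else -> DefaultAWSCredentialsProviderChain, per the else branch above
session = createSession("bcom-etl", SPARK_JARS, InputTypeEnum.BUCKET)
```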
......@@ -19,7 +19,7 @@ REDSHIFT_JDBC = "jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redsh
MYSQL_JDBC = "jdbc:mysql://localhost:13306/bcom_spark"
DB_TYPE = DatabaseTypeEnum.MYSQL
DB_TYPE = DatabaseTypeEnum.STARROKS
@flow()
......
......@@ -6,22 +6,6 @@
"provider": "aws"
},
"data": [
{
"identifier": "FACTURACION",
"path": "s3a://prueba-id2/bcom-tests/inputs/Facturacion_20240320.csv",
"input_type": "csv",
"separator": ";",
"schema": {
"PERIODO_PROCESO_CODIGO": "TEXT",
"NOMBRE_CLIENTE": "TEXT",
"NUM_FACTURA": "TEXT",
"DOCUMENTO": "TEXT",
"CIUDAD": "TEXT",
"FECHA_VENCIMIENTO": "TEXT",
"SUBS_ID": "TEXT",
"ESTADO_FACTURA": "TEXT"
}
},
{
"identifier": "ENDING",
"path": "s3a://prueba-id2/bcom-tests/inputs/Ending_20240320.csv",
......@@ -48,7 +32,7 @@
"password": "Awsuser123"
},
"mysql": {
"user": "admin",
"password": "awsuser123"
"user": "root",
"password": "root"
}
}
\ No newline at end of file
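This mysql block is what the writers above now read instead of hardcoded credentials; a minimal sketch of loading it, assuming the file is named config.json:

```python
import json

with open("config.json") as f:  # assumed filename
    conf = json.load(f)

mysql_user = conf["mysql"]["user"]      # "root" after this commit
mysql_pass = conf["mysql"]["password"]  # "root" after this commit
```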
......@@ -2,9 +2,10 @@ import time
import json
from typing import Any, Dict
from prefect import flow, get_run_logger
from Enum.DatabaseTypeEnum import DatabaseTypeEnum
from Enum.InputTypeEnum import InputTypeEnum
from Pipeline.ETLProcess import ETLProcess
......
......@@ -18,14 +18,15 @@ SPARK_JARS = {
"REDSHIFT": "/opt/spark-jars/redshift-jdbc42-2.1.0.12.jar"
}
STARROK_JDBC = "jdbc:mysql://192.168.1.37:9030/bcom_spark"
STARROK_JDBC = "jdbc:starrocks://192.168.1.37:9030/bcom_spark"
STARROK_FE_NODE = "192.168.1.37:8030"
REDSHIFT_JDBC = "jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=prueba_ca"
REDSHIFT_JDBC = ("jdbc:redshift://redshift-cluster-1.cumpswji5bs3.us-east-1.redshift.amazonaws.com:5439/dev"
"?currentSchema=prueba_ca")
MYSQL_JDBC = "jdbc:mysql://localhost:13306/bcom_spark"
DB_TYPE = DatabaseTypeEnum.MYSQL
DB_TYPE = DatabaseTypeEnum.STARROKS
SOURCE_TYPE = InputTypeEnum.BUCKET
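With DB_TYPE flipped from MYSQL to STARROKS, downstream code has to resolve the matching JDBC URL. One way to make that explicit is a lookup table, sketched below; MYSQL and STARROKS are the only enum members visible in this commit, so the mapping is limited to those.

```python
from Enum.DatabaseTypeEnum import DatabaseTypeEnum

JDBC_URLS = {
    DatabaseTypeEnum.MYSQL: MYSQL_JDBC,
    DatabaseTypeEnum.STARROKS: STARROK_JDBC,
}

jdbc_url = JDBC_URLS[DB_TYPE]  # -> jdbc:starrocks://192.168.1.37:9030/bcom_spark
```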
......@@ -50,7 +51,6 @@ def run_etl(config: Dict[str, Any]) -> None:
# Second task - set the schema on the tables
start_transform = time.time()
etl_process.set_schema(etl_process)
# Process - Facturacion input
teams_fact = etl_process.process_facturacion(etl_process, "FACTURACION")
......
......@@ -7,6 +7,7 @@ asgi-lifespan==2.1.0
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
bottle==0.12.25
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
......@@ -24,6 +25,7 @@ email_validator==2.1.1
exceptiongroup==1.2.0
fsspec==2024.3.1
google-auth==2.28.2
graphframes==0.6
graphviz==0.20.2
greenlet==3.0.3
griffe==0.42.0
......@@ -47,6 +49,8 @@ Markdown==3.6
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
nose==1.3.7
numpy==1.26.4
oauthlib==3.2.2
orjson==3.9.15
packaging==24.0
......