Commit 15b44e0a authored by Cristian Aguirre

Add ETL_flow

parent 08b96c74
from enum import Enum
class CloudProviderEnum(Enum):
    AWS = "aws"
    GOOGLE = "gcp"
from enum import Enum
from pyspark.sql.types import StringType, IntegerType, DecimalType, DateType
class DataTypeEnum(Enum):
    INTEGER = IntegerType
    TEXT = StringType
    DECIMAL = DecimalType
    DATE = DateType
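The enum members store the Spark type classes rather than instances, so callers look a member up by the type name from the config and call it to obtain a usable type. A minimal, hypothetical usage sketch (not part of the commit) showing how a config string such as "DECIMAL" is resolved, the same lookup that ETLProcess.create_schema performs further down:

# Hypothetical standalone sketch, assuming the Enum package is on PYTHONPATH.
from Enum.DataTypeEnum import DataTypeEnum

spark_type = DataTypeEnum["DECIMAL"].value()  # instantiates DecimalType(), which defaults to decimal(10,0)
print(spark_type.simpleString())              # -> decimal(10,0)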
from enum import Enum
class FileTypeEnum(Enum):
    CSV = "csv"
    EXCEL = "excel"
    TXT = "txt"
    PARQUET = "parquet"
from typing import Dict, Any
import logging
from pyspark.sql import SparkSession
from Enum.FileTypeEnum import FileTypeEnum
logger = logging.getLogger()
class BucketAwsInput:
    def __init__(self, name: str, session: SparkSession, params: Dict[str, Any]) -> None:
        self.name = name
        self.session = session
        self.input_path = params["path"]
        self.input_type = params["type"]
        self.separator = params["separator"]
        self.schema = params["schema"]
        self.data = None

    def get_data(self) -> None:
        try:
            file_type = FileTypeEnum(self.input_type)
            if not self.input_path.startswith("s3://") and not self.input_path.startswith("s3a://"):
                raise Exception("Error getting input from S3. Path should start with s3:// or s3a://")
            final_path = self.input_path
            if file_type == FileTypeEnum.CSV or file_type == FileTypeEnum.TXT:
                self.data = self.session.read.csv(final_path, header=True, sep=self.separator, inferSchema=True)
            elif file_type == FileTypeEnum.PARQUET:
                self.data = self.session.read.parquet(final_path)
            else:
                logger.info(f"Unsupported file format: {self.input_type}")
        except Exception as e:
            logger.error(f"Error getting input data in method 'get_data'. {e}")
class Input:
    def __init__(self, input_type: str) -> None:
        self.input_type = input_type
from typing import Any, Dict
from pyspark.sql import SparkSession
from Enum.InputTypeEnum import InputTypeEnum
from Enum.CloudProviderEnum import CloudProviderEnum
from Input.BucketAwsSource import BucketAwsInput
class Input:
    def __init__(self, input_type: str, session: SparkSession, params: Dict[str, Any], provider=None) -> None:
        self.input_type = input_type
        if not provider:
            provider = CloudProviderEnum.AWS.value
        if input_type == InputTypeEnum.BUCKET.value and provider == CloudProviderEnum.AWS.value:
            self.factory = BucketAwsInput(params["identifier"], session, params)
        self.data = None

    def get_data(self) -> None:
        self.factory.get_data()
        self.data = self.factory.data
from typing import Dict, Any
import logging
from pyspark.sql.functions import col, when, lit
from prefect import task
from Enum.DataTypeEnum import DataTypeEnum
from Utils.SparkUtils import createSession
from Input.Source import Input
logger = logging.getLogger()
class ETLProcess:
    def __init__(self, config: Dict[str, Any]) -> None:
        self.conf = config
        self.identifier = self.conf["identifier"]
        self.session = None
        self.inputs = {}

    def init(self, spark_jars: Dict[str, str], mongodb_uri: str = "") -> None:
        self.session = createSession(self.identifier, spark_jars, mongodb_uri)

    @task
    def reader(self) -> None:
        try:
            inputs = self.conf["inputs"]
            input_type = inputs["type"]
            provider = inputs["params"]["provider"] if "provider" in inputs["params"].keys() else None
            for input_obj in inputs["data"]:
                identifier = input_obj["identifier"]
                params = {"identifier": identifier, "path": input_obj["path"], "type": input_obj["input_type"],
                          "separator": input_obj["separator"], "schema": input_obj["schema"]}
                current_input = Input(input_type, self.session, params, provider)
                current_input.get_data()
                self.inputs.update({identifier: current_input.data})
        except Exception as e:
            raise AssertionError(f"Error extracting data in task 'reader'. {e}")
    @task
    def set_schema(self) -> None:
        try:
            inputs = self.conf["inputs"]
            for input_obj in inputs["data"]:
                identifier = input_obj["identifier"]
                schema = input_obj["schema"]
                input_schema = self.create_schema(schema)
                columns, schema = input_schema["columns"], input_schema["schema"]
                self.inputs[identifier] = self.inputs[identifier].select(*columns)
                for column, datatype in schema:
                    self.inputs[identifier] = self.inputs[identifier].withColumn(column, col(column).cast(datatype))
        except Exception as e:
            raise AssertionError(f"Error applying schema in task 'set_schema'. {e}")

    def create_schema(self, schema: Dict[str, str]) -> Dict[str, Any]:
        response = {}
        try:
            columns = list(schema.keys())
            structure = []
            for column in columns:
                field = (column, DataTypeEnum[schema[column]].value())
                structure.append(field)
            response.update({"columns": columns, "schema": structure})
        except Exception as e:
            logger.error(f"Error reading schema for input {self.identifier}. {e}")
        finally:
            return response
    @task
    def process_gross(self, identifier: str) -> bool:
        success = False
        try:
            self.inputs[identifier] = self.inputs[identifier].withColumn("AGENTE_COMISIONA", col("CONSULTOR_NK"))
            self.inputs[identifier] = self.inputs[identifier].withColumn(
                'SEGMENTO',
                when((col('PLAN_NOMBRE').contains('Inter')) & (col('CLIENTE_NATURALEZA') == 'Persona Juridica'), 'B2B')
                .when(col('PLAN_NOMBRE').contains('Neg'), 'B2B')
                .when(col('SERVICIO').contains('Prepaid'), 'PREP').otherwise('B2C'))
            self.inputs[identifier] = self.inputs[identifier].withColumn("TIPO_CANAL", lit("DIRECT"))
            success = True
        except Exception as e:
            raise AssertionError(f"Error transforming gross file. {e}")
        finally:
            return success

    @task
    def process_teams(self, identifier: str) -> bool:
        success = False
        try:
            self.inputs[identifier] = self.inputs[identifier].withColumn(
                'SEGMENTO',
                when(col('CLIENTE_NATURALEZA') == 'Persona Juridica', 'B2B')
                .otherwise('B2C'))
            self.inputs[identifier] = self.inputs[identifier].withColumn("TIPO_CANAL", lit("DIRECT"))
            success = True
        except Exception as e:
            raise AssertionError(f"Error transforming teams file. {e}")
        finally:
            return success

    @task
    def write(self, identifier: str, prev_status: bool = True) -> None:
        try:
            self.inputs[identifier].printSchema()
            self.inputs[identifier].write.format("com.mongodb.spark.sql.DefaultSource"). \
                option("collection", identifier).mode("append").save()
        except Exception as e:
            logger.error(f"Error saving results. {e}")
import logging
from typing import Dict, Any
from pyspark.sql import SparkSession, DataFrame
from prefect import flow, task
from Input.Source import Input
logger = logging.getLogger()
class Process:
    def __init__(self, config: Dict[str, Any]) -> None:
        self.conf = config
        self.identifier = self.conf["identifier"]
        self.session = None
        self.inputs = {}

    def init(self) -> None:
        self._createSession()

    def get_inputs(self) -> None:
        try:
            pass
        except Exception as e:
            raise AssertionError(f"Error in function 'get_inputs'. {e}")

    def run(self) -> None:
        # Get inputs
        self.get_inputs()
from typing import Dict
from pyspark.sql import SparkSession
import logging
logger = logging.getLogger()
def createSession(name: str, spark_jars: Dict[str, str], mongodb_uri: str = "") -> SparkSession:
    session = None
    try:
        jars = list(spark_jars.values())
        jars = ",".join(jars)
        print(jars)
        session = SparkSession.builder \
            .appName(name) \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                    "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
            .config("spark.jars", jars) \
            .config("spark.executor.extraClassPath", jars) \
            .config("spark.driver.extraClassPath", jars) \
            .config("spark.mongodb.output.uri", mongodb_uri) \
            .getOrCreate()
        session._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    except Exception as e:
        logger.error(f"Error creating Spark session. {e}")
    finally:
        return session
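A short, hypothetical usage sketch for createSession (not part of the commit): the jar paths follow the SPARK_JARS layout passed in by run_etl below and must exist locally, and the MongoDB URI is a placeholder.

from Utils.SparkUtils import createSession

jars = {"AWS_CORE": "/opt/spark-jars/hadoop-aws-3.3.4.jar",
        "BUNDLE": "/opt/spark-jars/aws-java-sdk-bundle-1.12.431.jar"}  # subset of SPARK_JARS below
uri = "mongodb://user:password@localhost:27017/bcom_spark"             # placeholder credentials and host
session = createSession("BCOM-SPARK-TESTS", jars, uri)
if session is not None:
    print(session.version)  # confirm the session came up before using it
    session.stop()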
import time
import json
import logging
from typing import Any, Dict
from prefect import flow
logger = logging.getLogger()
@flow()
def run_commission(config: Dict[str, Any]) -> None:
    start_time = time.time()
    logger.info(f"Commission process execution time: {time.time() - start_time}")


if __name__ == "__main__":
    conf_path = "config.json"
    with open(conf_path) as f:
        conf = json.load(f)

    # Run Commission
    run_commission(conf)
app:
  input:
    type: bucket
    provider: aws
  output:
    type: bucket
    provider: aws
{
  "identifier": "BCOM-SPARK-TESTS",
  "inputs": {
    "type": "bucket",
    "params": {
      "provider": "aws"
    },
    "data": [
      {
        "identifier": "VENTAS",
        "path": "s3a://prueba-id/bcom-tests/inputs/gross_202311.txt",
        "input_type": "txt",
        "separator": "|",
        "schema": {
          "PERIODO_PROCESO_CODIGO": "TEXT",
          "SUBSCRIPTOR_ID": "TEXT",
          "MOVIMIENTO_TIPO": "TEXT",
          "FLAG_COMISIONABLE": "TEXT",
          "CONSULTOR_NK": "TEXT",
          "CLIENTE_ID": "TEXT",
          "CLIENTE_NOMBRE": "TEXT",
          "SERVICIO": "TEXT",
          "REVENUE": "DECIMAL",
          "PLAN_CODIGIO_NK": "TEXT",
          "PLAN_NOMBRE": "TEXT",
          "ACTIVE_USER_TRAFFIC": "TEXT"
        }
      },
      {
        "identifier": "TEAMS",
        "path": "s3a://prueba-id/bcom-tests/inputs/equipos_202311.txt",
        "input_type": "txt",
        "separator": "|",
        "schema": {
          "PERIODO_PROCESO_CODIGO": "TEXT",
          "MODELO_TIPO": "TEXT",
          "SUBSCRIBER_ID": "TEXT",
          "CONSULTOR_DOCUMENTO": "TEXT",
          "CLIENTE_CODIGO_NK": "TEXT",
          "SERVICIO": "TEXT",
          "PRECIO_VENTA": "DECIMAL"
        }
      },
      {
        "identifier": "GOALS",
        "path": "s3a://prueba-id/bcom-tests/inputs/metas_202311.csv",
        "input_type": "csv",
        "separator": ";",
        "schema": {
          "PERIODO_PROCESO_CODIGO": "TEXT",
          "CEDULA": "TEXT",
          "KPI": "TEXT",
          "META_INCIAL": "DECIMAL",
          "META_FINAL": "DECIMAL"
        }
      },
      {
        "identifier": "COMERCIAL_BASE",
        "path": "s3a://prueba-id/bcom-tests/inputs/planta_comercial_202311.csv",
        "input_type": "csv",
        "separator": ";",
        "schema": {
          "PERIODO_PROCESO_CODIGO": "TEXT",
          "CEDULA": "TEXT",
          "ESTADO": "TEXT",
          "VARIABLE_COMISION": "DECIMAL"
        }
      }
    ]
  },
  "output": {
    "type": "bucket",
    "params": {
      "provider": "aws",
      "bucket": "prueba-id"
    }
  }
}
import time
import json
import logging
from typing import Any, Dict
from prefect import flow
from Pipeline.ETLProcess import ETLProcess
logger = logging.getLogger()
SPARK_JARS = {
    "AWS_CORE": "/opt/spark-jars/hadoop-aws-3.3.4.jar",
    "BUNDLE": "/opt/spark-jars/aws-java-sdk-bundle-1.12.431.jar",
    "COMMON": "/opt/spark-jars/hadoop-common-3.3.4.jar",
    "AWS_CLIENT": "/opt/spark-jars/hadoop-client-3.3.4.jar",
    "MONGO_CORE": "/opt/spark-jars/mongodb-driver-core-4.0.4.jar",
    "MONGO_CLIENT": "/opt/spark-jars/mongodb-driver-sync-4.0.4.jar",
    "MONGODB": "/opt/spark-jars/mongo-spark-connector_2.12-3.0.1.jar",
    "BSON": "/opt/spark-jars/bson-4.0.4.jar"
}

MONGODB_URI = "mongodb://bcom_spark_user:root@192.168.1.37:50001/bcom_spark"


@flow
def run_etl(config: Dict[str, Any]) -> None:
    start_time = time.time()
    etl_process = ETLProcess(config)

    # Connect to Spark (local mode, standalone or cluster)
    etl_process.init(SPARK_JARS, MONGODB_URI)

    # First task - (Reader) - extract the input files
    etl_process.reader(etl_process)

    # Second task - apply the schema to the tables
    etl_process.set_schema(etl_process)

    # Process - Gross input (VENTAS)
    ventas_flag = etl_process.process_gross.submit(etl_process, "VENTAS")

    # Process - Teams input (TEAMS)
    teams_flag = etl_process.process_teams.submit(etl_process, "TEAMS")

    # Write - GROSS input
    etl_process.write.submit(etl_process, "VENTAS", ventas_flag)

    # Write - TEAMS input
    etl_process.write.submit(etl_process, "TEAMS", teams_flag)

    # Write - GOALS input
    etl_process.write.submit(etl_process, "GOALS")

    # Write - PLANTA input (COMERCIAL_BASE)
    etl_process.write.submit(etl_process, "COMERCIAL_BASE")

    logger.info(f"ETL process execution time: {time.time() - start_time}")


if __name__ == "__main__":
    conf_path = "config.json"
    with open(conf_path) as f:
        conf = json.load(f)

    # Run ETL
    run_etl(conf)
import time
import logging
logger = logging.getLogger()
if __name__ == "__main__":
    start_time = time.time()
    logger.info(f"Process execution time: {time.time() - start_time}")