Commit bbfda096 authored by Cristian Aguirre

Merge branch 'developer-ca' into 'developer'

Process-BCOM DAG 1

See merge request !2
parents 0e214512 55785c2d
# BCOM TRANSFORMACIONES Project
## Deployment and usage guide for the DAGs with Airflow and Kubernetes
### Prerequisites
- Docker installed, in its latest version
- A public repository on DockerHub, or a local image registry
- Kubernetes (k8s) and kubectl installed
- (Optional) For a development environment it is recommended to install "Minikube" instead of a full k8s cluster;
it simulates a k8s cluster inside a container.
- Minio Server installed locally, or an AWS S3 bucket available
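A quick way to check these prerequisites from a terminal (adjust to what you actually installed):
```shell
docker --version
kubectl version --client
minikube version   # only if Minikube is used
minio --version    # only if a local Minio Server is used
```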
## Environment deployment
⚠️ *The following 2 steps only need to be done the first time. They only have to be repeated if the
Dockerfile_worker file changes, which happens when additional external libraries are needed to run the DAGs.*
1.- Go to the **deploy-k8** folder and build the customized Airflow image with the dependencies
required to run our DAG. For example, run the following command:
```shell
docker build -t airflow_custom:1.0.0 -f Dockerfile_worker .
```
2.- Now push the image to DockerHub or to the local registry. To do this, first log in,
then create the tag and push it. For example:
```shell
docker login
docker tag airflow_custom:1.0.0 <nombrecuenta>/<repositorio>:<tag>
docker push <nombrecuenta>/<repositorio>:<tag>
```
3.- With the image already pushed to a registry, set its name in the
**airflow-envvars-configmap.yaml** file. Take this opportunity to review and validate
all the settings in that file, as well as in our
Secrets file **airflow-secrets.yaml**:
```text
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY=<cuenta/SERVER-URL>/<repositorio>
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG="<tag>"
```
NOTE: *Description of each environment variable used by the PODs and Airflow:*
```text
AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE: bcom-airflow # Namespace in k8s where the workers will be created
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: '30' # Interval for looking for new DAGs in the dags folder
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO # Airflow log level (webserver, scheduler and workers)
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: America/Lima # Timezone of the Airflow web UI
AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima # Timezone of the Airflow scheduler
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}' # Response timeout for the Kubernetes API
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom # Image repository used to create the worker POD
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.1" # Image tag used to create the worker POD
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc # Name of the volume claim used to store the logs
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF: airflow-envvars-configmap # Name of the configMap where the environment variables are defined
AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE: /opt/airflow/templates/pod_template.yaml # Host path of the template used to start the worker
AIRFLOW__CORE__EXECUTOR: KubernetesExecutor # Type of executor used by Airflow
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow # Default Airflow connection string to Postgres
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' # Pause the DAGs (do not start them) when they are created
AIRFLOW__CORE__LOAD_EXAMPLES: 'true' # Load the example DAGs in the web UI
_AIRFLOW_DB_UPGRADE: 'true' # Upgrade the DB if necessary
_AIRFLOW_WWW_USER_CREATE: 'true' # Create the initial user for the web UI
_AIRFLOW_WWW_USER_USERNAME: admin # Username of the web UI user
_AIRFLOW_WWW_USER_PASSWORD: admin # Password of the web UI user
S3_DAGS_DIR: 's3://prueba1234568/dags' # S3 path (if S3 is used) where the DAGs to synchronize are stored
SYNCHRONYZE_DAG_DIR: '30' # Time (in seconds) between checks for updates in the dags folder
MINIO_SERVER: 'http://192.168.49.2:9000' # Minio Server URL (if Minio is used)
MINIO_DAGS_DIR: '/prueba-ca/dags' # Location of the DAGs inside the Minio bucket
```
Additionally, validate or update the values in the **airflow-secrets.yaml** file (remember that
they are encoded with **base64**; the Secret is of type **Opaque**):
```text
AWS_ACCESS_KEY: bWluaW9hZG1pbg== # AWS ACCESS_KEY, if an S3 bucket is used to store the DAGs
AWS_SECRET_KEY: bWluaW9hZG1pbg== # AWS SECRET_KEY, if an S3 bucket is used to store the DAGs
MINIO_USER: bWluaW9hZG1pbg== # Minio user for connecting to the Minio Server bucket
MINIO_PASSWORD: bWluaW9hZG1pbg== # Minio password for connecting to the Minio Server bucket
```
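The base64 values can be generated from the real credentials; a minimal sketch using the default `minioadmin` value shown above (`-n` avoids encoding a trailing newline):
```shell
echo -n 'minioadmin' | base64        # -> bWluaW9hZG1pbg==
echo 'bWluaW9hZG1pbg==' | base64 -d  # decode to verify
```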
4.- **Since our deployment uses NFS as the data persistence layer, both for the Postgres database and for the
Airflow logs and the developed DAGs**, the NFS servers to be used have to be configured.
To do this, modify and/or validate the **airflow-volumes.yaml** file (_PersistentVolume_ section) to verify the hosts and paths of the
volumes. For example, the part of the template with the volume for the developed DAGs looks like this:
```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags-pv
  namespace: bcom-airflow
spec:
  capacity:
    storage: 300Mi  # Space requested for the DAGs storage
  accessModes:
    - ReadWriteMany
  storageClassName: airflow-dags
  nfs:
    server: 192.168.1.11  # Host of the NFS server to use
    path: "/mnt/nfs_share"  # Path inside the NFS server dedicated to storing the developed DAGs
```
After configuring the NFS server host and path for each requirement, modify and/or validate
how much of the volume's storage will actually be requested for that purpose. For example, for the developed DAGs we will
request 200Mi out of the 300Mi. This is also done in the airflow-volumes.yaml file, but in the
PersistentVolumeClaim section (the claim is bound to the volume created above through the
_storageClassName_):
```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags-pvc
  namespace: bcom-airflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: airflow-dags
  resources:
    requests:
      storage: 200Mi  # Request 200Mi of the airflow-dags volume
```
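Once the manifests are applied (step 5 below), you can verify that each claim is bound to its volume:
```shell
kubectl get pv
kubectl get pvc -n bcom-airflow
```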
5.- Run the **script_apply.sh** script to create all the configMaps, volumes, secrets, deployments
and services needed to run Airflow.
⚠️ _Note_: Decide whether the **dags** folder will be synchronized with a bucket in Minio or in S3.
Depending on that, you will have to use either _sync-dags-deployment.yaml_ or _sync-dags-deployment-s3.yaml_ and
reference it in the **script_apply.sh** and **script_delete.sh** scripts.
```shell
sh script_apply.sh
```
After running this command, wait a few minutes until all the resources are up and running
normally. All resources are created in the **bcom-airflow** namespace (configured in the
templates).
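For reference, a minimal sketch of what an apply script like this is assumed to do (only manifests mentioned in this guide are listed; the actual script in the repository is authoritative):
```shell
kubectl create namespace bcom-airflow        # only if the namespace does not exist yet
kubectl apply -f airflow-secrets.yaml
kubectl apply -f airflow-envvars-configmap.yaml
kubectl apply -f airflow-volumes.yaml
kubectl apply -f sync-dags-deployment.yaml   # or sync-dags-deployment-s3.yaml when S3 is used
# ...plus the remaining deployments/services of the repository (Postgres, scheduler, webserver)
```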
6.- (OPTIONAL) If you deployed using Minikube, you will need to expose the Airflow Webserver port
so that you can reach the web UI from your local machine. The simplest way is to forward the port of the
cluster's webserver directly to our localhost with _kubectl_. Run the following command:
```shell
kubectl port-forward <ID-POD-AIRFLOW-WEBSERVER> 8081:8080 -n bcom-airflow
```
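To find the `<ID-POD-AIRFLOW-WEBSERVER>` value used above, you can first list the pods (assuming the pod name contains "webserver"):
```shell
kubectl get pods -n bcom-airflow | grep webserver
```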
Now you can open http://localhost:8081 in your browser to reach the Airflow web UI.
7.- Check that our PODs are in the "Running" state with the following command:
```shell
kubectl get pods -n bcom-airflow
```
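If a POD does not reach the "Running" state, it can be inspected with the usual kubectl commands (a generic sketch; replace `<pod-name>` with the failing pod):
```shell
kubectl describe pod <pod-name> -n bcom-airflow
kubectl logs <pod-name> -n bcom-airflow
```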
## Adding or updating DAGs in Airflow
To update the existing **TACOMVENTAS AND PROMOCIONES RESIDENCIAL** DAG or to add a new
DAG to this Airflow deployment, follow these steps:
1.- Verify that our Kubernetes environment is up and running correctly. Run the command:
```shell
kubectl get pods -n bcom-airflow
```
2.- Place the updated DAG files in the corresponding **dags** bucket and prefix.
They must be placed in the path configured in the environment variables of the **airflow-envvars-configmap.yaml** file:
if we use S3 it is the _S3_DAGS_DIR_ variable; if we use Minio it is _MINIO_DAGS_DIR_.
The variable holds the full url <bucket>/<prefix>. For example:
```text
S3_DAGS_DIR: 's3://prueba1234568/dags'
MINIO_DAGS_DIR: '/prueba-ca/dags'
```
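For example, to upload an updated DAG file (a sketch: the S3 path comes from the variables above; for Minio, the `local` alias of the `mc` client is an assumption and must point to MINIO_SERVER):
```shell
# S3 variant
aws s3 cp dag_transformacion_tacomventas_promoresidencial.py s3://prueba1234568/dags/
# Minio variant, using the mc client with an alias created beforehand:
# mc alias set local http://192.168.49.2:9000 <MINIO_USER> <MINIO_PASSWORD>
mc cp dag_transformacion_tacomventas_promoresidencial.py local/prueba-ca/dags/
```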
3.- Now just wait n seconds, according to the variables _SYNCHRONYZE_DAG_DIR_ (the sync POD
synchronizes the bucket with the dags folder every n seconds) and _AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL_
(the Airflow scheduler looks for updates in its dags folder every n seconds). After that time
the changes will be visible in the Airflow web UI.
## Running the TACOMVENTAS and PROMOCIONES_RESIDENCIAL DAG
_Requirements to run the DAG:_
- The environment is deployed correctly and the PODs are in the "Running" state.
- The connection has been created in the Airflow web UI (the admin user has these permissions) with the name configured in app_conf.yml under the
**s3_conn_id** parameter (see the CLI sketch after this list).
- All the paths (bucket and prefixes) and settings of the 7 inputs in app_conf.yml have been validated.
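As an alternative to the web UI, a minimal sketch of creating that connection with the Airflow CLI (run inside an Airflow pod; the name `bcom_tp_connection` comes from app_conf.yml, the credentials are placeholders, and the `endpoint_url` extra is only needed when Minio is used and may vary with the Amazon provider version):
```shell
airflow connections add 'bcom_tp_connection' \
    --conn-type 'aws' \
    --conn-login '<ACCESS_KEY>' \
    --conn-password '<SECRET_KEY>' \
    --conn-extra '{"endpoint_url": "http://192.168.49.2:9000"}'
```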
1.- In the Airflow web UI, locate our DAG named **"BCOM_DAG_TRANSFORMACION_TACOMVENTAS_PROMOCIONESRESIDENCIAL"**
(configured as a constant in dag_transformacion_tacomventas_promoresidencial.py) and enable it
with its checkbox (the admin user can do this).
2.- With the current configuration, the DAG will run every 2 hours (configured in app_conf.yml under the
**schedule** parameter).
## Deploying Minio Server (Single Node) - for the development environment
We are going to install Minio Server to be able to create buckets and prefixes and upload files into
them. Minio replaces AWS S3, since it already provides us with the buckets.
1.- Download the installer and run it with the following commands (for Debian/Ubuntu):
```shell
wget https://dl.min.io/server/minio/release/linux-amd64/archive/minio_20230602231726.0.0_amd64.deb -O minio.deb
sudo dpkg -i minio.deb
```
2.- Create a folder where all the buckets created in the future will be stored. Run:
```shell
mkdir ~/minio/
```
3.- Start the Minio service, pointing it at the folder we just created and setting the port used to access its web UI (also
called the console):
```shell
minio server ~/minio --console-address :9090
```
_NOTE:_ By default Minio sets its root credentials to the following values:
```text
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
```
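If you prefer not to use the defaults, Minio reads its root credentials from environment variables, so they can be overridden before starting the server (values below are placeholders):
```shell
export MINIO_ROOT_USER=<your-user>
export MINIO_ROOT_PASSWORD=<your-strong-password>
minio server ~/minio --console-address :9090
```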
4.- Finally, open the Minio web UI on port 9090 and log in with the default credentials;
with that, Minio is ready for creating buckets and serves as a replacement for AWS S3.
Reference: [Install Minio Server and optionally Minio Client](https://www.youtube.com/watch?v=EsJHf5nUYyA&ab_channel=MinIO)
@@ -2,13 +2,13 @@
 general:
   s3_parameters:
     s3_conn_id: "bcom_tp_connection"
-    bucket: "prueba1234568"
+    bucket: "prueba-ca"
 dags:
   dag1:
     schedule: "@once"
-    period_pattern: '[a-zA-Z0-9]+([0-9]{4})(\-[0-9]{2})?\.[a-zA-Z]*'
-    csv_delimiter: ","
+    group_input_interval_days: '7'
+    csv_delimiter: ";"
     filters:
       fields_omited: ["BCOM_ROW_IDENTIFIER", "BCOM_PROCESS_ID", "BCOM_FIELD_KEY", "BCOM_LNUMBER", "BCOM_ERROR_CODE",
                       "BCOM_ERROR_MESSAGE"]
@@ -22,8 +22,8 @@ dags:
     s3_parameters:
       inputs:
         prefix: "pruebas_qa"
-        tacom_pattern: "tacomventas_original*.txt"
-        promociones_pattern: "promociones_original*.txt"
+        tacom_pattern: "TACOMVENTAS_original*.xlsx"
+        promociones_pattern: "PROMOCIONES_RESIDENCIAL_original*.xlsx"
       outputs:
         prefix: "prueba3/tacom_outputs"
         tacom_output: "tacom_modified.csv"
@@ -41,14 +41,14 @@ dags:
     relacion3pa2p:
       type: "INSUMO"
       pattern: "temporal_relacion3pa2p*.txt"
-      prefix: "pruebas_qa"
+      prefix: ""
      key_field: "TRESP"
      value_field: "DOSP"
      delimiter: ","
     relacionpoidpaquete:
       type: "INSUMO"
       pattern: "temporal_relacion_Paquete*.txt"
-      prefix: "pruebas_qa"
+      prefix: ""
      key_field: "POID_PRODUCT"
      value_field: "CD_PAQUETE"
      delimiter: ","
...
from typing import Dict, Any
import json
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from components.Utils import select_multiple
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.S3Route import load_obj_to_s3
from enums.OperationTypeEnum import OperationTypeEnum
import logging
logger = logging.getLogger()
def validate_clean(control_params: Dict[str, Any], **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG")
conn = control_params["connection_id"]
bucket = control_params["bucket"]
prefix = control_params["prefix"]
if not prefix.endswith("/"):
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
def clean(command: str, intern_conn):
engine = intern_conn.engine
tablename = select_multiple(command)["tablename"]
logger.info(f"Borrando tabla {tablename}")
delete = delete_table(tablename, engine)
if delete:
logger.info(f"Borrado correctamente la tabla {tablename}")
logger.info(f"Borrado todas las variables xcom")
@task(task_id="MASTER_CLEANING", trigger_rule='none_skipped')
def get_cleaners_from_xcom(**kwargs):
final_selects = []
task = kwargs['ti']
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
if not key.startswith(OperationTypeEnum.SELECT.value) and not key.startswith(OperationTypeEnum.PROCEDURE.value):
continue
xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo comandos {xcom_outputs}")
for select in xcom_outputs:
final_selects.append(select)
logger.info(f"Final selects: {final_selects}")
Variable.set(key='CLEANS', value=final_selects, serialize_json=True)
return [[item] for item in final_selects]
def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any]) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="LimpiezaDelProceso", prefix_group_id=False) as group:
cleaners = get_cleaners_from_xcom()
tasks = PythonOperator.partial(
task_id="CLEANERS",
python_callable=clean,
op_kwargs={'intern_conn': db_intern_conn}
).expand(op_args=cleaners)
validate_task = PythonOperator(
task_id="VALIDATE_CLEANER",
python_callable=validate_clean,
op_kwargs={'control_params': control_s3},
trigger_rule='none_skipped'
)
cleaners >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de limpiadores. {e}")
finally:
return group
from components.Timezone import datetime_by_tzone
from enums.ProcessStatusEnum import ProcessStatusEnum
import logging
from typing import Dict, Any, List, Tuple
logger = logging.getLogger()
def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[str, Any]:
response = {'status': ProcessStatusEnum.SUCCESS.value, 'tasks': []}
try:
conf = conf[-1]
logger.info(f"Último proceso ejecutado: {conf}")
status = conf["status"]
if status == ProcessStatusEnum.FAIL.value:
response["status"] = ProcessStatusEnum.FAIL.value
success_tasks = conf["tasks"]
final_tasks = []
for key in success_tasks.keys():
if type_task.lower() not in key.lower():
continue
task = success_tasks[key]
task_status = task["status"]
if task_status == ProcessStatusEnum.SUCCESS.value:
command = task["description"]
result = (key, command)
final_tasks.append(result)
response["tasks"] = final_tasks
except Exception as e:
logger.error(f"Error obteniendo task fallidos desde control. {e}")
finally:
return response
def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str, Any],
timezone: str, delete_last: bool = False) -> List[Dict[str, Any]]:
try:
if delete_last:
print("CONF-TRANS:", conf)
last_tasks = conf[-1]["tasks"]
tasks.update(last_tasks)
print("LAST:", tasks)
conf.pop(-1)
current_datetime = str(datetime_by_tzone(timezone, '%Y-%m-%d %H:%M:%S'))
new_process = {"date": current_datetime, "status": status, "tasks": tasks}
conf.append(new_process)
except Exception as e:
logger.error(f"Error actualizando archivo de control. {e}")
finally:
return conf
import pandas as pd
import logging
logger = logging.getLogger()
def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool = False) -> int:
final_steps = 0
try:
if is_tablename:
count_command = f"SELECT COUNT(*) FROM {sql_command}"
else:
count_command = f"SELECT COUNT(*) FROM ({sql_command}) BCOM"
with connection.connect() as conn:
result = conn.execute(count_command).first()
if result:
total_rows = int(result[0])
logger.info(f"Total de filas: {total_rows}")
if total_rows == chunksize:
final_steps = 1
else:
final_steps = int(total_rows/chunksize) + 1
except Exception as e:
logger.error(f"Error calculando el total de N° de filas desde el comando: {sql_command}. {e}")
finally:
return final_steps
def get_iterator(command: str, chunksize: int, connection) -> iter:
iterator = None
try:
connection = connection.execution_options(stream_results=True)
iterator = pd.read_sql(command, connection, index_col=None, chunksize=chunksize)
iterator = iter(iterator)
except Exception as e:
logger.error(f"Error trayendo iterator. {e}")
finally:
return iterator
import pandas as pd
import logging
logger = logging.getLogger()
def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
save = True
try:
with connection.connect() as conn:
df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=500)
except Exception as e:
logger.error(f"Error guardando resultados desde dataframe. {e}")
finally:
return save
import logging
import time
from typing import List
logger = logging.getLogger()
def execute_transformations(commands: List[str], engine):
try:
with engine.connect() as connection:
for command in commands:
logger.debug(f"Ejecutando comando de transformación: {command}")
_ = connection.execute(command)
except Exception as e:
logger.error(f"Error ejecutando comando de transformación. {e}")
def delete_table(tablename: str, engine) -> bool:
delete = False
try:
command = f"DROP TABLE {tablename}"
start_time = time.time()
with engine.connect() as conn:
try:
_ = conn.execute(command)
except Exception:
logger.error(f"Tabla no encontrada")
delete = True
logger.debug(f"Duración de borrado: {time.time() - start_time}")
except Exception as e:
logger.error(f"Error borrando tabla {tablename}. {e}")
finally:
return delete
from typing import List, Tuple
from sqlalchemy import Table, Column, MetaData
from components.Databases.Mysql import Mysql
from components.Databases.Mariadb import Mariadb
from components.Databases.Postgres import Postgres
from components.Databases.Oracle import Oracle
from enums.DatabaseTypeEnum import DatabaseTypeEnum
import logging
import sys
import oracledb
oracledb.version = "8.3.0"
sys.modules["cx_Oracle"] = oracledb
logger = logging.getLogger()
metadata_obj = MetaData()
class Database:
def __init__(self, db_type: str, host: str, port: int, user: str, password: str, database: str,
service: str, schema: str) -> None:
self.db_type = db_type
if db_type == DatabaseTypeEnum.MYSQL.value:
self.factory = Mysql(host, port, user, password, database)
elif db_type == DatabaseTypeEnum.MARIADB.value:
self.factory = Mariadb(host, port, user, password, database)
elif db_type == DatabaseTypeEnum.POSTGRES.value:
self.factory = Postgres(host, port, user, password, database, schema)
elif db_type == DatabaseTypeEnum.ORACLE.value:
self.factory = Oracle(host, port, user, password, service)
self.engine = None
def get_basic_connection(self):
connection = None
try:
connection = self.factory.get_basic_connection()
except Exception as e:
logger.error(f"Error trayendo básica conexión. {e}")
finally:
return connection
def close_basic_connection(self) -> None:
try:
self.factory.connection.close()
self.factory.connection = None
except Exception as e:
logger.error(f"Error cerrando básica conexión. {e}")
def create_engine(self) -> None:
try:
if isinstance(self.engine, type(None)):
self.factory.create_engine()
self.engine = self.factory.engine
except Exception as e:
logger.error(f"Error creando db engine. {e}")
def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel") -> bool:
create = False
try:
create = self.factory.create_model(tablename, fields, modelName)
except Exception as e:
raise AssertionError(f"Error creando tabla dinámica con nombre {tablename}. {e}")
finally:
return create
def create_table(self, model) -> bool:
save = False
try:
model.metadata.create_all(self.engine)
save = True
except Exception as e:
logger.error(f"Error creando tabla {model}. {e}")
finally:
return save
from enum import Enum
class MysqlDataTypeEnum(Enum):
VARCHAR = 253
TINY_INT = 1
INT = 3
TEXT = 252
DECIMAL = 0
SHORT = 2
TIMESTAMP = 7
JSON = 245
BIGINT = 8
FLOAT = 4
DATETIME = 12
DATE = 10
TIME = 11
DOUBLE = 5
from enum import Enum
from sqlalchemy import INTEGER, String, Float, DATE, TIMESTAMP, NUMERIC, CHAR, TEXT, BLOB, JSON, BIGINT, \
DATETIME, TIME, DECIMAL
class MysqlDataTypeORMEnum(Enum):
VARCHAR = String
TINY_INT = INTEGER
INT = INTEGER
TEXT = TEXT
DECIMAL = DECIMAL
SHORT = String
TIMESTAMP = TIMESTAMP
JSON = JSON
BIGINT = BIGINT
FLOAT = NUMERIC
DATETIME = DATETIME
DATE = DATE
TIME = TIME
DOUBLE = NUMERIC
from enum import Enum
class OracleDataTypeEnum(Enum):
NUMBER = "DB_TYPE_NUMBER"
VARCHAR2 = "DB_TYPE_VARCHAR"
DATE = "DB_TYPE_DATE"
TIMESTAMP = "DB_TYPE_TIMESTAMP"
from enum import Enum
from sqlalchemy import Integer, String, Float, Date, TIMESTAMP, NUMERIC, CHAR, Text, Boolean, BLOB
class OracleDataTypeORMEnum(Enum):
NUMBER = NUMERIC
VARCHAR2 = String
DATE = Date
TIMESTAMP = TIMESTAMP
from enum import Enum
class PostgresDataTypeEnum(Enum):
VARCHAR = 1043
INTEGER = 23
NUMERIC = 1700
TIMESTAMP = 1114
TEXT = 25
BOOLEAN = 16
BYTEA = 17
CHAR = 18
FLOAT = 701
DATE = 1082
INTEGER8 = 20
from enum import Enum
from sqlalchemy import Integer, String, Float, Date, TIMESTAMP, NUMERIC, CHAR, Text, Boolean, BLOB
class PostgresDataTypeORMEnum(Enum):
VARCHAR = String
INTEGER = Integer
NUMERIC = NUMERIC
TIMESTAMP = TIMESTAMP
TEXT = Text
BOOLEAN = Boolean
BYTEA = BLOB
CHAR = CHAR
FLOAT = NUMERIC
DATE = Date
INTEGER8 = Integer
from sqlalchemy import create_engine
from enums.DatabaseDialectEnum import DatabaseDialectEnum
import logging
logger = logging.getLogger()
class Mariadb:
def __init__(self, host: str, port: int, user: str, password: str, database: str) -> None:
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.engine = None
def create_engine(self) -> None:
try:
dialect = DatabaseDialectEnum.MARIADB.value
url = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}/{self.database}?charset=utf8mb4"
self.engine = create_engine(url)
except Exception as e:
logger.error(f"Error creando engine de Mariadb. {e}")
from typing import List, Tuple
from sqlalchemy import create_engine
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from components.Model.InsumoModel import InsumoModel
from components.Databases.Enums.MysqlDataTypeEnum import MysqlDataTypeEnum
from components.Databases.Enums.MysqlDataTypeORMEnum import MysqlDataTypeORMEnum
from sqlalchemy import Table, Column, MetaData
import logging
logger = logging.getLogger()
class Mysql:
def __init__(self, host: str, port: int, user: str, password: str, database: str) -> None:
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.engine = None
def create_engine(self) -> None:
try:
dialect = DatabaseDialectEnum.MYSQL.value
url = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}/{self.database}?charset=utf8mb4"
self.engine = create_engine(url)
except Exception as e:
logger.error(f"Error creando engine de Mysql. {e}")
def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel") -> bool:
model = None
try:
model = type(modelName, (InsumoModel,), {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
for field in fields:
logger.debug(f"Attribute: {field}")
name = field[0]
if field[2] != -1:
size = int(field[2] / 4)
try:
if not isinstance(field[3], type(None)) and field[3] > 0:
data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value(precision=field[2], scale=field[3])
else:
data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value(size)
except TypeError:
data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value()
else:
data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value()
setattr(model, name, Column(data_type))
model = model.__table__
except Exception as e:
logger.error(f"Error creando modelo dinámico en Mysql con nombre {tablename}. {e}")
finally:
return model
from typing import List, Tuple
from sqlalchemy import create_engine
import oracledb
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from components.Databases.Enums.OracleDataTypeEnum import OracleDataTypeEnum
from components.Databases.Enums.OracleDataTypeORMEnum import OracleDataTypeORMEnum
from components.Model.InsumoModel import InsumoModel
from sqlalchemy import Column
import logging
logger = logging.getLogger()
class Oracle:
def __init__(self, host: str, port: int, user: str, password: str, service: str) -> None:
self.host = host
self.port = port
self.user = user
self.password = password
self.service = service
self.arraysize = 1000
self.engine = None
self.connection = None
def get_basic_connection(self):
try:
if isinstance(self.connection, type(None)):
self.connection = oracledb.connect(user=self.user, password=self.password, host=self.host,
port=self.port, service_name=self.service)
except Exception as e:
logger.error(f"Error obteniendo conexion básica de Oracle. {e}")
finally:
return self.connection
def create_engine(self) -> None:
try:
dialect = DatabaseDialectEnum.ORACLE.value
url = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}?service_name={self.service}"
self.engine = create_engine(url, arraysize=self.arraysize)
except Exception as e:
logger.error(f"Error creando engine de Oracle. {e}")
def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel") -> bool:
model = None
try:
model = type(modelName, (InsumoModel,), {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
for field in fields:
logger.debug(f"Attribute: {field}")
name = field[0]
# Default precision for Integer Oracle : 38
if not isinstance(field[2], type(None)):
size = int(field[2] / 4)
try:
if not isinstance(field[3], type(None)) and field[3] > 0:
data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(precision=field[3], scale=field[3])
elif not isinstance(field[3], type(None)):
data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(precision=38, scale=field[3])
else:
data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(size)
except TypeError:
data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value()
elif not isinstance(field[3], type(None)) and field[3] < 0:
data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(precision=38, scale=16)
elif not isinstance(field[3], type(None)) and field[3] >= 0:
try:
data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(precision=38)
except TypeError:
data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value()
else:
data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value()
setattr(model, name, Column(data_type))
model = model.__table__
except Exception as e:
logger.error(f"Error creando modelo dinámico en Oracle con nombre {tablename}. {type(e)}.{e}")
finally:
return model
from typing import List, Tuple
from sqlalchemy import create_engine
from components.Model.InsumoModel import InsumoModel
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from components.Databases.Enums.PostgresDataTypeEnum import PostgresDataTypeEnum
from components.Databases.Enums.PostgresDataTypeORMEnum import PostgresDataTypeORMEnum
import logging
from sqlalchemy import Table, Column, MetaData
logger = logging.getLogger()
class Postgres:
def __init__(self, host: str, port: int, user: str, password: str, database: str, schema: str) -> None:
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.schema = schema
self.engine = None
self.DEFAULT_VAR_LENGHT = 100
def create_engine(self) -> None:
try:
dialect = DatabaseDialectEnum.POSTGRES.value
url = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}/{self.database}" \
f"?options=-csearch_path={self.schema}"
self.engine = create_engine(url)
except Exception as e:
logger.error(f"Error creando engine de Postgres. {e}")
def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel"):
model = None
try:
model = type(modelName, (InsumoModel,), {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
for field in fields:
logger.debug(f"Attribute: {field}")
name = field[0]
if field[2] != -1:
try:
if not isinstance(field[3], type(None)) and field[3] > 0:
data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value(precision=field[2], scale=field[3])
else:
data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value(field[2])
except TypeError:
data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value()
else:
data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value(self.DEFAULT_VAR_LENGHT)
setattr(model, name, Column(data_type))
model = model.__table__
except Exception as e:
logger.error(f"Error creando modelo dinámico en Postgres con nombre {tablename}. {type(e)}. {e}")
finally:
return model
from typing import Any, Dict
import json
import pandas as pd
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.Utils import select_multiple, generateModel
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
from components.DatabaseOperation.DatabaseLoad import save_from_dataframe
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_obj_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from enums.OperationTypeEnum import OperationTypeEnum
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
import logging
logger = logging.getLogger()
def validate_extractor(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
success_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="CONTROL-CONFIG")
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
for success_task in success_tasks:
task = ti.xcom_pull(task_ids="EXTRACTORS", key=success_task)[0]
final_dict.update({success_task: task})
if not isinstance(failed_tasks, type(None)) and len(failed_tasks) > 0:
status = ProcessStatusEnum.FAIL.value
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="EXTRACTORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
prefix = control_params["prefix"]
if not prefix.endswith("/"):
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
raise AirflowSkipException(f"Ocurrieron errores en la etapa de extracción")
elif status == ProcessStatusEnum.SUCCESS.value:
ti.xcom_push(key="CONTROL-CONFIG", value=conf)
def on_failure_extractor(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('SELECTS', default_var=[], deserialize_json=True)
command = selects[ti.map_index]
tablename = select_multiple(command[1])["tablename"]
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
task_result = {"description": tablename, "status": status, "message": exception}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="FAILED_TASKS", value=task_name)
def on_success_extractor(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('SELECTS', default_var=[], deserialize_json=True)
command = selects[ti.map_index]
tablename = select_multiple(command[1])["tablename"]
status = ProcessStatusEnum.SUCCESS.value
task_result = {"description": tablename, "status": status, "message": ""}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def extract_from_source(command: str, source_conn, intern_conn, chunksize: int, **kwargs):
extract_type = command[0]
command = command[1]
source_engine = source_conn.engine
command_for_create = command
is_tablename = False
columns_name = []
if extract_type.startswith(OperationTypeEnum.PROCEDURE.value):
# Create the model with the procedure descriptor
if command.replace(" ", "").lower().find("|begin") != -1:
tablename = command[:command.find("|")]
else:
proc_name = command[len("begin"):command.rfind("end")]
tablename = proc_name.strip().replace(";", "")
task = kwargs['ti']
procedures = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="PROCEDURE-JSON")
model = None
for procedure in procedures:
if procedure["procedure_identifier"] != tablename:
continue
model = generateModel(tablename, procedure["fields"])
columns_name = [field["identifier"] for field in procedure["fields"]]
is_tablename = True
if isinstance(model, type(None)):
raise AssertionError("Descripción del procedure en el json descriptor no encontraddo")
else:
if source_conn.db_type == DatabaseTypeEnum.ORACLE.value and \
not extract_type.startswith(OperationTypeEnum.PROCEDURE.value):
command_for_create = f"SELECT * FROM ({command_for_create}) OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY"
else:
command_words = command_for_create.split(" ")
if command_words[-2].lower() != "limit":
command_for_create += " limit 1"
columns = []
with source_engine.connect() as connection:
final_command = command_for_create
if final_command.replace(" ", "").lower().find("|select") != -1:
final_command = final_command[final_command.find("select"):]
result = connection.execute(final_command)
fields = result.cursor.description
for field in fields:
name, type_code, length, presicion = field[0], field[1], field[3], field[5]
columns.append((name, type_code, length, presicion))
logger.debug(f"Columnas procesadas: {columns}")
multiple = select_multiple(command)
tablename = multiple["tablename"]
model = source_conn.create_model(tablename, columns)
try:
create = intern_conn.create_table(model)
if create:
logger.info(f"Creado correctamente la tabla {tablename}. Creado?: {create}")
else:
raise AssertionError(f"Error creando tabla {tablename}")
if is_tablename:
command = command[len(tablename+":"):]
temp_connection = source_conn.get_basic_connection()
cursor = temp_connection.cursor()
cursor.execute(command)
for resultSet in cursor.getimplicitresults():
data = []
for row in resultSet:
data.append(row)
if len(data) == chunksize:
dataframe = pd.DataFrame(data, columns=columns_name)
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
if save:
logger.debug(f"Guardado correctamente dataframe. Procesando más bloques")
data.clear()
if len(data) > 0:
dataframe = pd.DataFrame(data, columns=columns_name)
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
if save:
logger.debug(f"Migrado correctamente todos los datos")
data.clear()
logger.info("Guardado correctamente todos los datos")
source_conn.close_basic_connection()
else:
if command.replace(" ", "").lower().find("|select"):
command = command[command.find("select"):]
steps = get_steps(command, chunksize, source_engine)
# Traemos el iterator
iterator = get_iterator(command, chunksize, source_engine)
logger.info(f"Número de pasos para migrar datos: {steps}")
for step in range(steps):
dataframe = next(iterator)
dataframe["INTERN_ID_BCOM"] = None
logger.debug(dataframe)
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
if save:
logger.info(f"Guardado correctamente dataframe en el paso {step+1}")
except Exception as e:
delete = delete_table(tablename, intern_conn.engine)
if delete:
logger.info(f"Se borró correctamente la tabla {tablename}")
raise AssertionError(f"Error creando la tabla y migrando datos. {type(e)}. {e}")
@task(task_id="MASTER_EXTRACTOR", trigger_rule='all_success')
def get_select_from_xcom(**kwargs):
task = kwargs['ti']
final_selects = []
conf = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="CONTROL-CONFIG")
tasks = get_tasks_from_control(conf, "extractor")
success_tasks = tasks["tasks"]
success_tasks = [item[1] for item in success_tasks]
logger.info(f"COMANDOS QUE FUERON EXITOSOS: {success_tasks}")
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
if not key.startswith(OperationTypeEnum.SELECT.value) and not key.startswith(OperationTypeEnum.PROCEDURE.value):
continue
xcom_selects = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo comandos {xcom_selects}")
for select in xcom_selects:
tablename = select_multiple(select)["tablename"]
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or tablename not in success_tasks:
final_selects.append((key, select))
logger.info(f"Comandos para la extracción: {final_selects}")
Variable.set(key='SELECTS', value=final_selects, serialize_json=True)
return [[item] for item in final_selects]
def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int, timezone: str,
control_s3: Dict[str, Any]) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="ExtraccionDeDatos", prefix_group_id=False) as group:
selects = get_select_from_xcom()
tasks = PythonOperator.partial(
task_id="EXTRACTORS",
python_callable=extract_from_source,
op_kwargs={'source_conn': db_source_conn, 'intern_conn': db_intern_conn, 'chunksize': chunksize},
on_failure_callback=on_failure_extractor,
on_success_callback=on_success_extractor
).expand(op_args=selects)
validate_task = PythonOperator(
task_id="VALIDATE_EXTRACTION",
python_callable=validate_extractor,
op_kwargs={'control_params': control_s3, 'timezone': timezone},
trigger_rule='all_done'
)
selects >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de extracción. {e}")
finally:
return group
from typing import Any, Dict
import os
import json
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.S3Route import save_df_to_s3, load_obj_to_s3
from components.Utils import select_multiple, create_temp_file, delete_temp_dir
from components.Control import get_tasks_from_control, update_new_process
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
from enums.OperationTypeEnum import OperationTypeEnum
import logging
logger = logging.getLogger()
def validate_generate(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="GENERATORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_TRANSFORMATION", key="CONTROL-CONFIG")
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
for success_task in success_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=success_task)[0]
final_dict.update({success_task: task})
if not isinstance(failed_tasks, type(None)) and len(failed_tasks) > 0:
status = ProcessStatusEnum.FAIL.value
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, True)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
prefix = control_params["prefix"]
if not prefix.endswith("/"):
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
raise AirflowSkipException(f"Ocurrieron errores en la etapa de generación")
elif status == ProcessStatusEnum.SUCCESS.value:
ti.xcom_push(key="CONTROL-CONFIG", value=conf)
def on_failure_generator(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('GENERATES', default_var=[], deserialize_json=True)
table = selects[ti.map_index]
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
task_result = {"description": table, "status": status, "message": exception}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="FAILED_TASKS", value=task_name)
def on_success_generator(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('GENERATES', default_var=[], deserialize_json=True)
table = selects[ti.map_index]
status = ProcessStatusEnum.SUCCESS.value
task_result = {"description": table, "status": status, "message": ""}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timezone: str, chunksize=10000):
engine = intern_conn.engine
logger.debug(f"COMANDO: {command}")
tablename = select_multiple(command)["tablename"]
logger.info(f"Generando resultado de la tabla {tablename}")
# Creamos el archivo temporal
filename_mask = params["filename_mask"]
file_type = params["file_type"]
pattern = params["datetime_pattern"]
delimiter = params["delimiter"]
tmp_path = params["tmp_path"]
tmp_file = create_temp_file(tmp_path, filename_mask, file_type, tablename, timezone, pattern)
logger.info(tmp_file)
steps = get_steps(tablename, chunksize, engine, True)
iterator = get_iterator(tablename, chunksize, engine)
logger.info(f"Total de pasos para generar archivo resultado: {steps}")
for step in range(steps):
logger.debug(f"STEP: {step}")
header = True if step == 0 else False
try:
dataframe = next(iterator)
dataframe = dataframe.drop("INTERN_ID_BCOM", axis=1, errors='ignore')
logger.debug(dataframe)
dataframe.to_csv(tmp_file, sep=delimiter, index=False, mode='a', header=header)
except StopIteration:
break
bucket = params["s3_params"]["bucket"]
prefix = params["s3_params"]["prefix"]
conn_id = params["s3_params"]["connection_id"]
if not prefix.endswith("/"):
prefix += "/"
file_key = prefix + tmp_file[tmp_file.rfind("/")+1:]
# Se sube el archivo al S3
logger.info(f"Tamaño del archivo a subir: {os.path.getsize(tmp_file)} bytes")
save_df_to_s3(tmp_file, conn_id, bucket, file_key, in_memory=False)
# Se borra el archivo al finalizar el upload
delete_temp_dir(tmp_file)
@task(task_id="MASTER_GENERATION", trigger_rule='none_skipped')
def get_generate_from_xcom(**kwargs):
task = kwargs['ti']
final_outputs = []
conf = task.xcom_pull(task_ids="VALIDATE_TRANSFORMATION", key="CONTROL-CONFIG")
tasks = get_tasks_from_control(conf, "generator")
success_tasks = tasks["tasks"]
success_tasks = [item[1] for item in success_tasks]
logger.info(f"GENERADORES QUE FUERON EXITOSOS (TABLAS): {success_tasks}")
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
if not key.startswith(OperationTypeEnum.SELECT.value) and not key.startswith(OperationTypeEnum.PROCEDURE.value):
continue
xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo tablas {xcom_outputs}")
for select in xcom_outputs:
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or select not in success_tasks:
final_outputs.append(select)
logger.info(f"Final outputs: {final_outputs}")
Variable.set(key='GENERATES', value=final_outputs, serialize_json=True)
return [[item] for item in final_outputs]
def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], control_s3: Dict[str, Any],
timezone: str) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="GeneracionyDespliegueDeResultados", prefix_group_id=False) as group:
outputs = get_generate_from_xcom()
tasks = PythonOperator.partial(
task_id="GENERATORS",
python_callable=generate_and_deploy,
on_failure_callback=on_failure_generator,
on_success_callback=on_success_generator,
op_kwargs={'intern_conn': db_intern_conn, 'params': parameters, 'timezone': timezone}
).expand(op_args=outputs)
validate_task = PythonOperator(
task_id="VALIDATE_GENERATOR",
python_callable=validate_generate,
op_kwargs={'control_params': control_s3, 'timezone': timezone},
trigger_rule='none_skipped'
)
outputs >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de generadores. {e}")
finally:
return group
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, BigInteger
Base = declarative_base()
class InsumoModel(Base):
__abstract__ = True
INTERN_ID_BCOM = Column(BigInteger, primary_key=True, autoincrement=True)
import fnmatch import fnmatch
import datetime import datetime
from typing import Any, Dict, Set from typing import Any, Dict, List, Tuple
import json
import pytz import pytz
import re
from io import BytesIO, StringIO from io import BytesIO, StringIO
import pandas as pd import pandas as pd
from components.Utils import get_type_file from components.Utils import get_type_file
from enums.FileTypeEnum import FileTypeEnum from enums.FileTypeEnum import FileTypeEnum
from enums.ScriptFileTypeEnum import ScriptFileTypeEnum
from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import logging import logging
...@@ -16,33 +16,33 @@ import logging ...@@ -16,33 +16,33 @@ import logging
logger = logging.getLogger() logger = logging.getLogger()
def get_df_from_s3(conn: str, bucket: str, key: str, period: str, delimiter: str) -> Dict[str, Any]: def get_df_from_s3(conn: str, bucket: str, key: str, delimiter: str, base_date: datetime.date,
interval: str) -> Dict[str, Any]:
response = {'filename': "", 'df': None} response = {'filename': "", 'df': None}
dataframe = None dataframe = None
try: try:
s3_data = get_data_from_s3(conn, bucket, key, period) s3_data = get_data_from_s3(conn, bucket, key, base_date, interval)
logger.info(f"ARCHIVO EXTRAIDO: {s3_data}") logger.info(f"ARCHIVO EXTRAIDO: {s3_data}")
if s3_data["filename"] == "": if s3_data["filename"] == "":
raise Exception(f"No se encontró archivo para el key: {key} y periodo {period}") raise Exception(f"No se encontró archivo para el key: {key} y fecha base {base_date} en intervalo {interval}")
response.update({'filename': s3_data["filename"]}) response.update({'filename': s3_data["filename"]})
file_type = get_type_file(s3_data["filename"]) file_type = get_type_file(s3_data["filename"])
if file_type == FileTypeEnum.EXCEL: if file_type == FileTypeEnum.EXCEL:
dataframe = pd.read_excel(s3_data["data"], engine="openpyxl") dataframe = pd.read_excel(s3_data["data"], engine="openpyxl", dtype='object')
elif file_type == FileTypeEnum.OLD_EXCEL: elif file_type == FileTypeEnum.OLD_EXCEL:
dataframe = pd.read_excel(s3_data["data"], engine="xlrd") dataframe = pd.read_excel(s3_data["data"], engine="xlrd", dtype='object')
elif file_type == FileTypeEnum.TEXT or file_type == FileTypeEnum.CSV: elif file_type == FileTypeEnum.TEXT or file_type == FileTypeEnum.CSV:
str_data = str(s3_data["data"].getvalue(), encoding='UTF-8', errors='ignore') str_data = str(s3_data["data"].getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data) data = StringIO(str_data)
dataframe = pd.read_csv(data, sep=delimiter) dataframe = pd.read_csv(data, sep=delimiter, dtype='object')
response.update({'df': dataframe}) response.update({'df': dataframe})
except Exception as e: except Exception as e:
logger.error(f"Error trayendo y transformando a DataFrame desde S3 con periodo {period}. {e}") logger.error(f"Error trayendo y transformando a DataFrame desde S3. {e}")
return response return response
def get_data_from_s3(conn: str, bucket: str, key: str, period: str) -> Dict[str, Any]: def get_data_from_s3(conn: str, bucket: str, key: str, base_date: datetime.date, interval: str) -> Dict[str, Any]:
result = {'filename': '', 'data': BytesIO()} result = {'filename': '', 'data': BytesIO()}
utc = pytz.UTC
try: try:
if key.rfind("/") != -1: if key.rfind("/") != -1:
prefix = key[:key.rfind("/")+1] prefix = key[:key.rfind("/")+1]
...@@ -50,25 +50,26 @@ def get_data_from_s3(conn: str, bucket: str, key: str, period: str) -> Dict[str, ...@@ -50,25 +50,26 @@ def get_data_from_s3(conn: str, bucket: str, key: str, period: str) -> Dict[str,
prefix = "" prefix = ""
s3_hook = S3Hook(conn) s3_hook = S3Hook(conn)
files = s3_hook.list_keys(bucket, prefix) files = s3_hook.list_keys(bucket, prefix)
last_key = ("", base_date)
# Colocar una fecha muy atrás como base # Colocar una fecha muy atrás como base
last_key = ("", datetime.datetime(2000, 1, 1, 0, 0, 0).replace(tzinfo=utc))
for file_key in files: for file_key in files:
if fnmatch.fnmatch(file_key, key) and (file_key.find(period) != -1 or file_key.find(period.replace("-", "")) != -1): if fnmatch.fnmatch(file_key, key):
file_date = s3_hook.get_key(file_key, bucket).meta.data file_date = s3_hook.get_key(file_key, bucket).meta.data
file_date = file_date["LastModified"] file_date = file_date["LastModified"]
if last_key[1] >= file_date: if int(interval) - abs((file_date - last_key[1]).days) >= 0:
continue last_key = (file_key, file_date)
last_key = (file_key, file_date)
data = s3_hook.get_key(last_key[0], bucket) data = s3_hook.get_key(last_key[0], bucket)
data.download_fileobj(result["data"]) data.download_fileobj(result["data"])
result["filename"] = last_key[0] result["filename"] = last_key[0]
except Exception as e: except Exception as e:
logger.error(f"Error trayendo datos desde S3 para el key {key} y periodo {period}. {e}") logger.error(f"Error trayendo datos desde S3 para el key {key}. {e}")
return result return result
def search_periods_from_key_s3(conn: str, bucket: str, key: str, pattern: str) -> Set[str]: def get_base_date(conn: str, bucket: str, key: str) -> datetime.date:
periods = set() utc = pytz.UTC
# Colocar una fecha muy atrás como base
last_date = datetime.datetime(2000, 1, 1, 0, 0, 0).replace(tzinfo=utc)
try: try:
if key.rfind("/") != -1: if key.rfind("/") != -1:
prefix = key[:key.rfind("/") + 1] prefix = key[:key.rfind("/") + 1]
...@@ -76,38 +77,46 @@ def search_periods_from_key_s3(conn: str, bucket: str, key: str, pattern: str) - ...@@ -76,38 +77,46 @@ def search_periods_from_key_s3(conn: str, bucket: str, key: str, pattern: str) -
prefix = "" prefix = ""
s3_hook = S3Hook(conn) s3_hook = S3Hook(conn)
files = s3_hook.list_keys(bucket, prefix) files = s3_hook.list_keys(bucket, prefix)
for file in files: for file_key in files:
if not re.search(pattern, file): if fnmatch.fnmatch(file_key, key):
continue file_date = s3_hook.get_key(file_key, bucket).meta.data
period = file[file.rfind(".")-7:file.rfind(".")] file_date = file_date["LastModified"]
if period.find("-") == -1: if last_date >= file_date:
period = period[1:5] + "-" + period[5:] continue
periods.add(period) last_date = file_date
logger.debug(f"Fecha base desde {key} : {last_date}")
except Exception as e: except Exception as e:
logger.error(f"Error buscando periodos disponibles en los archivos. key: {key}. {e}") logger.error(f"Error buscando archivo base para tener la fecha base. key: {key}. {e}")
return set(periods) return last_date
def save_df_to_s3(df: pd.DataFrame, conn: str, bucket: str, key: str, delimiter: str = ","): def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, delimiter: str = ",",
in_memory: bool = True):
try: try:
logger.info(f"SUBIENDO A NUBE KEY {key}") logger.info(f"SUBIENDO A NUBE KEY {key}")
file_type = get_type_file(key) file_type = get_type_file(key)
s3_hook = S3Hook(conn) s3_hook = S3Hook(conn)
if file_type == FileTypeEnum.EXCEL or file_type == FileTypeEnum.OLD_EXCEL: if file_type == FileTypeEnum.EXCEL or file_type == FileTypeEnum.OLD_EXCEL:
with BytesIO() as buffer: if in_memory:
with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer: with BytesIO() as buffer:
df.to_excel(writer, index=None) with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
s3_hook.load_bytes(buffer.getvalue(), key, bucket, True) data.to_excel(writer, index=None)
s3_hook.load_bytes(buffer.getvalue(), key, bucket, True)
else:
pass
elif file_type == FileTypeEnum.CSV or file_type == FileTypeEnum.TEXT: elif file_type == FileTypeEnum.CSV or file_type == FileTypeEnum.TEXT:
csv_buffer = BytesIO() if in_memory:
df.to_csv(csv_buffer, header=True, index=False, sep=delimiter, na_rep='None') csv_buffer = BytesIO()
csv_buffer.seek(0) data.to_csv(csv_buffer, header=True, index=False, sep=delimiter, na_rep='None')
s3_hook.load_bytes(csv_buffer.getvalue(), key, bucket, True) csv_buffer.seek(0)
s3_hook.load_bytes(csv_buffer.getvalue(), key, bucket, True)
else:
s3_hook.load_file(data, key, bucket)
except Exception as e: except Exception as e:
logger.error(f"Error guardando archivos a S3. key: {key}. {e}") logger.error(f"Error guardando archivos a S3. key: {key}. {e}")
def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str): def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str) -> None:
try: try:
filename = source_key[source_key.rfind("/")+1:] filename = source_key[source_key.rfind("/")+1:]
output_key += filename output_key += filename
...@@ -116,3 +125,51 @@ def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str): ...@@ -116,3 +125,51 @@ def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str):
s3_hook.delete_objects(bucket, source_key) s3_hook.delete_objects(bucket, source_key)
except Exception as e: except Exception as e:
logger.error(f"Error moviendo archivo desde {source_key} hacia {output_key} en bucket {bucket}. {e}") logger.error(f"Error moviendo archivo desde {source_key} hacia {output_key} en bucket {bucket}. {e}")
def get_files_from_prefix(conn: str, bucket: str, prefix: str) -> List[Tuple[str, str]]:
result = []
allowed_filetypes = [ScriptFileTypeEnum[item].value for item in ScriptFileTypeEnum._member_names_]
try:
s3_hook = S3Hook(conn)
files = s3_hook.list_keys(bucket, prefix)
logger.debug(f"Archivos encontrados en el prefijo {prefix}: {files}")
for file in files:
if file.endswith("/") or file[file.rfind(".")+1:].lower() not in allowed_filetypes:
continue
data = s3_hook.get_key(file, bucket).get()['Body'].read().decode("utf-8")
if file.find("/") == -1:
filename = file
else:
filename = file[file.rfind("/")+1:]
script = (filename, data)
result.append(script)
except Exception as e:
logger.error(f"Error extrayendo archivos en memoria desde bucket {bucket} y prefix {prefix}. {e}")
finally:
return result
def get_file_from_key(conn: str, bucket: str, key: str) -> Any:
result = BytesIO()
try:
s3_hook = S3Hook(conn)
data = s3_hook.get_key(key, bucket)
data.download_fileobj(result)
except Exception as e:
result = None
logger.error(f"Error extrayendo archivo {key}. {e}")
finally:
return result
def load_obj_to_s3(obj, conn: str, bucket: str, key: str, replace=True) -> bool:
load = False
try:
s3_hook = S3Hook(conn)
s3_hook.load_bytes(obj, key, bucket, replace)
load = True
except Exception as e:
logger.error(f"Error subiendo archivo de control a bucket {bucket} y key {key}. {e}")
finally:
return load
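# Minimal usage sketch (hypothetical): how load_obj_to_s3 could upload a small
# control JSON. The connection id, bucket and key below are illustrative
# placeholders, not project values.
if __name__ == "__main__":
    import json
    payload = json.dumps({"status": "success", "tasks": {}}, indent=2).encode()
    uploaded = load_obj_to_s3(payload, conn="my_s3_conn", bucket="my-bucket",
                              key="control/control_example.json")
    logger.info(f"Control file uploaded: {uploaded}")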
...@@ -7,6 +7,7 @@ logger = logging.getLogger()
POKE_INTERVAL = 5
TIMEOUT = 60*1
VERIFY_SSL = False
def create_s3_sensor(task_id: str, connection: str, bucket: str, key: str) -> S3KeySensor:
...@@ -18,7 +19,7 @@ def create_s3_sensor(task_id: str, connection: str, bucket: str, key: str) -> S3
        bucket_name=bucket,
        wildcard_match=True,
        aws_conn_id=connection,
        verify=VERIFY_SSL,
        poke_interval=POKE_INTERVAL,
        timeout=TIMEOUT
    )
...
import pytz
from datetime import datetime
from dateutil.parser import parse
import logging
logger = logging.getLogger()
def datetime_by_tzone(tzone: str, pattern: str):
offset = None
# Algunos casos donde el timezone es de la forma 4:30 y no se encuentra en timezones de pytz (GMT)
if ":" in tzone:
offset = tzone.split(":")[1]
tzone = tzone.split(":")[0]
if "+" in tzone:
tzone = tzone.replace(tzone[-1], str(int(tzone[-1]) + 1))
timezones_list = pytz.all_timezones
tzones = [x if tzone in x else None for x in timezones_list]
tzones = list(filter(None, tzones))
server_timezone = pytz.timezone(tzones[0])
logger.debug("Zona Horaria : {}".format(server_timezone))
server_time = server_timezone.localize(datetime.utcnow())
current_time = parse(server_time.strftime('%Y-%m-%d %H:%M:%S.%f %Z'))
if offset:
offset = pytz.FixedOffset((current_time.utcoffset().total_seconds() / 60 + float(offset)) * -1)
offset = offset.utcoffset(datetime.utcnow())
current_time = datetime.utcnow() + offset
else:
current_time = current_time.replace(tzinfo=None) - current_time.utcoffset()
current_time = parse(current_time.strftime(pattern))
logger.debug("Hora actual: {}".format(current_time))
return current_time
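# Minimal usage sketch (hypothetical): resolve the current time for the 'GMT-5'
# timezone label and the '%Y-%m-%d %H:%M:%S' pattern that dag_conf.yml declares.
if __name__ == "__main__":
    now_local = datetime_by_tzone("GMT-5", "%Y-%m-%d %H:%M:%S")
    logger.info(f"Hora local calculada: {now_local}")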
import json
from typing import Dict, Any
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_obj_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from enums.ProcessStatusEnum import ProcessStatusEnum
from enums.OperationTypeEnum import OperationTypeEnum
import logging
logger = logging.getLogger()
def validate_transform(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
success_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
for success_task in success_tasks:
task = ti.xcom_pull(task_ids="TRANSFORMATIONS", key=success_task)[0]
final_dict.update({success_task: task})
if not isinstance(failed_tasks, type(None)) and len(failed_tasks) > 0:
status = ProcessStatusEnum.FAIL.value
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="TRANSFORMATIONS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, True)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
prefix = control_params["prefix"]
if not prefix.endswith("/"):
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
raise AirflowSkipException(f"Ocurrieron errores en la etapa de transformación")
elif status == ProcessStatusEnum.SUCCESS.value:
ti.xcom_push(key="CONTROL-CONFIG", value=conf)
def on_failure_transform(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
transform = Variable.get('TRANSFORMS', default_var=[], deserialize_json=True)
script = transform[ti.map_index][0]
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
task_result = {"description": script, "status": status, "message": exception}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="FAILED_TASKS", value=task_name)
def on_success_transform(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
transform = Variable.get('TRANSFORMS', default_var=[], deserialize_json=True)
script = transform[ti.map_index][0]
status = ProcessStatusEnum.SUCCESS.value
task_result = {"description": script, "status": status, "message": ""}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def transformations(xcom_commands: str, intern_conn):
engine = intern_conn.engine
script_name = xcom_commands[0]
commands = xcom_commands[1]
logger.info(f"Ejecutando transformaciones del script {script_name}")
with engine.connect() as connection:
for command in commands:
logger.debug(f"Ejecutando comando de transformación: {command}")
_ = connection.execute(command)
@task(task_id="MASTER_TRANSFORMATION", trigger_rule='none_skipped')
def get_trans_from_xcom(**kwargs):
task = kwargs['ti']
transforms_per_file = []
conf = task.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
tasks = get_tasks_from_control(conf, "transformation")
success_tasks = tasks["tasks"]
success_tasks = [item[1] for item in success_tasks]
logger.info(f"SCRIPTS QUE FUERON EXITOSOS: {success_tasks}")
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
final_transforms = []
if not key.startswith(OperationTypeEnum.TRANSFORM.value):
continue
xcom_transforms = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo comandos {xcom_transforms}")
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or key not in success_tasks:
for transform in xcom_transforms:
final_transforms.append(transform)
transforms_per_file.append((key, final_transforms))
logger.info(f"Scripts para la transformación: {transforms_per_file}")
Variable.set(key='TRANSFORMS', value=transforms_per_file, serialize_json=True)
return [[item] for item in transforms_per_file]
def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str, Any]) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="TransformacionDeDatos", prefix_group_id=False) as group:
transforms = get_trans_from_xcom()
tasks = PythonOperator.partial(
task_id="TRANSFORMATIONS",
python_callable=transformations,
op_kwargs={'intern_conn': db_intern_conn},
on_failure_callback=on_failure_transform,
on_success_callback=on_success_transform
).expand(op_args=transforms)
validate_task = PythonOperator(
task_id="VALIDATE_TRANSFORMATION",
python_callable=validate_transform,
op_kwargs={'control_params': control_s3, 'timezone': timezone},
trigger_rule='none_skipped'
)
transforms >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de transformación. {e}")
finally:
return group
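# Minimal dynamic task mapping sketch (hypothetical, standalone): the same
# PythonOperator.partial(...).expand(op_args=...) pattern used by the
# TRANSFORMATIONS task above, with a trivial callable and static inputs
# instead of Xcom-provided scripts.
if __name__ == "__main__":
    from datetime import datetime
    from airflow import DAG

    def _show(item, label):
        print(label, item)

    with DAG("example_dynamic_mapping", start_date=datetime(2023, 1, 1),
             schedule_interval=None) as example_dag:
        PythonOperator.partial(
            task_id="SHOW",
            python_callable=_show,
            op_kwargs={"label": "item:"}
        ).expand(op_args=[["a"], ["b"], ["c"]])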
from typing import List, Any, Dict, Tuple
import uuid
import os
import shutil
import pandas as pd
from sqlalchemy import Column
from sqlalchemy.exc import InvalidRequestError
from enums.CatalogConfigurationEnum import CatalogConfigurationEnum
from enums.FileTypeEnum import FileTypeEnum
from enums.DataTypeEnum import DataTypeEnum
from enums.DataTypeOrmEnum import DataTypeOrmEnum
from components.Model.InsumoModel import InsumoModel
from enums.CommentsScriptEnum import CommentsScriptEnum
from components.Timezone import datetime_by_tzone
import logging
...@@ -77,6 +86,156 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
        if "delimiter" in catalog.keys():
            data_dict.update({catalog_name+'_delimiter': catalog["delimiter"]})
    except Exception as e:
        raise AssertionError(f"Error actualizando dict de catalogos. {e}")
    finally:
        return data_dict
def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) -> List[Tuple[str, List[str]]]:
result = []
comments = [CommentsScriptEnum[item].value for item in CommentsScriptEnum._member_names_]
try:
for row in dataset:
data = row[1].split(";")
data = [item.replace("\r", "").replace(";", "").replace("\n", " ") for item in data]
add_next = False
final_data = []
table_name = ""
for item in data:
# if item.strip().startswith("--") and label_tablename.strip()+":" not in item:
# continue
if item.lower().strip() == "end":
final_data[-1] = final_data[-1] + "; end;"
final_item = item
if item.lower().strip().find(label_tablename.lower().strip()+":") != -1:
init_index = item.lower().strip().index(label_tablename.lower().strip()+":")
table_name = item.replace(" ", "").strip()[init_index+5:].strip()
add_next = True
elif item != "":
if add_next:
item = table_name + "|" + item
add_next = False
final_item = item.strip()
table_name = ""
if final_item.strip()[:2] in comments and ("update " in final_item.lower() or "delete " in final_item.lower() or
"alter" in final_item.lower() or "create" in final_item.lower() or
"drop" in final_item.lower()):
trans_index = final_item.lower().find("update")
if trans_index != -1:
final_item = final_item[trans_index:]
delete_index = final_item.lower().find("delete")
if delete_index != -1:
final_item = final_item[delete_index:]
alter_index = final_item.lower().find("alter")
if alter_index != -1:
final_item = final_item[alter_index:]
create_index = final_item.lower().find("create")
if create_index != -1:
final_item = final_item[create_index:]
drop_index = final_item.lower().find("drop")
if drop_index != -1:
final_item = final_item[drop_index:]
final_item = final_item.replace("%", "%%")
final_data.append(final_item)
final_data = [item.replace("\t", "") for item in final_data if item != "" and ("select" in item.lower() or
"update" in item.lower() or
"delete" in item.lower() or
"begin" in item.lower() or
"alter" in item.lower() or
"create" in item.lower() or
"drop" in item.lower() or
"commit" in item.lower())]
result.append((row[0], final_data))
logger.info(f"Lista de comandos: {result}")
except Exception as e:
raise AssertionError(f"Error extrayendo comandos sql. {e}")
finally:
return result
def select_multiple(command: str) -> Dict[str, Any]:
response = {'is_multiple': False, 'tablename': ''}
tablename = ""
no_procedure_init = "|select"
procedure_init = ["|begin"]
try:
if command.lower().replace(" ", "").find(procedure_init[0]) != -1:
response["is_multiple"] = True
tablename = command[:command.index("|")].strip()
response["tablename"] = tablename
else:
if command.lower().replace(" ", "").find(no_procedure_init) != -1:
response["is_multiple"] = True
tablename = command[:command.index("|")].strip()
init_index = command.lower().find("from")
if init_index == -1:
raise AssertionError("Query malformed")
else:
from_command = command[init_index+4:]
tablename_base = from_command.strip().split(" ")
if len(tablename_base) > 0 and tablename == "":
tablename = tablename_base[0]
response["tablename"] = tablename
except Exception as e:
raise AssertionError(f"Error validando si es múltiple select y nombre de tabla. {e}")
finally:
return response
def create_temp_file(tmp_path: str, filename_mask: str, file_type: str, tablename: str, timezone: str,
pattern: str) -> str:
""" Create an output result as a file with a mask in the name
"""
fullpath = ""
try:
dir_name = str(uuid.uuid4())
if not tmp_path.endswith("/"):
tmp_path += "/"
path_dir = tmp_path + dir_name
os.mkdir(path_dir)
current_datetime = str(datetime_by_tzone(timezone, pattern))
filename_mask = filename_mask.replace("<source_name>", tablename)
filename_mask = filename_mask.replace("<datetime>", current_datetime)
filename = filename_mask + "." + file_type
fullpath = path_dir + "/" + filename
open(fullpath, mode='a').close()
except Exception as e:
logger.error(f"Error creando directorio y archivo temporal.{e}")
finally:
return fullpath
def delete_temp_dir(module_name: str) -> bool:
drop = False
try:
        if os.path.exists(module_name):
            directory = os.path.dirname(module_name)
            shutil.rmtree(directory, ignore_errors=True)
            # Mark the removal so the function returns True when the directory existed
            drop = True
except Exception as e:
raise AssertionError(f"Error borrando modulo temporal. {e}")
finally:
return drop
def generateModel(tablename: str, attributes: List[Dict[str, Any]], modelName: str = "TableModel"):
model = type(modelName, (InsumoModel,), {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
try:
for attribute in attributes:
logger.debug(f"attribute: {attribute}")
if attribute["datatype"] == DataTypeEnum.TEXT.name and "maxLength" in attribute.keys():
setattr(model, attribute["identifier"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(attribute["maxLength"])))
else:
setattr(model, attribute["identifier"],
Column(DataTypeOrmEnum[attribute["datatype"]].value))
model = model.__table__
except InvalidRequestError as e:
logger.debug(f"InvalidRequestError. {e}")
except Exception as e:
logger.error(f"Error creando modelo dinámico. {e}")
finally:
return model
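# Minimal usage sketch (hypothetical): build a dynamic table from field
# definitions shaped like procedure_definition.json; the attribute list below
# only illustrates that structure.
if __name__ == "__main__":
    sample_fields = [
        {"identifier": "CD_EMPRESA", "datatype": "NUMBER"},
        {"identifier": "CD_FOLIO", "datatype": "TEXT", "maxLength": 100},
    ]
    sample_table = generateModel("PROCEDURE_1", sample_fields)
    logger.debug(f"Columnas generadas: {[c.name for c in sample_table.columns]}")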
from airflow.utils.db import provide_session
from sqlalchemy import or_
from airflow.models import XCom, TaskInstance, Variable
import logging
from typing import List, Tuple
from enums.OperationTypeEnum import OperationTypeEnum
logger = logging.getLogger()
def save_commands_to_xcom(dataset: List[Tuple[str, List[str]]], task, extract_mask: str, transform_mask: str,
procedure_mask: str, order_delimiter: str) -> None:
final_names_xcom = []
try:
for data in dataset:
logger.info(f"Guardando Xcom en llave {data[0]}")
name = data[0]
base_name = name
if order_delimiter == ".":
base_name = base_name[:base_name.rfind(".")]
order = base_name.split(order_delimiter)
if len(order) < 2:
raise AssertionError(f"Script {name} no tiene prefijo de orden. Validar nombre de script")
name += "_" + order[0]
if name.find(extract_mask) != -1:
name = OperationTypeEnum.SELECT.value + "|" + name
elif name.find(transform_mask) != -1:
name = OperationTypeEnum.TRANSFORM.value + "|" + name
elif name.find(procedure_mask) != -1:
name = OperationTypeEnum.PROCEDURE.value + "|" + name
task.xcom_push(key=name, value=data[1])
final_names_xcom.append(name)
task.xcom_push(key="XCOM-EXTRACTION-NAMES", value=final_names_xcom)
except Exception as e:
raise AssertionError(f"Error guardando comandos en variables en xcom. {e}")
def save_values_to_xcom(value: str, task, task_name: str, key: str) -> None:
final_names_xcom = []
try:
xcom_names = task.xcom_pull(task_ids=task_name, key=key)
if not xcom_names:
final_names_xcom.append(value)
else:
final_names_xcom = xcom_names
final_names_xcom.append(value)
task.xcom_push(key=key, value=final_names_xcom)
logger.info(f"Guardado correctamente los nombres de las tablas")
except Exception as e:
logger.error(f"Error guardando nombres en variables xcom. {e}")
def delete_all_xcom_tasks() -> None:
try:
@provide_session
def cleanup_xcom(session=None):
session.query(XCom).filter(or_(XCom.task_id == "SCRIPTS-EXTRACTORS", XCom.task_id == "EXTRACTORS",
XCom.task_id == "GENERATORS", XCom.task_id == "TRANSFORMATIONS",
XCom.task_id == "VALIDATE_EXTRACTION", XCom.task_id == "VALIDATE_GENERATOR",
XCom.task_id == "VALIDATE_TRANSFORMATION")).delete()
session.query(Variable).filter(or_(Variable.key == "SELECTS", Variable.key == "TRANSFORMS",
Variable.key == "GENERATES", Variable.key == "CLEANS")).delete()
delete_task_instances()
cleanup_xcom()
except Exception as e:
logger.error(f"Error borrando todas las variables xcom del DAG actual. {e}")
def delete_task_instances() -> None:
try:
@provide_session
def cleanup_taskinstances(session=None):
session.query(TaskInstance).filter(TaskInstance.state == "removed").delete()
cleanup_taskinstances()
except Exception as e:
logger.error(f"Error borrando TaskInstances. {e}")
[{
"date": "2023-07-20 11:15:00",
"status": "success",
"tasks": {
"EXTRACTORS_1": {
"description": "select * from ABC",
"status": "success",
"message": ""
},
"EXTRACTORS_2": {
"description": "select * fromADA",
"status": "success",
"message": ""
},
"TRANSFORMATION_1": {
"description": "1.transformations1.sql",
"status": "success",
"message": ""
},
"GENERATOR_1": {
"description": "ABC_2022-07-20.txt",
"status": "success",
"message": ""
},
"CLEANER_1": {
"description": "ABC",
"status": "success",
"message": ""
}
}
}
]
app:
schedule: "@once"
database:
sources:
source1:
type: mysql
host: 192.168.21.52
port: 13306
username: root
password: root
database: bd_tp_qa
service: ORCLPDB1
schema: public
transformation:
type: mysql
host: 192.168.1.9
port: 13306
username: root
password: root
database: prueba_bcom
service:
schema:
chunksize: 8000
label_multiple_select: TABLE
source_mask: select # Sufijo (S)
procedure_mask: procedure # S
transformation_mask: transform # S
prefix_order_delimiter: .
scripts:
s3_params:
bucket: prueba1234568
prefix: bcom_scripts
connection_id: conn_script
control:
s3_params:
connection_id: conn_script
bucket: prueba1234568
prefix: bcom_control
filename: control_example.json
timezone: 'GMT-5'
outputs:
filename_mask: <source_name>_<datetime>
datetime_pattern: '%Y-%m-%d %H:%M:%S'
file_type: txt
delimiter: '|'
tmp_path: /tmp
s3_params:
bucket: prueba1234568
prefix: bcom_results
connection_id: conn_script
from datetime import datetime
import time
from typing import Any, Dict
import json
from io import StringIO
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from components.Utils import update_sql_commands
from components.Xcom import save_commands_to_xcom, delete_task_instances
from components.S3Route import get_files_from_prefix, get_file_from_key
from components.Sensor import create_s3_sensor
from components.Extractor import get_extract_task_group
from components.Transformation import get_transform_task_group
from components.Generation import get_generate_task_group
from components.Cleaning import get_cleaning_task_group
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.Databases.Database import Database
import logging
logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_TRANSFORMACIONES3"
# Change this path if it is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition.json"
DEFAULT_ARGS = {
'owner': 'BCOM',
"start_date": datetime(2023, 5, 25, 22, 9),
'depends_on_past': False,
'email': 'caguirre@bytesw.com',
'retries': 0,
'email_on_retry': False,
'email_on_failure': False
}
def cleaning(intern_conn, control_s3: Dict[str, Any]) -> TaskGroup:
groups = None
try:
groups = get_cleaning_task_group(intern_conn, control_s3)
except Exception as e:
logger.error(f"Error general de transformación de datos. {e}")
finally:
return groups
def generate_and_deploy_results(intern_conn, parameters: Dict[str, Any], timezone: str,
control_s3: Dict[str, Any]) -> TaskGroup:
groups = None
try:
groups = get_generate_task_group(intern_conn, parameters, control_s3, timezone)
except Exception as e:
logger.error(f"Error general de creación y despliegue de resultados. {e}")
finally:
return groups
def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any]) -> TaskGroup:
groups = None
try:
groups = get_transform_task_group(intern_conn, timezone, control_s3)
except Exception as e:
logger.error(f"Error general de transformación de datos. {e}")
finally:
return groups
def extraction(source_conn, intern_conn, timezone: str, control_s3: Dict[str, Any],
chunksize: int = 100000) -> TaskGroup:
groups = None
try:
groups = get_extract_task_group(source_conn, intern_conn, chunksize, timezone, control_s3)
except Exception as e:
logger.error(f"Error general de extracción de datos. {e}")
finally:
return groups
def save_procedure_json(json_path: str, task) -> None:
try:
with open(json_path) as f:
data = json.load(f)
logger.info(f"JSON-PROCEDURE {data}")
if data:
task.xcom_push(key="PROCEDURE-JSON", value=data)
except Exception as e:
logger.error(f"Error leyendo y guardando archivo descriptor de procedure. {e}")
def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task):
try:
if not prefix.endswith("/"):
prefix += "/"
key = prefix + filename
logger.info(f"EXTRAYENDO ARCHIVO DE CONTROL DESDE {key}")
control = get_file_from_key(conn_id, bucket, key)
if control:
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data)
data = json.load(data)
else:
logger.info(f"Json Procedure descargado: {control}")
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": []}]
task.xcom_push(key="CONTROL-CONFIG", value=data)
except Exception as e:
logger.error(f"Error general de descarga de archivo de control. {e}")
def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str,
order_delimiter: str, procedure_mask: str, label_tablename: str, control_params: Dict[str, Any],
**kwargs):
try:
extract_control(control_params["connection_id"], control_params["bucket"], control_params["prefix"],
control_params["filename"], kwargs['ti'])
save_procedure_json(JSON_PROCEDURE_PATH, kwargs['ti'])
start_time = time.time()
logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
scripts = get_files_from_prefix(conn_id, bucket, prefix)
scripts = update_sql_commands(scripts, label_tablename)
save_commands_to_xcom(scripts, kwargs['ti'], source_mask, transform_mask, procedure_mask, order_delimiter)
logger.debug(f"Script cargados en Xcom: {scripts}")
logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos")
except Exception as e:
raise AssertionError(f"Error general de descarga y guardado de scripts en variables Xcom. {e}")
def set_dag():
""" DAG that execute a series of SQL commands from scripts to make a result file and save on a bucket (for now)"""
import yaml
from yaml.loader import SafeLoader
# Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
# En desarrollo, cualquiera que apunte a su carpeta dags
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que extrae y transforma",
schedule_interval=conf["schedule"], tags=["DAG BCOM - SQL TRANSFORMATIONS"], catchup=True) as dag:
scripts_s3 = conf["scripts"]["s3_params"]
if scripts_s3["prefix"].endswith("/"):
wildcard_scripts = scripts_s3["prefix"] + "?*"
else:
wildcard_scripts = scripts_s3["prefix"] + "/?*"
sensor_scripts = create_s3_sensor("SCRIPTS-SENSOR", scripts_s3["connection_id"], scripts_s3["bucket"],
wildcard_scripts)
control_s3 = conf["control"]["s3_params"]
# Scripts extraction
extract_mask = conf["source_mask"]
transform_mask = conf["transformation_mask"]
procedure_mask = conf["procedure_mask"]
order_delimiter = conf["prefix_order_delimiter"]
script_extractor = PythonOperator(
task_id="SCRIPTS-EXTRACTOR",
python_callable=extract_scripts,
op_kwargs={'conn_id': scripts_s3["connection_id"], 'bucket': scripts_s3["bucket"],
'prefix': scripts_s3["prefix"], 'source_mask': extract_mask, 'transform_mask': transform_mask,
'procedure_mask': procedure_mask, 'order_delimiter': order_delimiter,
'label_tablename': conf["label_multiple_select"], 'control_params': control_s3},
trigger_rule="all_success"
)
        # Source Database configuration. We only use 1 source
source_params = conf["database"]["sources"]["source1"]
source_db = Database(source_params["type"], source_params["host"], int(source_params["port"]),
source_params["username"], source_params["password"], source_params["database"],
source_params["service"], source_params["schema"])
source_db.create_engine()
# Intern Database configuration.
intern_params = conf["database"]["transformation"]
intern_db = Database(intern_params["type"], intern_params["host"], int(intern_params["port"]),
intern_params["username"], intern_params["password"], intern_params["database"],
intern_params["service"], intern_params["schema"])
intern_db.create_engine()
# Creación de grupo de tasks para las extracciones
chunksize = conf["chunksize"]
timezone = conf["timezone"]
extractions = extraction(source_db, intern_db, timezone, control_s3, chunksize)
# Creación de grupo de tasks para las transformaciones
transformations = transformation(intern_db, timezone, control_s3)
# Creación de grupo de tasks para la generación y despliegue de archivos resultados
outputs_conf = conf["outputs"]
result = generate_and_deploy_results(intern_db, outputs_conf, timezone, control_s3)
# Creación de tasks de limpiadores
cleaners = cleaning(intern_db, control_s3)
sensor_scripts >> script_extractor >> extractions >> transformations >> result >> cleaners
return dag
globals()["0"] = set_dag()
...@@ -5,11 +5,9 @@ import time
import pandas as pd
import numpy as np
from components.S3Route import get_df_from_s3, get_base_date, save_df_to_s3, move_object_s3
from components.Sensor import create_s3_sensor
from components.Utils import get_modified_prefix, remove_invalid_rows, remove_fields, update_dict_with_catalogs
from airflow import DAG
from airflow.operators.python import PythonOperator
...@@ -19,9 +17,14 @@ import logging
logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_TRANSFORMACION_TACOMVENTAS_PROMOCIONESRESIDENCIAL"
PROMOCION_3PA2P = "3P a 2P"
PROMOCION_PARRILLA = "Parrilla"
PROMOCION_HBO = "Promocion HBO"
PROMOCION_DEFAULT = "Promocion"
PROMOCIONES_NO_CONSIDERADAS_TV_CANALES = "Adicional|Soundbox|SOUNDBOX"
DEFAULT_ARGS = {
    'owner': 'BCOM',
    "start_date": datetime(2023, 5, 25, 22, 9),
    'depends_on_past': False,
    'email': 'caguirre@bytesw.com',
...@@ -57,26 +60,26 @@ def dag1_id2(promo: pd.DataFrame, relation_poid: pd.DataFrame, key_field: str, v
def dag1_id3(tacom: pd.DataFrame, promo: pd.DataFrame) -> pd.DataFrame:
    result = pd.DataFrame()
    try:
        promo["CUENTA"] = promo["CUENTA"].astype(int, errors='ignore')
        promo["CD_PAQUETE_PROMO"] = promo["CD_PAQUETE"].astype(int, errors='ignore')
        promo.drop("CD_PAQUETE", axis=1, inplace=True)
        promo = promo.drop_duplicates(["CUENTA", "CD_PAQUETE_PROMO"])
        result = tacom.merge(promo, how='left', left_on=["CD_CUENTA", "CD_PAQUETE"], right_on=["CUENTA", "CD_PAQUETE_PROMO"])
        result["CD_PAQUETE"] = result["CD_PAQUETE"].astype(int).astype(str)
        result["PROMOCION"] = np.where((result["CD_PAQUETE_PROMO"].isna()) | (result["CD_PAQUETE_PROMO"] == "None") |
                                       (result["CD_PAQUETE_PROMO"] == "nan"), None,
                              np.where((result["CD_PAQUETE"].notna()) & (result["CD_PAQUETE"].str.len() <= 5),
                                       PROMOCION_3PA2P,
                              np.where((result["NOMBRE_PRODUCTO"].str.contains("TV", na=False)) &
                                       (~result["NOMBRE_PRODUCTO"].str.contains(PROMOCIONES_NO_CONSIDERADAS_TV_CANALES, na=False)),
                                       PROMOCION_PARRILLA,
                              np.where((result["NOMBRE_PRODUCTO"].str.contains("CANALES", na=False)) &
                                       (~result["NOMBRE_PRODUCTO"].str.contains(PROMOCIONES_NO_CONSIDERADAS_TV_CANALES, na=False)),
                                       PROMOCION_PARRILLA,
                              np.where(result["NOMBRE_PRODUCTO"].str.contains("HBO MAX", na=False), PROMOCION_HBO,
                              np.where(result["NOMBRE_PRODUCTO"].str.contains("PAQUETE HBO", na=False), PROMOCION_HBO,
                              np.where(result["NOMBRE_PRODUCTO"].str.contains("STAR PREMIUM", na=False),
                                       "Promocion STAR PREMIUM", PROMOCION_DEFAULT)))))))
        result["CD_PAQUETE_PROMO"] = np.where((result["CD_PAQUETE_PROMO"] == 'nan') |
                                              (result["CD_PAQUETE_PROMO"] == 'None'), None, result["CD_PAQUETE"])
...@@ -93,7 +96,7 @@ def dag1_id4(df: pd.DataFrame, df_promo: pd.DataFrame, key_field: str, value_fie
        df_promo[value_field] = df_promo[value_field].astype(str, errors='ignore')
        df = df.merge(df_promo, how='outer', left_on="CD_PAQUETE", right_on=key_field)
        df = df.dropna(how='all', subset=["CD_EMPRESA", "CD_FOLIO", "CD_CUENTA"])
        df["CD_PAQUETE"] = np.where((df["PROMOCION"] == PROMOCION_3PA2P) & (df[key_field].notna()), df[value_field], df["CD_PAQUETE"])
        df = df.drop([key_field, value_field], axis=1)
    except Exception as e:
        logger.error(f"Error DAG1_ID4. {e}")
...@@ -112,137 +115,126 @@ def dag1_id5(df: pd.DataFrame, relation: pd.DataFrame, key_field: str, value_fie
    return df
def dag1_id6(df: pd.DataFrame) -> pd.DataFrame:
    try:
        df["NRO_PAQUETE"] = df.groupby(["CD_FOLIO", "CD_PAQUETE"]).cumcount() + 1
    except Exception as e:
        logger.error(f"Error DAG1_ID6. {e}")
    finally:
        return df
def dag1_id7(df: pd.DataFrame, notpromo: pd.DataFrame, key_field: str) -> pd.DataFrame:
    try:
        notpromo = notpromo.astype(str)
        not_promo_values = notpromo[key_field].tolist()
        df["PROMOCION"] = np.where((df["TP_SERVICIO"] == 2) & (df["NRO_PAQUETE"] > 1) &
                                   (df["CD_PAQUETE"].isin(not_promo_values)), None, df["PROMOCION"])
    except Exception as e:
        logger.error(f"Error DAG1_ID7. {e}")
    finally:
        return df
def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: Dict[str, Any],
             delimiter: str, interval: str, outputs: Dict[str, Any], procesed_prefix: str, filters: Dict[str, Any]):
    start_time = time.time()
    logger.debug(f"DICCIONARIO DE CATALOGOS: {catalogs}")
    base_date = get_base_date(conn, bucket, tacom_key)
    omited_cols = filters["fields_omited"]
    current_delimiter = delimiter
    tacom = get_df_from_s3(conn, bucket, tacom_key, current_delimiter, base_date, interval)
    tacom_df = remove_invalid_rows(tacom['df'], filters["tacom_drop_nulls_subset"])
    tacom_df = remove_fields(tacom_df, omited_cols)
    if isinstance(tacom_df, type(None)):
        raise AssertionError(f"INSUMO TACOM NO ENCONTRADO")
    promo = get_df_from_s3(conn, bucket, promo_key, current_delimiter, base_date, interval)
    promo_df = remove_invalid_rows(promo['df'], filters["promo_drop_nulls_subset"])
    promo_df = remove_fields(promo_df, omited_cols)
    if isinstance(promo_df, type(None)):
        raise AssertionError(f"INSUMO PROMOCIONES NO ENCONTRADO")
    if "catalogo_promociones_delimiter" in catalogs.keys():
        current_delimiter = catalogs["catalogo_promociones_delimiter"]
    promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_catalogo_promociones"], current_delimiter, base_date, interval)
    promo_catalog_df = remove_invalid_rows(promo_catalog['df'], filters["catalog_drop_nulls_subset"])
    promo_catalog_df = remove_fields(promo_catalog_df, omited_cols)
    if isinstance(promo_catalog_df, type(None)):
        raise AssertionError(f"INSUMO PROMOCIONES CATALOGO NO ENCONTRADO")
    if "relacionpoidpaquete_delimiter" in catalogs.keys():
        current_delimiter = catalogs["relacionpoidpaquete_delimiter"]
    relationpoid = get_df_from_s3(conn, bucket, catalogs["s3_relacionpoidpaquete"], current_delimiter, base_date, interval)
    relationpoid_df = remove_invalid_rows(relationpoid['df'], filters["relapoid_drop_nulls_subset"])
    relationpoid_df = remove_fields(relationpoid_df, omited_cols)
    if isinstance(relationpoid_df, type(None)):
        raise AssertionError(f"INSUMO RELACION POID NO ENCONTRADO")
    if "relacion3pa2p_delimiter" in catalogs.keys():
        current_delimiter = catalogs["relacion3pa2p_delimiter"]
    relation3a2p = get_df_from_s3(conn, bucket, catalogs["s3_relacion3pa2p"], current_delimiter, base_date, interval)
    relation3a2p_df = remove_invalid_rows(relation3a2p['df'], filters["rela3pa2p_drop_nulls_subset"])
    relation3a2p_df = remove_fields(relation3a2p_df, omited_cols)
    if isinstance(relation3a2p_df, type(None)):
        raise AssertionError(f"INSUMO RELACION 3A2P NO ENCONTRADO")
    if "relacion_paquetes_delimiter" in catalogs.keys():
        current_delimiter = catalogs["relacion_paquetes_delimiter"]
    relapaq_catalog = get_df_from_s3(conn, bucket, catalogs["s3_relacion_paquetes"], current_delimiter, base_date, interval)
    relapaq_df = remove_invalid_rows(relapaq_catalog['df'], filters["relapaqs_drop_nulls_subset"])
    relapaq_df = remove_fields(relapaq_df, omited_cols)
    if isinstance(relapaq_df, type(None)):
        raise AssertionError(f"INSUMO RELACION PAQUETE INICIAL Y FINAL NO ENCONTRADO")
    if "no_promocion_delimiter" in catalogs.keys():
        current_delimiter = catalogs["no_promocion_delimiter"]
    not_promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_no_promocion"], current_delimiter, base_date, interval)
    not_promo_df = remove_invalid_rows(not_promo_catalog['df'], filters["not_promo_drop_nulls_subset"])
    not_promo_df = remove_fields(not_promo_df, omited_cols)
    if isinstance(not_promo_df, type(None)):
        raise AssertionError(f"INSUMO PAQUETES SIN PROMOCION NO ENCONTRADO")
    logger.info(f"EJECUTANDO PROCESO")
    promo_df = dag1_id1(promo_df, promo_catalog_df, catalogs["catalogo_promociones_key"],
                        catalogs["catalogo_promociones_value"])
    promo_df = dag1_id2(promo_df, relationpoid_df, catalogs["relacionpoidpaquete_key"],
                        catalogs["relacionpoidpaquete_value"])
    result = dag1_id3(tacom_df, promo_df)
    result = dag1_id4(result, relation3a2p_df, catalogs["relacion3pa2p_key"], catalogs["relacion3pa2p_value"])
    result = dag1_id5(result, relapaq_df, catalogs["relacion_paquetes_key"], catalogs["relacion_paquetes_value"])
    result = dag1_id6(result)
    result = dag1_id7(result, not_promo_df, catalogs["no_promocion_key"])
    final_columns = list(tacom_df.columns)
    result = result[final_columns]
    result = result.sort_values(by="CD_FOLIO")
    # Subir los resultados al S3
    save_df_to_s3(result, conn, bucket, outputs["tacom_output_path"], outputs["tacom_delimiter"])
    promo_df.rename(columns={'CD_PAQUETE_PROMO': 'CD_PAQUETE'}, inplace=True)
    save_df_to_s3(promo_df, conn, bucket, outputs["promo_output_path"], outputs["promo_delimiter"])
    # Mover TODOS LOS INSUMOS USADOS
    move_object_s3(conn, bucket, tacom['filename'], procesed_prefix)
    move_object_s3(conn, bucket, promo['filename'], procesed_prefix)
    move_object_s3(conn, bucket, promo_catalog['filename'], procesed_prefix)
    move_object_s3(conn, bucket, relationpoid['filename'], procesed_prefix)
    move_object_s3(conn, bucket, relation3a2p['filename'], procesed_prefix)
    move_object_s3(conn, bucket, relapaq_catalog['filename'], procesed_prefix)
    move_object_s3(conn, bucket, not_promo_catalog['filename'], procesed_prefix)
    logger.info("FINALIZADO PROCESO 1")
    logger.info(f"--- {time.time() - start_time} seconds --- TIEMPO DE EJECUCIÓN DEL PROCESO 1")
def set_dag_1():
    """ DAG that execute the process with TACOMVENTAS AND PROMOCIONES_RESIDENCIAL inputs"""
    import yaml
    from yaml.loader import SafeLoader
    # Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
    # En desarrollo, cualquiera que apunte a su carpeta dags
    conf_path = "/root/airflow/dags/app_conf.yml"
    with open(conf_path) as f:
        data = yaml.load(f, Loader=SafeLoader)
    general_cnf = data["general"]
...@@ -308,7 +300,7 @@ def set_dag_1():
        python_callable=etl_dag1,
        op_kwargs={'conn': s3_conf["s3_conn_id"], 'bucket': s3_conf["bucket"],
                   'tacom_key': s3_tacom, 'promo_key': s3_promo, 'catalogs': catalogs_dict,
                   'delimiter': conf["csv_delimiter"], 'interval': conf["group_input_interval_days"],
                   'outputs': outputs, 'procesed_prefix': procesed_prefix, 'filters': filters},
        trigger_rule="all_success"
    )
...
from enum import Enum
class CommentsScriptEnum(Enum):
DASHES = "--"
NUMERAL = "#"
import enum
import pandas as pd
from pymysql import Timestamp
class DataTypeEnum(enum.Enum):
NUMBER = int
TEXT = str
DECIMAL = float
DATETIME = Timestamp
DATE = pd.Timestamp
import enum
from sqlalchemy import Integer, String, Date, DateTime, DECIMAL
class DataTypeOrmEnum(enum.Enum):
NUMBER = Integer
TEXT = String
DECIMAL = DECIMAL
DATE = Date
DATETIME = DateTime
from enum import Enum
class DatabaseDialectEnum(Enum):
MYSQL = "mysql+pymysql"
MARIADB = "mysql+pymysql"
ORACLE = "oracle"
POSTGRES = "postgresql+psycopg2"
from enum import Enum
class DatabaseTypeEnum(Enum):
MYSQL = "mysql"
MARIADB = "mariadb"
ORACLE = "oracle"
POSTGRES = "postgres"
from enum import Enum
class OperationTypeEnum(Enum):
SELECT = "SELECT"
TRANSFORM = "TRANSFORM"
PROCEDURE = "PROCEDURE"
from enum import Enum
class ProcessStatusEnum(Enum):
SUCCESS = "success"
FAIL = "failed"
from enum import Enum
class ScriptFileTypeEnum(Enum):
TEXT = "txt"
SQL = "sql"
[{
"procedure_identifier": "PROCEDURE_1",
"fields": [
{
"identifier": "CD_EMPRESA",
"datatype": "NUMBER",
"decimal_precision": 0,
"maxLength": null
},
{
"identifier": "CD_FOLIO",
"datatype": "TEXT",
"decimal_precision": null,
"maxLength": 100
},
{
"identifier": "CD_CLIENTE",
"datatype": "TEXT",
"decimal_precision": null,
"maxLength": 50
},
{
"identifier": "FH_CONTRATACION",
"datatype": "DATE",
"pattern": "%d-%m-%y",
"decimal_precision": null,
"maxLength": null
}
]
},
{
"procedure_identifier": "procedure_prueba2",
"fields": [
{
"identifier": "col1",
"datatype": "DATE",
"pattern": "%Y-%m-%d",
"decimal_precision": null,
"maxLength": null
},
{
"identifier": "col2",
"datatype": "TIME",
"pattern": "%H:%M:%S",
"decimal_precision": null,
"maxLength": null
},
{
"identifier": "col3",
"datatype": "DATETIME",
"pattern": "%Y-%m-%d %H:%M:%S",
"decimal_precision": null,
"maxLength": null
}
]
}
]
\ No newline at end of file
apiVersion: v1
kind: Namespace
metadata:
name: bcom-airflow
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: bcom-airflow
namespace: bcom-airflow
---
apiVersion: v1
kind: ConfigMap
metadata:
name: pod-template-config
namespace: bcom-airflow
data:
pod_template.yaml: |
apiVersion: v1
kind: Pod
metadata:
name: dummy-name
namespace: bcom-airflow
spec:
containers:
- args: [ ]
command: [ ]
envFrom:
- configMapRef:
name: airflow-envvars-configmap
env:
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
image: dumy-image
imagePullPolicy: IfNotPresent
name: base
volumeMounts:
- name: dags-host-volume
mountPath: /opt/airflow/dags
- name: logs-persistent-storage
mountPath: /opt/airflow/logs
hostNetwork: false
restartPolicy: OnFailure
securityContext:
runAsUser: 50000
nodeSelector: { }
affinity: { }
tolerations: [ ]
volumes:
- name: dags-host-volume
persistentVolumeClaim:
claimName: airflow-dags-pvc
- name: logs-persistent-storage
persistentVolumeClaim:
claimName: airflow-logs-pvc
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-envvars-configmap
  namespace: bcom-airflow
data:
  # The conf below is necessary because of a typo in the config on docker-airflow image:
  # https://github.com/puckel/docker-airflow/blob/bed777970caa3e555ef618d84be07404438c27e3/config/airflow.cfg#L934
  AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: 'false'
  AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: '10'
  AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE: bcom-airflow
  AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: '30'
  AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
  AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: America/Lima
  AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima
  AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}'
  AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom
  AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.4"
  AIRFLOW__KUBERNETES__DAGS_VOLUME_HOST: /mnt/airflow/dags
  AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc
  AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF: airflow-envvars-configmap
  AIRFLOW__KUBERNETES_EXECUTOR__POD_TEMPLATE_FILE: /opt/airflow/templates/pod_template.yaml
  AIRFLOW__CORE__EXECUTOR: KubernetesExecutor
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
  AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
...@@ -24,5 +89,7 @@ data:
  _AIRFLOW_WWW_USER_CREATE: 'true'
  _AIRFLOW_WWW_USER_USERNAME: admin
  _AIRFLOW_WWW_USER_PASSWORD: admin
  S3_DAGS_DIR: 's3://prueba1234568/dags'
  SYNCHRONYZE_DAG_DIR: '30'
  MINIO_SERVER: 'http://192.168.49.2:9000'
  MINIO_DAGS_DIR: '/prueba-ca/dags'
\ No newline at end of file
...@@ -2,6 +2,7 @@ kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pods-permissions
  namespace: bcom-airflow
rules:
  - apiGroups: [""]
    resources: ["pods"]
...@@ -13,10 +14,11 @@ kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pods-permissions
  namespace: bcom-airflow
subjects:
  - kind: ServiceAccount
    name: default
    namespace: bcom-airflow
roleRef:
  kind: ClusterRole
  name: pods-permissions
...
...@@ -2,6 +2,7 @@ apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-scheduler
  namespace: bcom-airflow
  labels:
    app: airflow-k8s
...@@ -20,31 +21,26 @@ spec:
    spec:
      containers:
        - name: airflow-scheduler
          image: cristianfernando/airflow_custom:0.0.4
          args: ["scheduler"]
          envFrom:
            - configMapRef:
                name: airflow-envvars-configmap
          volumeMounts:
            - name: dags-host-volume
              mountPath: /opt/airflow/dags
            - name: logs-persistent-storage
              mountPath: /opt/airflow/logs
            - name: pods-templates
              mountPath: /opt/airflow/templates/pod_template.yaml
              subPath: pod_template.yaml
      volumes:
        - name: dags-host-volume
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: pods-templates
          configMap:
            name: pod-template-config
        - name: logs-persistent-storage
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
apiVersion: v1
kind: Secret
metadata:
name: credentials
namespace: bcom-airflow
type: Opaque
data:
AWS_ACCESS_KEY: QUtJQVFBQU1YTzNaNEJITktFSUU=
AWS_SECRET_KEY: K01VbW4zRW9pZ1k5M3c1UnhOdG1DY3hWK0Vya1pnRVhxeFVralhVMw==
MINIO_USER: bWluaW9hZG1pbg==
MINIO_PASSWORD: bWluaW9hZG1pbg==
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags-pv
namespace: bcom-airflow
spec:
capacity:
storage: 300Mi
accessModes:
- ReadWriteMany
storageClassName: airflow-dags
nfs:
server: 192.168.1.9
path: "/mnt/nfs_share"
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-postgres-pv
namespace: bcom-airflow
spec:
capacity:
storage: 8000Mi
accessModes:
- ReadWriteMany
storageClassName: airflow-postgres
nfs:
server: 192.168.1.9
path: "/mnt/nfs_postgres"
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs-pv
namespace: bcom-airflow
spec:
capacity:
storage: 4000Mi
accessModes:
- ReadWriteMany
storageClassName: airflow-logs
nfs:
server: 192.168.1.9
path: "/mnt/nfs_logs"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags-pvc
namespace: bcom-airflow
spec:
accessModes:
- ReadWriteMany
storageClassName: airflow-dags
resources:
requests:
storage: 200Mi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-postgres-pvc
namespace: bcom-airflow
spec:
accessModes:
- ReadWriteMany
storageClassName: airflow-postgres
resources:
requests:
storage: 7500Mi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs-pvc
namespace: bcom-airflow
labels:
app: airflow-k8s
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 3500Mi
storageClassName: airflow-logs
\ No newline at end of file
@@ -2,6 +2,7 @@ apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: airflow-webserver
+  namespace: bcom-airflow
   labels:
     app: airflow-k8s
@@ -20,17 +21,11 @@ spec:
     spec:
       containers:
         - name: airflow-webserver
-          image: cristianfernando/airflow_custom:0.0.1
+          image: apache/airflow:2.5.3
           args: ["webserver"]
           envFrom:
             - configMapRef:
                 name: airflow-envvars-configmap
-          resources:
-            limits:
-              memory: "512Mi"
-              # cpu: "100"
-          ports:
-            - containerPort: 8080
           volumeMounts:
             - name: dags-host-volume
               mountPath: /opt/airflow/dags/
@@ -38,9 +33,8 @@ spec:
               mountPath: /opt/airflow/logs
       volumes:
         - name: dags-host-volume
-          hostPath:
-            path: /opt/airflow/dags/dags/
-            type: Directory
+          persistentVolumeClaim:
+            claimName: airflow-dags-pvc
         - name: logs-persistent-storage
           persistentVolumeClaim:
             claimName: airflow-logs-pvc
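The webserver now runs the stock apache/airflow:2.5.3 image and takes its configuration from the same ConfigMap. A quick readiness check, using the names defined above:

```shell
# Wait until the webserver Deployment reports Available
kubectl -n bcom-airflow wait --for=condition=available \
  deployment/airflow-webserver --timeout=180s

# Tail the webserver logs if it does not come up
kubectl -n bcom-airflow logs deploy/airflow-webserver -f
```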
@@ -2,17 +2,18 @@ apiVersion: v1
 kind: Service
 metadata:
   name: airflow-webserver
+  namespace: bcom-airflow
   labels:
     app: airflow-k8s
 spec:
   type: NodePort
   selector:
     app: airflow-webserver
   ports:
-    - name: web
+    - appProtocol: http
+      name: airflow-webserver
+      port: 8080
       protocol: TCP
-      port: 8081
       targetPort: 8080
+      nodePort: 30080
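With the NodePort fixed at 30080, the UI is reachable at http://<node-ip>:30080. A sketch for finding that address, assuming a Minikube-style single-node cluster:

```shell
# Print the reachable URL for the webserver Service on Minikube
minikube service airflow-webserver -n bcom-airflow --url

# On a regular cluster, combine any node IP with the nodePort
kubectl get nodes -o wide   # take an INTERNAL-IP, then browse http://<ip>:30080
```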
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs-pvc
  labels:
    app: airflow-k8s
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 2Gi
  storageClassName: standard
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - args: []
      command: []
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        - name: DB_HOST
          value: postgres
        - name: DB_DATABASE
          value: airflow
        - name: DB_USER
          value: airflow
        - name: DB_PASSWORD
          value: airflow
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          value: postgresql+psycopg2://airflow:airflow@postgres/airflow
        - name: AIRFLOW__LOGGING__LOGGING_LEVEL
          value: INFO
      image: dumy-image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - name: dags-host-volume
          mountPath: /opt/airflow/dags
        - name: logs-persistent-storage
          mountPath: /opt/airflow/logs
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
  nodeSelector: {}
  affinity: {}
  tolerations: []
  volumes:
    - name: dags-host-volume
      hostPath:
        path: /opt/airflow/dags/dags/
        type: Directory
    - name: logs-persistent-storage
      persistentVolumeClaim:
        claimName: airflow-logs-pvc
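This template is only a skeleton: the KubernetesExecutor replaces the placeholder image and injects the task command when it launches each worker pod, so the dummy values are never run as-is. To watch that happen while a DAG executes (pod names depend on the task being run):

```shell
# Watch worker pods appear and complete as tasks are scheduled
kubectl -n bcom-airflow get pods -w

# Inspect one worker pod to confirm it inherited the template's mounts and env
kubectl -n bcom-airflow describe pod <worker-pod-name>
```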
@@ -2,6 +2,7 @@ apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: postgres
+  namespace: bcom-airflow
 spec:
   selector:
     matchLabels:
@@ -30,4 +31,11 @@ spec:
             - name: POSTGRES_USER
               value: airflow
             - name: POSTGRES_DB
               value: airflow
+          volumeMounts:
+            - name: pgdatavol
+              mountPath: /var/lib/postgresql/data
+      volumes:
+        - name: pgdatavol
+          persistentVolumeClaim:
+            claimName: airflow-postgres-pvc
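With the data directory now backed by the airflow-postgres-pvc claim, the metadata database survives pod restarts. A quick in-cluster check that Postgres is serving the airflow database (credentials as defined in the Deployment):

```shell
# Open a psql session inside the running postgres pod and list tables
kubectl -n bcom-airflow exec -it deploy/postgres -- \
  psql -U airflow -d airflow -c "\dt"
```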
@@ -2,10 +2,13 @@ apiVersion: v1
 kind: Service
 metadata:
   name: postgres
+  namespace: bcom-airflow
 spec:
+  type: NodePort
   selector:
     app: postgres
   ports:
     - port: 5432
       targetPort: 5432
+      nodePort: 30084
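Exposing Postgres on NodePort 30084 is presumably for debugging from outside the cluster. A hedged example of connecting with a local psql client, where `<node-ip>` is any cluster node address:

```shell
# Connect from the workstation; the password is "airflow" as set in the Deployment
psql -h <node-ip> -p 30084 -U airflow -d airflow
```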
 apache-airflow[kubernetes]==2.5.3
 openpyxl==3.1.2
 XlsxWriter==3.1.2
+pymysql==1.1.0
+oracledb==1.3.2
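pymysql and oracledb are the new runtime dependencies for the DAGs, so they must be baked into the custom worker image. A quick sanity check against an already-built image; the 0.0.4 tag matches the scheduler manifest above, adjust it if your build differs:

```shell
# Confirm the extra libraries are present inside the worker image
docker run --rm --entrypoint python cristianfernando/airflow_custom:0.0.4 \
  -c "import pymysql, oracledb; print(pymysql.__version__, oracledb.__version__)"
```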
-kubectl apply -f logs-persistenvolumeclaim.yaml
+kubectl apply -f airflow-envvars-configmap.yaml
+kubectl apply -f airflow-volumes.yaml
 kubectl apply -f airflow-rbac.yaml
 kubectl apply -f postgres-deployment.yaml
 kubectl apply -f postgres-service.yaml
-kubectl apply -f airflow-envvars-configmap.yaml
+kubectl apply -f airflow-secrets.yaml
 kubectl apply -f airflow-webserver-deployment.yaml
 kubectl apply -f airflow-webserver-service.yaml
 kubectl apply -f airflow-scheduler-deployment.yaml
...
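The apply order matters: the ConfigMap, volumes and RBAC objects must exist before the Deployments that reference them. After running the script, a single command shows whether everything came up (a sketch, not an exhaustive check):

```shell
# Overview of everything the script created in the namespace
kubectl -n bcom-airflow get configmaps,secrets,pvc,deployments,services
```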
 kubectl delete -f airflow-rbac.yaml
 kubectl delete -f postgres-service.yaml
 kubectl delete -f postgres-deployment.yaml
-kubectl delete -f airflow-envvars-configmap.yaml
+kubectl delete -f airflow-secrets.yaml
 kubectl delete -f airflow-webserver-service.yaml
 kubectl delete -f airflow-webserver-deployment.yaml
 kubectl delete -f airflow-scheduler-deployment.yaml
-kubectl delete -f logs-persistenvolumeclaim.yaml
 kubectl delete -f sync-dags-deployment.yaml
+kubectl delete -f airflow-volumes.yaml
+kubectl delete -f airflow-envvars-configmap.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-sync-dags
  namespace: bcom-airflow
spec:
  selector:
    matchLabels:
      app: airflow-sync-dags
  template:
    metadata:
      labels:
        app: airflow-sync-dags
    spec:
      containers:
        - args:
            - while true; aws s3 sync --exact-timestamps --delete ${S3_DAGS_DIR:-s3://prueba1234568/dags} /dags;
              do sleep ${SYNCHRONYZE_DAG_DIR:-30}; done;
          command:
            - /bin/bash
            - -c
            - --
          name: sync-dags-s3
          image: amazon/aws-cli:2.1.34
          envFrom:
            - configMapRef:
                name: airflow-envvars-configmap
          env:
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  key: AWS_ACCESS_KEY
                  name: credentials
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  key: AWS_SECRET_KEY
                  name: credentials
          volumeMounts:
            - name: dags-host-volume
              mountPath: /dags
      volumes:
        - name: dags-host-volume
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
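This sidecar Deployment keeps the shared DAGs volume in sync with an S3 prefix; S3_DAGS_DIR and SYNCHRONYZE_DAG_DIR come from the ConfigMap, with the defaults shown inline. To confirm it is actually syncing (the bucket name below is the manifest's default, replace it with the real one):

```shell
# Follow the sync loop's output
kubectl -n bcom-airflow logs deploy/airflow-sync-dags -f

# Check what the loop should be mirroring (requires local AWS credentials)
aws s3 ls s3://prueba1234568/dags/
```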
@@ -2,7 +2,7 @@ apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: airflow-sync-dags
+  namespace: bcom-airflow
 spec:
   selector:
     matchLabels:
@@ -16,23 +16,33 @@ spec:
     spec:
       containers:
         - args:
-            - while true; aws s3 sync --exact-timestamps --delete 's3://prueba1234568/dags' '/dags'; do sleep 30; done;
+            - mc alias set minio ${MINIO_SERVER:-http://192.168.49.2:9000} ${MINIO_USER:-minioadmin}
+              ${MINIO_PASSWORD:-minioadmin}; while true; mc mirror --remove --overwrite minio${MINIO_DAGS_DIR:-/prueba-ca/dags} /dags;
+              do sleep ${SYNCHRONYZE_DAG_DIR:-30}; done;
           command:
             - /bin/bash
             - -c
             - --
-          name: sync-dags
+          name: sync-dags-minio
-          image: amazon/aws-cli:2.1.34
+          image: minio/mc
+          envFrom:
+            - configMapRef:
+                name: airflow-envvars-configmap
           env:
-            - name: AWS_ACCESS_KEY_ID
-              value: AKIAQAAMXO3Z4BHNKEIE
-            - name: AWS_SECRET_ACCESS_KEY
-              value: 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
+            - name: MINIO_USER
+              valueFrom:
+                secretKeyRef:
+                  key: MINIO_USER
+                  name: credentials
+            - name: MINIO_PASSWORD
+              valueFrom:
+                secretKeyRef:
+                  key: MINIO_PASSWORD
+                  name: credentials
           volumeMounts:
             - name: dags-host-volume
               mountPath: /dags
       volumes:
         - name: dags-host-volume
-          hostPath:
-            path: /opt/airflow/dags/dags/
-            type: Directory
+          persistentVolumeClaim:
+            claimName: airflow-dags-pvc
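The local variant mirrors a MinIO bucket instead of S3, using the MINIO_* defaults shown above. A hedged sketch for seeding that bucket with DAGs from a workstation, assuming the MinIO server at http://192.168.49.2:9000 and the minioadmin credentials are the ones actually in use; `dags/my_dag.py` is a placeholder file name:

```shell
# Point the mc client at the MinIO server used by the sync container
mc alias set minio http://192.168.49.2:9000 minioadmin minioadmin

# Create the bucket the container mirrors and upload a DAG into its dags/ prefix
mc mb --ignore-existing minio/prueba-ca
mc cp dags/my_dag.py minio/prueba-ca/dags/
```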