Commit 03b3285a authored by Cristian Aguirre's avatar Cristian Aguirre

Merge branch 'developer-ca' into 'developer'

Add V2 and V3

See merge request !4
parents 7772e148 d85ea56f
......@@ -207,8 +207,8 @@ ellos. Minio reemplazaría AWS S3 dado que ya nos va a proporcionar los buckets.
1.- Download the installer and run it with the following commands (for Debian/Ubuntu):
```shell
wget https://dl.min.io/server/minio/release/linux-amd64/archive/minio_20230602231726.0.0_amd64.deb -O minio.deb
sudo dpkg -i minio.deb
wget https://dl.min.io/server/minio/release/linux-amd64/archive/minio_20230602231726.0.0_amd64.deb -O minio.deb
sudo dpkg -i minio.deb
```
2.- We create a folder that will host all the buckets created in the future. Run:
......@@ -236,3 +236,228 @@ y listo, tenemos Minio para poder crear buckets y serviría como reemplazo de AW
Reference: [Instalar Minio Server y Opcionalmente Minio Client](https://www.youtube.com/watch?v=EsJHf5nUYyA&ab_channel=MinIO)
## Deploying the Airflow Process on Google Cloud Platform (GCP) - Google Kubernetes Engine (GKE)
Now we install the Airflow components for deployment on GKE; for this, the following
prerequisites must be in place:
```text
- Have the latest version of gcloud (the GCP CLI) installed to run gcloud commands from our terminal.
  Reference: https://cloud.google.com/sdk/docs/install
- Have a project created in GCP (from the GCP console)
- Have "kubectl" installed to run commands against and manage our Kubernetes cluster
- Have working knowledge of Kubernetes and the kubectl commands
- Have the permissions required on your GCP account to run the commands below
  (particularly for the GKE, IAM, Filestore and VM services)
```
1.- Set our project with gcloud. For example:
```text
gcloud config set project airflow-gke-338120
Donde "airflow-gke-338120" es el ID del proyecto
```
2.- Create the cluster in GKE from the command line. For example:
```text
gcloud container clusters create airflow-cluster \
--machine-type n1-standard-4 \
--num-nodes 1 \
--region "asia-east1-a"
Where:
- "airflow-cluster" is the cluster name
- "n1-standard-4" is the machine type of the cluster nodes
- "1" is the number of nodes our cluster will have
- "asia-east1-a" is the region where we create the cluster
```
Keep the following in mind:
- If we are asked to authenticate in order to create the cluster, do so using our Gmail address
or our GCP account.
- Be aware of the limits or quotas of our account and of the region where we are deploying
the cluster and the other resources (this applies if we are using GCP under the quota model)
3.- Once the cluster is created, configure it so that it has the permissions needed to use
other GCP resources, particularly the bucket where our DAGs are stored.
This method enables "Workload Identity" on both the cluster and its nodes so that the cluster
acts on behalf of the service accounts that will be created; those accounts are the ones that hold
the permissions needed to use other GCP resources. Reference:
[Enable Workload Identity](https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity?hl=es-419&_ga=2.68907346.-259342969.1690208432#console_1)
To do this we use the following commands:
```text
# Enable the Workload Identity configuration on the cluster we created. For example:
gcloud container clusters update airflow-cluster2 \
--region=asia-east1-a \
--workload-pool=airflow-test-394115.svc.id.goog
* Where "airflow-cluster2" is the cluster name, "asia-east1-a" is the region where we deployed it
and "airflow-test-394115" is the GCP project ID
# Update the node pool configuration so it picks up the cluster configuration
gcloud container node-pools update default-pool \
--cluster=airflow-cluster2 \
--region=asia-east1-a \
--workload-metadata=GKE_METADATA
* Where "default-pool" is the name of our node pool (default-pool is the default name),
"airflow-cluster2" is the cluster name and "asia-east1-a" is the region where we deployed it
# Fetch the credentials of the new cluster so we can use it with kubectl from our terminal
gcloud container clusters get-credentials airflow-cluster2 \
--region=asia-east1-a
* Where "airflow-cluster2" is the cluster name and "asia-east1-a" is the region where we deployed it
```
4.- After configuring our cluster, we create a namespace and a service account as
follows:
```text
# Create a namespace, for example named "bcom-airflow"
kubectl create namespace bcom-airflow
# Create a "service account" named "bcom-airflow"
kubectl create serviceaccount bcom-airflow \
--namespace bcom-airflow
```
5.- Now we create an IAM service account in GCP with the same name as the "service account"
created earlier, and grant it the necessary permissions:
```text
# Create the service account
gcloud iam service-accounts create bcom-airflow \
--project=airflow-test-394115
* Where "bcom-airflow" is the IAM service account name and "airflow-test-394115" is the GCP project ID
# Grant the new account the permissions it needs with the command:
gcloud projects add-iam-policy-binding airflow-test-394115 \
--member "serviceAccount:bcom-airflow@airflow-test-394115.iam.gserviceaccount.com" \
--role "roles/owner"
* Where "airflow-test-394115" is the GCP project ID, "bcom-airflow" is the service account name
and "roles/owner" is the role the account is granted on the project.
# Now bind this account to our cluster
gcloud iam service-accounts add-iam-policy-binding bcom-airflow@airflow-test-394115.iam.gserviceaccount.com \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:airflow-test-394115.svc.id.goog[bcom-airflow/bcom-airflow]"
* Where "bcom-airflow" is the service account name, "airflow-test-394115" is the project ID,
and the "bcom-airflow/bcom-airflow" member is [namespace]/[serviceaccount]
```
6.- Annotate our local "service account" so it uses our IAM service account
```text
kubectl annotate serviceaccount bcom-airflow \
--namespace bcom-airflow \
iam.gke.io/gcp-service-account=bcom-airflow@airflow-test-394115.iam.gserviceaccount.com
* Donde "bcom-airflow" hace referencia al "service account" local, namespace y cuenta de servicio IAM.
Y "airflow-test-394115" es el ID del proyecto GCP
```
Done! Our cluster is configured and waiting for the remaining resources to be set up:
Filestore and Buckets.
### Deploying an NFS Server with GCP - Filestore
1.- First, create an NFS instance from the GCP console. Depending on the requirements, you can
configure the disk size, disk type (HDD or SSD), and other settings.
For this basic case we create a basic HDD instance with the minimum size of 1Ti, in the
"asia-east1-a" zone, on the default network, with "volume1" as the file share name.
For example:
![image](readme-images/filestore1.png)
**Make a note of the "IP address" shown in the image**
⚠️ Warning: the following steps create the paths required by the process
inside the NFS server.
2.- Now we create a basic instance (virtual machine) to access our NFS share
and create the paths our process will use
```text
# Create the instance in the same zone where the NFS instance lives:
gcloud compute instances create bastion --zone asia-east1-a
* Where "bastion" is the VM name and "asia-east1-a" is the zone where our NFS instance is
# Then access the VM over SSH with:
gcloud compute ssh bastion --zone asia-east1-a
* Where "bastion" is the VM name and "asia-east1-a" is the zone where our NFS instance is
```
3.- Once inside the VM, run the following commands:
```text
# Update packages and install the required NFS client
- sudo -i
- apt update -y
- apt install nfs-common -y
# Create a folder where the NFS share will be mounted, mount it and move into it
- mkdir /mnt/volume1 (any name can be used instead of volume1)
- mount 10.98.100.98:/volume1 /mnt/volume1 (10.98.100.98 is the Filestore "IP address" and "volume1"
is the file share name)
- cd /mnt/volume1
# Once inside the shared folder, create the following folders and give them permissions
- mkdir nfs_share
- mkdir nfs_logs
- mkdir nfs_postgres
- chmod -R 777 nfs_share
- chmod -R 777 nfs_logs
- chmod -R 777 nfs_postgres
```
Done! Our NFS server now has the folders ready for the process to use.
⚠️ Keep the following in mind:
- The IP address must be placed in the "airflow-volumes.yaml" template, inside the nfs block
of all the PersistentVolumes - PV (there are 3); see the sketch below.
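In a standard Kubernetes NFS PersistentVolume the Filestore IP goes under `nfs.server` and the export path under `nfs.path`. A minimal sketch of one such PV follows; the PV name, size and folder are illustrative only, since the real `airflow-volumes.yaml` template is not shown here:
```yaml
# Hypothetical PersistentVolume entry (one PV per folder: nfs_share, nfs_logs, nfs_postgres)
apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags-pv
spec:
  capacity:
    storage: 50Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: 10.98.100.98      # the Filestore "IP address" noted earlier
    path: /volume1/nfs_share  # file share name plus one of the folders created above
```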
### Creating a bucket in GCP - Google Cloud Storage - Buckets
Once the above is ready, we have to create a bucket that will host our
code (the DAGs) so the process can hot-reload them.
1.- For example, we create the bucket "prueba-rsync2" and, inside it, a folder named "carpeta" (any
name works)
2.- We place our code inside the folder; it will look like this:
![image](readme-images/bucket1.png)
⚠️ Warning: keep this path in mind; for our example the resulting bucket-plus-folder path would be
_gs://prueba-rsync2/carpeta_. It must be set in our configuration file (configmap) in the
**"GCS_DAGS_DIR"** parameter, as sketched below.
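For reference, a minimal sketch of how that parameter could appear in the configmap; the ConfigMap name and structure are assumptions, only `GCS_DAGS_DIR` and the example path come from this guide:
```yaml
# Hypothetical ConfigMap fragment; only GCS_DAGS_DIR and its value come from this guide
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
data:
  GCS_DAGS_DIR: "gs://prueba-rsync2/carpeta"
```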
Done. With this, all the services required to bring up our process have been configured.
### Deploying Airflow with GKE
1.-
......@@ -8,7 +8,7 @@ from airflow.decorators import task
from components.Utils import select_multiple
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.S3Route import load_obj_to_s3
from components.S3Route import load_control_to_s3
from enums.OperationTypeEnum import OperationTypeEnum
import logging
......@@ -16,7 +16,7 @@ import logging
logger = logging.getLogger()
def validate_clean(control_params: Dict[str, Any], **kwargs) -> None:
def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG")
......@@ -27,7 +27,7 @@ def validate_clean(control_params: Dict[str, Any], **kwargs) -> None:
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
loaded = load_control_to_s3(bytes(conf.encode()), conn, bucket, key, provider, timezone)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
......@@ -58,10 +58,14 @@ def get_cleaners_from_xcom(**kwargs):
final_selects.append(select)
logger.info(f"Final selects: {final_selects}")
Variable.set(key='CLEANS', value=final_selects, serialize_json=True)
return [[item] for item in final_selects]
if len(final_selects) > 0:
return [[item] for item in final_selects]
else:
return [[None]]
def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any]) -> TaskGroup or None:
def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider: str,
timezone: str) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="LimpiezaDelProceso", prefix_group_id=False) as group:
......@@ -76,7 +80,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any]) -> TaskG
validate_task = PythonOperator(
task_id="VALIDATE_CLEANER",
python_callable=validate_clean,
op_kwargs={'control_params': control_s3},
op_kwargs={'control_params': control_s3, 'provider': provider, 'timezone': timezone},
trigger_rule='none_skipped'
)
cleaners >> tasks >> validate_task
......
from components.Timezone import datetime_by_tzone
from enums.ProcessStatusEnum import ProcessStatusEnum
import logging
from typing import Dict, Any, List, Tuple
import json
from io import StringIO
from typing import Dict, Any, List
from components.S3Route import get_file_from_prefix
logger = logging.getLogger()
def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[str, Any]:
response = {'status': ProcessStatusEnum.SUCCESS.value, 'tasks': []}
response = {'status': ProcessStatusEnum.SUCCESS.value, 'tasks': [], 'reset': False}
try:
conf = conf[-1]
logger.info(f"Último proceso ejecutado: {conf}")
status = conf["status"]
if "reset_by_user" in conf.keys():
response["reset"] = True
if status == ProcessStatusEnum.FAIL.value:
response["status"] = ProcessStatusEnum.FAIL.value
success_tasks = conf["tasks"]
......@@ -33,18 +38,45 @@ def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[s
def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str, Any],
timezone: str, delete_last: bool = False) -> List[Dict[str, Any]]:
timezone: str, task, delete_last: bool = False, frequency: str = "montly") -> List[Dict[str, Any]]:
try:
print("PREV:", conf)
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
current_period = str(datetime_by_tzone(timezone, format_date))[:7]
processed_period = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="PROCESSED_PERIOD")
if delete_last:
print("CONF-TRANS:", conf)
last_tasks = conf[-1]["tasks"]
tasks.update(last_tasks)
print("LAST:", tasks)
conf.pop(-1)
current_datetime = str(datetime_by_tzone(timezone, '%Y-%m-%d %H:%M:%S'))
new_process = {"date": current_datetime, "status": status, "tasks": tasks}
conf.append(new_process)
if current_period == processed_period:
conf.append(new_process)
else:
conf = [new_process]
print("LAS:", conf)
except Exception as e:
logger.error(f"Error actualizando archivo de control. {e}")
finally:
return conf
def extract_last_control(conn_id: str, bucket: str, prefix: str, provider: str, timezone: str, **kwargs):
try:
task = kwargs['ti']
if not prefix.endswith("/"):
prefix += "/"
logger.info(f"BUSCANDO Y EXTRAYENDO ARCHIVO DE CONTROL DESDE {prefix}")
control, control_key = get_file_from_prefix(conn_id, bucket, prefix, provider, timezone, task)
if control:
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data)
if not data:
raise AssertionError("Archivo de control no tiene un formato correcto")
else:
data = json.load(data)
task.xcom_push(key="CONTROL-CONFIG", value=[control_key, data])
else:
raise AssertionError("Archivo de control no encontrado")
except Exception as e:
logger.error(f"Error general de descarga de archivo de control. {e}")
\ No newline at end of file
......@@ -8,7 +8,7 @@ def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool =
final_steps = 0
try:
if is_tablename:
count_command = f"SELECT COUNT(*) FROM {sql_command}"
count_command = f'SELECT COUNT(*) FROM {sql_command}'
else:
count_command = f"SELECT COUNT(*) FROM ({sql_command}) BCOM"
with connection.connect() as conn:
......
......@@ -18,13 +18,13 @@ def execute_transformations(commands: List[str], engine):
def delete_table(tablename: str, engine) -> bool:
delete = False
try:
command = f"DROP TABLE {tablename}"
command = f'DROP TABLE {tablename}'
start_time = time.time()
with engine.connect() as conn:
try:
_ = conn.execute(command)
except Exception:
logger.error(f"Tabla no encontrada")
except Exception as e:
logger.error(f"Tabla no encontrada. {e}")
delete = True
logger.debug(f"Duración de borrado: {time.time() - start_time}")
except Exception as e:
......
......@@ -30,6 +30,7 @@ class Database:
elif db_type == DatabaseTypeEnum.ORACLE.value:
self.factory = Oracle(host, port, user, password, service)
self.engine = None
self.procedure_identifier = "execute"
def get_basic_connection(self):
connection = None
......@@ -74,3 +75,22 @@ class Database:
logger.error(f"Error creando tabla {model}. {e}")
finally:
return save
def generate_sql_procedure(self, command: str) -> str:
response = ""
try:
command = command.lower()
response = self.factory.generate_sql_procedure(command, self.procedure_identifier)
except Exception as e:
logger.error(f"Error generando comando sql para procedure. Comando: {command}. {e}")
finally:
return response
def get_all_tablenames(self) -> List[str]:
tablenames = []
try:
tablenames = self.factory.get_all_tablenames()
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
......@@ -2,6 +2,7 @@ from typing import List, Tuple
import pymysql
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Model.InsumoModel import InsumoModel, InsumoModelOracle
......@@ -9,7 +10,7 @@ from components.Databases.Enums.MysqlDataTypeEnum import MysqlDataTypeEnum
from components.Databases.Enums.MysqlDataTypeORMEnum import MysqlDataTypeORMEnum
from sqlalchemy import Table, Column, MetaData
from sqlalchemy import Column
import logging
logger = logging.getLogger()
......@@ -80,3 +81,24 @@ class Mysql:
finally:
return model
def generate_sql_procedure(self, command: str, reserved_word: str = "execute") -> str:
response = ""
try:
command = command.replace(reserved_word, "").replace(";", "")
response = f"call {command};"
logger.debug("COMANDO MYSQL:", response)
except Exception as e:
logger.error(f"Error generando comando sql para procedure Mysql. Comando: {command}. {e}")
finally:
return response
def get_all_tablenames(self) -> List[str]:
tablenames = []
try:
command = f"SELECT table_name FROM information_schema.tables WHERE table_schema='{self.database}'"
with self.engine.connect() as conn:
tablenames = conn.execute(command).all()
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
......@@ -86,3 +86,25 @@ class Oracle:
logger.error(f"Error creando modelo dinámico en Oracle con nombre {tablename}. {type(e)}.{e}")
finally:
return model
def generate_sql_procedure(self, command: str, reserved_word: str = "execute") -> str:
response = ""
try:
command = command.replace(reserved_word, "").replace(";", "")
response = f"begin {command}; end;"
logger.debug("COMANDO ORACLE:", response)
except Exception as e:
logger.error(f"Error generando comando sql para procedure Oracle. Comando: {command}. {e}")
finally:
return response
def get_all_tablenames(self) -> List[str]:
tablenames = []
try:
command = f"SELECT table_name FROM all_tables WHERE OWNER='{self.user}'"
with self.engine.connect() as conn:
tablenames = conn.execute(command).all()
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
......@@ -81,3 +81,25 @@ class Postgres:
logger.error(f"Error creando modelo dinámico en Postgres con nombre {tablename}. {type(e)}. {e}")
finally:
return model
def generate_sql_procedure(self, command: str, reserved_word: str = "execute") -> str:
response = ""
try:
command = command.replace(reserved_word, "").replace(";", "")
response = f"call {command};"
logger.debug("COMANDO POSTGRES:", response)
except Exception as e:
logger.error(f"Error generando comando sql para procedure Postgres. Comando: {command}. {e}")
finally:
return response
def get_all_tablenames(self) -> List[str]:
tablenames = []
try:
command = f"SELECT pg_tables.tablename FROM pg_catalog.pg_tables WHERE schemaname='{self.schema}'"
with self.engine.connect() as conn:
tablenames = conn.execute(command).all()
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
......@@ -3,14 +3,13 @@ import json
import numpy as np
import pandas as pd
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.Utils import select_multiple, generateModel
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
from components.DatabaseOperation.DatabaseLoad import save_from_dataframe
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_obj_to_s3
from components.S3Route import load_control_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from enums.OperationTypeEnum import OperationTypeEnum
......@@ -24,7 +23,7 @@ import logging
logger = logging.getLogger()
def validate_extractor(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
def validate_extractor(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
success_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="SUCCESS_TASKS")
......@@ -41,7 +40,7 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, **kwargs)
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="EXTRACTORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone)
conf = update_new_process(conf, status, final_dict, timezone, ti)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -50,7 +49,7 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, **kwargs)
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
loaded = load_control_to_s3(bytes(conf.encode()), conn, bucket, key, provider, timezone)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
......@@ -84,67 +83,41 @@ def on_success_extractor(context) -> None:
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def extract_from_source(command: str, source_conn, intern_conn, chunksize: int, **kwargs):
extract_type = command[0]
def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwargs):
if isinstance(command, type(None)):
raise AirflowSkipException
task = kwargs['ti']
extract_type = command[0].split("|")[0]
command = command[1]
source_engine = source_conn.engine
command_for_create = command
is_tablename = False
definitions = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
logger.info(f"Definiciones desde json: {definitions}")
multiple = select_multiple(command)
tablename = multiple["tablename"]
is_procedure = False
if extract_type == "PROCEDURE":
is_procedure = True
columns_name = []
if extract_type.startswith(OperationTypeEnum.PROCEDURE.value):
# Create the model with the procedure descriptor
if command.replace(" ", "").lower().find("|begin") != -1:
tablename = command[:command.find("|")]
elif command.replace(" ", "").lower().find("|call") != -1:
tablename = command[:command.find("|")]
else:
raise AssertionError("Procedure mal formed")
task = kwargs['ti']
procedures = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="PROCEDURE-JSON")
model = None
for procedure in procedures:
if procedure["procedure_identifier"] != tablename:
continue
model = generateModel(tablename, procedure["fields"])
columns_name = [field["identifier"] for field in procedure["fields"]]
is_tablename = True
if isinstance(model, type(None)):
raise AssertionError("Descripción del procedure en el json descriptor no encontraddo")
else:
if source_conn.db_type == DatabaseTypeEnum.ORACLE.value and \
not extract_type.startswith(OperationTypeEnum.PROCEDURE.value):
values = command_for_create.split("|")
if len(values) > 1:
command_for_create = values[1]
command_for_create = f"SELECT * FROM ({command_for_create}) OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY"
else:
command_words = command_for_create.split(" ")
if command_words[-2].lower() != "limit":
command_for_create += " limit 1"
columns = []
with source_engine.connect() as connection:
final_command = command_for_create
if final_command.replace(" ", "").lower().find("|select") != -1:
final_command = final_command[final_command.lower().find("select"):]
result = connection.execute(final_command)
fields = result.cursor.description
for field in fields:
name, type_code, length, presicion = field[0], field[1], field[3], field[5]
columns.append((name, type_code, length, presicion))
logger.debug(f"Columnas procesadas: {columns}")
multiple = select_multiple(command)
tablename = multiple["tablename"]
model = source_conn.create_model(tablename, columns, intern_conn.db_type)
# Create the model with the procedure descriptor
model = None
for procedure in definitions:
if procedure["identifier"] != tablename:
continue
indexes = []
if "indexes" in procedure.keys():
indexes = procedure["indexes"]
model = generateModel(tablename, procedure["fields"], indexes)
columns_name = [field["name"] for field in procedure["fields"]]
if isinstance(model, type(None)):
raise AssertionError(f"Definición del extracción para {tablename} en el json-descriptor no encontraddo")
try:
create = intern_conn.create_table(model)
if create:
logger.info(f"Creado correctamente la tabla {tablename}. Creado?: {create}")
else:
raise AssertionError(f"Error creando tabla {tablename}")
if is_tablename:
command = command[len(tablename+":"):]
if is_procedure:
command = command[len(tablename+"|"):]
temp_connection = source_conn.get_basic_connection()
if source_conn.db_type == DatabaseTypeEnum.ORACLE.value:
cursor = temp_connection.cursor()
......@@ -224,20 +197,29 @@ def get_select_from_xcom(**kwargs):
logger.info(f"Trayendo comandos {xcom_selects}")
for select in xcom_selects:
tablename = select_multiple(select)["tablename"]
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or tablename not in success_tasks:
final_selects.append((key, select))
if tasks["reset"] or tasks["status"] == ProcessStatusEnum.SUCCESS.value or tablename not in success_tasks:
final_selects.append((key, select, ))
logger.info(f"Comandos para la extracción: {final_selects}")
Variable.set(key='SELECTS', value=final_selects, serialize_json=True)
return [[item] for item in final_selects]
if len(final_selects) > 0:
return [[item] for item in final_selects]
else:
return [[None]]
def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int, timezone: str,
control_s3: Dict[str, Any]) -> TaskGroup or None:
control_s3: Dict[str, Any], provider: str) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="ExtraccionDeDatos", prefix_group_id=False) as group:
selects = get_select_from_xcom()
validate_task = PythonOperator(
task_id="VALIDATE_EXTRACTION",
python_callable=validate_extractor,
op_kwargs={'control_params': control_s3, 'timezone': timezone, 'provider': provider},
trigger_rule='all_done'
)
tasks = PythonOperator.partial(
task_id="EXTRACTORS",
python_callable=extract_from_source,
......@@ -246,12 +228,6 @@ def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int, timez
on_success_callback=on_success_extractor
).expand(op_args=selects)
validate_task = PythonOperator(
task_id="VALIDATE_EXTRACTION",
python_callable=validate_extractor,
op_kwargs={'control_params': control_s3, 'timezone': timezone},
trigger_rule='all_done'
)
selects >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de extracción. {e}")
......
......@@ -8,8 +8,7 @@ from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from enums.ProcessStatusEnum import ProcessStatusEnum
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.S3Route import save_df_to_s3, load_obj_to_s3
from components.S3Route import save_df_to_s3, load_control_to_s3
from components.Utils import select_multiple, create_temp_file, delete_temp_dir
from components.Control import get_tasks_from_control, update_new_process
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
......@@ -21,7 +20,7 @@ import logging
logger = logging.getLogger()
def validate_generate(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
def validate_generate(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
......@@ -38,7 +37,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, **kwargs) -
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, True)
conf = update_new_process(conf, status, final_dict, timezone, ti, True)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -47,7 +46,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, **kwargs) -
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
loaded = load_control_to_s3(bytes(conf.encode()), conn, bucket, key, provider, timezone)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
......@@ -61,6 +60,7 @@ def on_failure_generator(context) -> None:
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('GENERATES', default_var=[], deserialize_json=True)
table = selects[ti.map_index]
table = select_multiple(table)["tablename"]
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
task_result = {"description": table, "status": status, "message": exception}
......@@ -73,13 +73,17 @@ def on_success_generator(context) -> None:
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('GENERATES', default_var=[], deserialize_json=True)
table = selects[ti.map_index]
table = select_multiple(table)["tablename"]
status = ProcessStatusEnum.SUCCESS.value
task_result = {"description": table, "status": status, "message": ""}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timezone: str, chunksize=10000):
def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timezone: str,
provider: str, chunksize=10000):
if isinstance(command, type(None)):
raise AirflowSkipException
engine = intern_conn.engine
logger.debug(f"COMANDO: {command}")
tablename = select_multiple(command)["tablename"]
......@@ -117,7 +121,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
file_key = prefix + tmp_file[tmp_file.rfind("/")+1:]
# Se sube el archivo al S3
logger.info(f"Tamaño del archivo a subir: {os.path.getsize(tmp_file)} bytes")
save_df_to_s3(tmp_file, conn_id, bucket, file_key, in_memory=False)
save_df_to_s3(tmp_file, conn_id, bucket, file_key, provider, in_memory=False)
# Se borra el archivo al finalizar el upload
delete_temp_dir(tmp_file)
......@@ -140,15 +144,18 @@ def get_generate_from_xcom(**kwargs):
xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo tablas {xcom_outputs}")
for select in xcom_outputs:
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or select not in success_tasks:
if tasks["reset"] or tasks["status"] == ProcessStatusEnum.SUCCESS.value or select not in success_tasks:
final_outputs.append(select)
logger.info(f"Final outputs: {final_outputs}")
Variable.set(key='GENERATES', value=final_outputs, serialize_json=True)
return [[item] for item in final_outputs]
if len(final_outputs) > 0:
return [[item] for item in final_outputs]
else:
return [[None]]
def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], control_s3: Dict[str, Any],
timezone: str) -> TaskGroup or None:
timezone: str, provider: str) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="GeneracionyDespliegueDeResultados", prefix_group_id=False) as group:
......@@ -159,13 +166,14 @@ def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], control_
python_callable=generate_and_deploy,
on_failure_callback=on_failure_generator,
on_success_callback=on_success_generator,
op_kwargs={'intern_conn': db_intern_conn, 'params': parameters, 'timezone': timezone}
op_kwargs={'intern_conn': db_intern_conn, 'params': parameters, 'timezone': timezone,
'provider': provider}
).expand(op_args=outputs)
validate_task = PythonOperator(
task_id="VALIDATE_GENERATOR",
python_callable=validate_generate,
op_kwargs={'control_params': control_s3, 'timezone': timezone},
op_kwargs={'control_params': control_s3, 'timezone': timezone, 'provider': provider},
trigger_rule='none_skipped'
)
outputs >> tasks >> validate_task
......
import fnmatch
import datetime
from typing import Any, Dict, List, Tuple
import json
import pytz
from io import BytesIO, StringIO
import pandas as pd
......@@ -9,8 +8,11 @@ import pandas as pd
from components.Utils import get_type_file
from enums.FileTypeEnum import FileTypeEnum
from enums.ScriptFileTypeEnum import ScriptFileTypeEnum
from enums.ProviderTypeEnum import ProviderTypeEnum
from components.Timezone import datetime_by_tzone
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
import logging
logger = logging.getLogger()
......@@ -90,18 +92,26 @@ def get_base_date(conn: str, bucket: str, key: str) -> datetime.date:
return last_date
def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, delimiter: str = ",",
def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, provider: str, delimiter: str = ",",
in_memory: bool = True):
try:
logger.info(f"SUBIENDO A NUBE KEY {key}")
file_type = get_type_file(key)
s3_hook = S3Hook(conn)
gcp_cloud = False
if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
hook = S3Hook(conn)
else:
hook = GoogleCloudStorageHook(conn)
gcp_cloud = True
if file_type == FileTypeEnum.EXCEL or file_type == FileTypeEnum.OLD_EXCEL:
if in_memory:
with BytesIO() as buffer:
with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
data.to_excel(writer, index=None)
s3_hook.load_bytes(buffer.getvalue(), key, bucket, True)
if gcp_cloud:
hook.upload(bucket, key, data=buffer.getvalue())
else:
hook.load_bytes(buffer.getvalue(), key, bucket, True)
else:
pass
elif file_type == FileTypeEnum.CSV or file_type == FileTypeEnum.TEXT:
......@@ -109,9 +119,15 @@ def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, d
csv_buffer = BytesIO()
data.to_csv(csv_buffer, header=True, index=False, sep=delimiter, na_rep='None')
csv_buffer.seek(0)
s3_hook.load_bytes(csv_buffer.getvalue(), key, bucket, True)
if gcp_cloud:
hook.upload(bucket, key, data=csv_buffer.getvalue())
else:
hook.load_bytes(csv_buffer.getvalue(), key, bucket, True)
else:
s3_hook.load_file(data, key, bucket)
if gcp_cloud:
hook.upload(bucket, key, data)
else:
hook.load_file(data, key, bucket)
except Exception as e:
logger.error(f"Error guardando archivos a S3. key: {key}. {e}")
......@@ -127,17 +143,28 @@ def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str) ->
logger.error(f"Error moviendo archivo desde {source_key} hacia {output_key} en bucket {bucket}. {e}")
def get_files_from_prefix(conn: str, bucket: str, prefix: str) -> List[Tuple[str, str]]:
def get_files_from_prefix(conn: str, bucket: str, prefix: str, provider: str) -> List[Tuple[str, str]]:
result = []
allowed_filetypes = [ScriptFileTypeEnum[item].value for item in ScriptFileTypeEnum._member_names_]
try:
s3_hook = S3Hook(conn)
files = s3_hook.list_keys(bucket, prefix)
files = []
s3_hook, gcp_hook, data = None, None, None
if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
s3_hook = S3Hook(conn)
files = s3_hook.list_keys(bucket, prefix)
elif provider == ProviderTypeEnum.GOOGLE.value:
gcp_hook = GoogleCloudStorageHook(conn)
if not prefix.endswith("/"):
prefix += "/"
files = gcp_hook.list(bucket, prefix=prefix)
logger.debug(f"Archivos encontrados en el prefijo {prefix}: {files}")
for file in files:
if file.endswith("/") or file[file.rfind(".")+1:].lower() not in allowed_filetypes:
continue
data = s3_hook.get_key(file, bucket).get()['Body'].read().decode("utf-8")
if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
data = s3_hook.get_key(file, bucket).get()['Body'].read().decode("utf-8")
elif provider == ProviderTypeEnum.GOOGLE.value:
data = gcp_hook.download(bucket, file).decode("utf-8")
if file.find("/") == -1:
filename = file
else:
......@@ -150,26 +177,94 @@ def get_files_from_prefix(conn: str, bucket: str, prefix: str) -> List[Tuple[str
return result
def get_file_from_key(conn: str, bucket: str, key: str) -> Any:
result = BytesIO()
def get_file_from_prefix(conn: str, bucket: str, key: str, provider: str, timezone: str, task,
frequency: str = "montly") -> Any:
result, key_result = BytesIO(), ''
try:
s3_hook = S3Hook(conn)
data = s3_hook.get_key(key, bucket)
data.download_fileobj(result)
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
period = str(datetime_by_tzone(timezone, format_date))[:7]
logger.info(f"Periodo actual: {period}.")
files, file = [], key
cloud_gcp = False
if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
s3_hook = S3Hook(conn)
files = s3_hook.list_keys(bucket, key)
elif provider == ProviderTypeEnum.GOOGLE.value:
cloud_gcp = True
gcp_hook = GoogleCloudStorageHook(conn)
if not key.endswith("/"):
key += "/"
files = gcp_hook.list(bucket, prefix=key)
files_with_period = []
for file in files:
if file.endswith("/"):
continue
file_period = file[file.rfind("_") + 1:file.rfind(".")]
files_with_period.append((file, file_period))
files_with_period.sort(key=lambda x: x[1])
if len(files_with_period) > 0:
file = files_with_period[-1][0]
task.xcom_push(key="PROCESSED_PERIOD", value=files_with_period[-1][1])
logger.info(f"Descargando archivo de control: {file}")
if file != key:
if cloud_gcp:
result = gcp_hook.download(bucket, file)
result = BytesIO(result)
else:
data = s3_hook.get_key(file, bucket)
data.download_fileobj(result)
key_result = file
except Exception as e:
result = None
logger.error(f"Error extrayendo archivo {key}. {e}")
finally:
return result
return result, key_result
def load_obj_to_s3(obj, conn: str, bucket: str, key: str, replace=True) -> bool:
def load_control_to_s3(obj, conn: str, bucket: str, key: str, provider: str, timezone: str,
frequency: str = "montly") -> bool:
load = False
try:
s3_hook = S3Hook(conn)
s3_hook.load_bytes(obj, key, bucket, replace)
load = True
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
period = str(datetime_by_tzone(timezone, format_date))[:7]
key = key.replace("<period>", period)
load = upload_file(obj, conn, bucket, key, provider)
except Exception as e:
logger.error(f"Error subiendo archivo de control a bucket {bucket} y key {key}. {e}")
finally:
return load
def load_report_to_s3(conn: str, bucket: str, key: str, filename: str, provider: str, timezone: str,
pattern: str) -> bool:
load = False
try:
current_datetime = str(datetime_by_tzone(timezone, pattern))
key = key.replace("<datetime>", current_datetime)
if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
s3_hook = S3Hook(conn)
s3_hook.load_file(filename, key, bucket, True)
elif provider == ProviderTypeEnum.GOOGLE.value:
gcp_hook = GoogleCloudStorageHook(conn)
gcp_hook.upload(bucket, key, filename)
load = True
except Exception as e:
logger.error(f"Error subiendo reporte a bucket {bucket} y key {key}. {e}")
finally:
return load
def upload_file(obj, connection: str, bucket: str, key: str, provider: str) -> bool:
upload = False
try:
if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
s3_hook = S3Hook(connection)
s3_hook.load_bytes(obj, key, bucket, True)
elif provider == ProviderTypeEnum.GOOGLE.value:
gcp_hook = GoogleCloudStorageHook(connection)
gcp_hook.upload(bucket, key, data=obj)
upload = True
except Exception as e:
logger.error(f"Error subiendo archivo a {provider}. bucket: {bucket}, key: {key}. {e}")
finally:
return upload
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.sensors.base import BaseSensorOperator
from enums.ProviderTypeEnum import ProviderTypeEnum
import logging
......@@ -10,19 +14,48 @@ TIMEOUT = 60*1
VERIFY_SSL = False
def create_s3_sensor(task_id: str, connection: str, bucket: str, key: str) -> S3KeySensor:
s3_sensor = None
def create_sensor(task_id: str, connection: str, bucket: str, key: str, provider: str = "google"):
sensor = None
try:
s3_sensor = S3KeySensor(
task_id=task_id,
bucket_key=key,
bucket_name=bucket,
wildcard_match=True,
aws_conn_id=connection,
verify=VERIFY_SSL,
poke_interval=POKE_INTERVAL,
timeout=TIMEOUT
)
if provider == ProviderTypeEnum.GOOGLE.value:
sensor = GCPSensor(
task_id=task_id,
conn=connection,
bucket=bucket,
key=key,
poke_interval=POKE_INTERVAL,
timeout=TIMEOUT
)
else:
sensor = S3KeySensor(
task_id=task_id,
bucket_key=key,
bucket_name=bucket,
wildcard_match=True,
aws_conn_id=connection,
verify=VERIFY_SSL,
poke_interval=POKE_INTERVAL,
timeout=TIMEOUT
)
except Exception as e:
logger.error(f"Error creando Sensor S3. {e}")
return s3_sensor
return sensor
class GCPSensor(BaseSensorOperator):
def __init__(self, conn: str, bucket: str, key: str, **kwargs) -> None:
self.conn = conn
self.bucket = bucket
self.key = key
super().__init__(**kwargs)
def poke(self, context):
hook = GCSHook(self.conn)
end_prefix_index = self.key.rfind("/")
if end_prefix_index != -1 and len(self.key[end_prefix_index:]) > 1:
self.key = self.key[:end_prefix_index + 1]
files = hook.list(self.bucket, prefix=self.key)
return any(not file.endswith("/") for file in files)
......@@ -7,7 +7,7 @@ import logging
logger = logging.getLogger()
def datetime_by_tzone(tzone: str, pattern: str):
def datetime_by_tzone(tzone: str, pattern: str = "%Y-%m-%d"):
offset = None
# Algunos casos donde el timezone es de la forma 4:30 y no se encuentra en timezones de pytz (GMT)
if ":" in tzone:
......
......@@ -7,7 +7,7 @@ from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_obj_to_s3
from components.S3Route import load_control_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from enums.ProcessStatusEnum import ProcessStatusEnum
from enums.OperationTypeEnum import OperationTypeEnum
......@@ -17,7 +17,7 @@ import logging
logger = logging.getLogger()
def validate_transform(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
def validate_transform(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
success_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="SUCCESS_TASKS")
......@@ -34,7 +34,7 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, **kwargs)
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="TRANSFORMATIONS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, True)
conf = update_new_process(conf, status, final_dict, timezone, ti, True)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -43,7 +43,7 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, **kwargs)
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
loaded = load_control_to_s3(bytes(conf.encode()), conn, bucket, key, provider, timezone)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
......@@ -76,6 +76,8 @@ def on_success_transform(context) -> None:
def transformations(xcom_commands: str, intern_conn):
if isinstance(xcom_commands, type(None)):
raise AirflowSkipException
engine = intern_conn.engine
script_name = xcom_commands[0]
commands = xcom_commands[1]
......@@ -103,16 +105,20 @@ def get_trans_from_xcom(**kwargs):
continue
xcom_transforms = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo comandos {xcom_transforms}")
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or key not in success_tasks:
if tasks["reset"] or tasks["status"] == ProcessStatusEnum.SUCCESS.value or key not in success_tasks:
for transform in xcom_transforms:
final_transforms.append(transform)
transforms_per_file.append((key, final_transforms))
logger.info(f"Scripts para la transformación: {transforms_per_file}")
Variable.set(key='TRANSFORMS', value=transforms_per_file, serialize_json=True)
return [[item] for item in transforms_per_file]
if len(transforms_per_file) > 0:
return [[item] for item in transforms_per_file]
else:
return [[None]]
def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str, Any]) -> TaskGroup or None:
def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str, Any],
provider: str) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="TransformacionDeDatos", prefix_group_id=False) as group:
......@@ -129,7 +135,7 @@ def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str
validate_task = PythonOperator(
task_id="VALIDATE_TRANSFORMATION",
python_callable=validate_transform,
op_kwargs={'control_params': control_s3, 'timezone': timezone},
op_kwargs={'control_params': control_s3, 'timezone': timezone, 'provider': provider},
trigger_rule='none_skipped'
)
transforms >> tasks >> validate_task
......
......@@ -93,6 +93,7 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) -> List[Tuple[str, List[str]]]:
result = []
allowed_commands = ["create", "update", "delete", "select", "alter", "drop", "begin", "commit"]
comments = [CommentsScriptEnum[item].value for item in CommentsScriptEnum._member_names_]
try:
for row in dataset:
......@@ -102,8 +103,6 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
final_data = []
table_name = ""
for item in data:
# if item.strip().startswith("--") and label_tablename.strip()+":" not in item:
# continue
if item.lower().strip() == "end":
final_data[-1] = final_data[-1] + "; end;"
final_item = item
......@@ -117,43 +116,21 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
add_next = False
final_item = item.strip()
table_name = ""
if final_item.strip()[:2] in comments and (
"update " in final_item.lower() or "delete " in final_item.lower() or
"alter" in final_item.lower() or "create" in final_item.lower() or
"drop" in final_item.lower()):
trans_index = final_item.lower().find("update")
if trans_index != -1:
final_item = final_item[trans_index:]
delete_index = final_item.lower().find("delete")
if delete_index != -1:
final_item = final_item[delete_index:]
alter_index = final_item.lower().find("alter")
if alter_index != -1:
final_item = final_item[alter_index:]
create_index = final_item.lower().find("create")
if create_index != -1:
final_item = final_item[create_index:]
drop_index = final_item.lower().find("drop")
if drop_index != -1:
final_item = final_item[drop_index:]
call_index = final_item.lower().find("call")
if call_index != -1:
final_item = final_item[call_index:]
if any([command in final_item.lower() for command in allowed_commands]):
if final_item.strip()[:2] in comments:
init_indexes = [final_item.lower().find(command) for command in allowed_commands
if final_item.lower().find(command) != -1]
if len(init_indexes) > 0:
init_index = init_indexes[0]
final_item = final_item[init_index:]
final_item = final_item.replace("%", "%%")
final_data.append(final_item)
final_data = [item.replace("\t", "") for item in final_data if item != "" and ("select" in item.lower() or
"update" in item.lower() or
"delete" in item.lower() or
"begin" in item.lower() or
"alter" in item.lower() or
"create" in item.lower() or
"drop" in item.lower() or
"commit" in item.lower() or
"call" in item.lower())]
final_data = [item.replace("\t", "") for item in final_data if item != "" and
all([not item.strip().startswith(comment) for comment in comments])]
result.append((row[0], final_data))
logger.info(f"Lista de comandos: {result}")
except Exception as e:
raise AssertionError(f"Error extrayendo comandos sql. {e}")
logger.error(f"Error extrayendo comandos sql. {e}")
finally:
return result
......@@ -198,7 +175,7 @@ def create_temp_file(tmp_path: str, filename_mask: str, file_type: str, tablenam
if not tmp_path.endswith("/"):
tmp_path += "/"
path_dir = tmp_path + dir_name
os.mkdir(path_dir)
os.makedirs(path_dir)
current_datetime = str(datetime_by_tzone(timezone, pattern))
filename_mask = filename_mask.replace("<source_name>", tablename)
filename_mask = filename_mask.replace("<datetime>", current_datetime)
......@@ -223,21 +200,30 @@ def delete_temp_dir(module_name: str) -> bool:
return drop
def generateModel(tablename: str, attributes: List[Dict[str, Any]], modelName: str = "TableModel"):
def generateModel(tablename: str, attributes: List[Dict[str, Any]], indexes: List[str], modelName: str = "TableModel"):
default_precision = 8
model = type(modelName, (InsumoModel,), {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
try:
for attribute in attributes:
index = False
if attribute["name"] in indexes:
index = True
logger.debug(f"attribute: {attribute}")
if attribute["datatype"] == DataTypeEnum.TEXT.name and "maxLength" in attribute.keys():
setattr(model, attribute["identifier"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(attribute["maxLength"])))
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(attribute["maxLength"]), index=index))
elif attribute["datatype"] == DataTypeEnum.DECIMAL.name:
precision = default_precision
if "decimal_precision" in attribute.keys():
precision = attribute["decimal_precision"]
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(38, precision), index=index))
else:
setattr(model, attribute["identifier"],
Column(DataTypeOrmEnum[attribute["datatype"]].value))
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value, index=index))
model = model.__table__
except InvalidRequestError as e:
logger.debug(f"InvalidRequestError. {e}")
......@@ -245,3 +231,21 @@ def generateModel(tablename: str, attributes: List[Dict[str, Any]], modelName: s
logger.error(f"Error creando modelo dinámico. {e}")
finally:
return model
def delete_temp_dirs(tmp_dir: str) -> bool:
delete = False
try:
dirs = list(os.listdir(tmp_dir))
for directory in dirs:
full_path = tmp_dir + "/" + directory
if os.path.isdir(full_path):
_ = delete_temp_dir(full_path)
logger.debug(f"Se borró el directorio {directory}")
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)
delete = True
except Exception as e:
logger.error(f"Error borrando archivos temporales en {tmp_dir}. {e}")
finally:
return delete
......@@ -55,13 +55,14 @@ def delete_all_xcom_tasks() -> None:
try:
@provide_session
def cleanup_xcom(session=None):
session.query(XCom).filter(or_(XCom.task_id == "SCRIPTS-EXTRACTORS", XCom.task_id == "EXTRACTORS",
session.query(XCom).filter(or_(XCom.task_id == "SCRIPTS-EXTRACTOR", XCom.task_id == "EXTRACTORS",
XCom.task_id == "GENERATORS", XCom.task_id == "TRANSFORMATIONS",
XCom.task_id == "VALIDATE_EXTRACTION", XCom.task_id == "VALIDATE_GENERATOR",
XCom.task_id == "VALIDATE_TRANSFORMATION")).delete()
XCom.task_id == "VALIDATE_TRANSFORMATION", XCom.task_id == "CONTROL-EXTRACTOR")).delete()
session.query(Variable).filter(or_(Variable.key == "SELECTS", Variable.key == "TRANSFORMS",
Variable.key == "GENERATES", Variable.key == "CLEANS")).delete()
delete_task_instances()
session.query(XCom).filter(XCom.dag_id == "BCOM_DAG_TRANSFORMACIONES4").delete()
cleanup_xcom()
except Exception as e:
logger.error(f"Error borrando todas las variables xcom del DAG actual. {e}")
......
app:
schedule: "@once"
inform_dag_schedule: "@once"
reset_dag_schedule: "@once"
database:
sources:
source1:
type: mysql
host: 192.168.21.52
port: 13306
username: root
password: root
database: bd_tp_qa
host: database-11.cluster-ro-cvsz4ey9eiec.us-east-1.rds.amazonaws.com
port: 3306
username: admin
password: adminadmin
database: prueba_bcom
service: ORCLPDB1
schema: public
schema: sources
transformation:
type: mysql
host: 192.168.1.9
port: 13306
username: root
password: root
database: prueba_bcom
host: database-11.cluster-ro-cvsz4ey9eiec.us-east-1.rds.amazonaws.com
port: 3306
username: admin
password: adminadmin
database: prueba_bcom2
service:
schema:
schema: intern_db
chunksize: 8000
label_multiple_select: TABLE
source_mask: select # Sufijo (S)
procedure_mask: procedure # S
transformation_mask: transform # S
prefix_order_delimiter: .
cloud_provider: google
scripts:
s3_params:
bucket: prueba1234568
bucket: prueba-airflow3
prefix: bcom_scripts
connection_id: conn_script
control:
s3_params:
connection_id: conn_script
bucket: prueba1234568
bucket: prueba-airflow3
prefix: bcom_control
filename: control_example.json
filename: control_<period>.json
timezone: 'GMT-5'
outputs:
filename_mask: <source_name>_<datetime>
......@@ -45,7 +48,13 @@ app:
delimiter: '|'
tmp_path: /tmp
s3_params:
bucket: prueba1234568
bucket: prueba-airflow3
prefix: bcom_results
connection_id: conn_script
report:
s3_params:
bucket: prueba-airflow3
prefix: bcom_report
connection_id: conn_script
filename: report_<datetime>.xlsx
datetime_pattern: '%Y-%m-%d %H:%M:%S'
from datetime import datetime
from typing import Any, Dict
from airflow import DAG
import pandas as pd
import os
import uuid
from airflow.operators.python import PythonOperator
from components.Control import extract_last_control
from components.S3Route import load_report_to_s3
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.Utils import delete_temp_dir
import logging
logger = logging.getLogger()
DAG_NAME = "INFORM_PROCESS"
# Change this path if is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
DEFAULT_ARGS = {
'owner': 'BCOM',
"start_date": datetime(2023, 7, 29, 22, 9),
'depends_on_past': False,
'email': 'caguirre@bytesw.com',
'retries': 0,
'email_on_retry': False,
'email_on_failure': False
}
def upload_report(report_params: Dict[str, Any], provider: str, timezone: str, **kwargs) -> None:
try:
task = kwargs["ti"]
report_path = task.xcom_pull(task_ids="CREATE-REPORT", key="REPORT_PATH")
pattern = report_params["datetime_pattern"]
conn = report_params["s3_params"]["connection_id"]
bucket = report_params["s3_params"]["bucket"]
prefix = report_params["s3_params"]["prefix"]
key = report_params["s3_params"]["filename"]
if not prefix.endswith("/"):
key = prefix + "/" + key
else:
key = prefix + key
upload = load_report_to_s3(conn, bucket, key, report_path, provider, timezone, pattern)
if upload:
logger.info(f"Se subio correctamente el reporte a {key}. bucket: {bucket}")
delete_temp_dir(report_path)
except Exception as e:
logger.error(f"Error subiendo reporte a . {e}")
def create_report(tmp_path: str, **kwargs) -> None:
try:
task = kwargs["ti"]
data = task.xcom_pull(task_ids="GET-DATA-REPORT", key="REPORT-DATA")
status, execution_date = data["PROCESS_STATUS"], data["PROCESS_EXECUTION"]
_, _ = data.pop("PROCESS_STATUS"), data.pop("PROCESS_EXECUTION")
dir_name = str(uuid.uuid4())
if not tmp_path.endswith("/"):
tmp_path += "/"
path_dir = tmp_path + dir_name
os.makedirs(path_dir)
excel_tmp_path = path_dir + "/tmp_excel.xlsx"
with pd.ExcelWriter(excel_tmp_path, engine="xlsxwriter") as writer:
workbook = writer.book
worksheet = workbook.add_worksheet("report")
worksheet.set_zoom(90)
title = "Reporte de último proceso ejecutado"
title_format = workbook.add_format()
title_format.set_font_size(20)
title_format.set_font_color("#333333")
header = f"Reporte ejecutado el día {execution_date}"
if status == ProcessStatusEnum.SUCCESS.value:
status = "EXITOSO"
elif status == ProcessStatusEnum.FAIL.value:
status = "FALLIDO"
elif status == ProcessStatusEnum.RESET.value:
status = "RESETEADO POR EL USUARIO"
status = f"Estado de último proceso ejecutado: {status}"
header_format = workbook.add_format()
header_format.set_font_size(10)
header_format.set_font_color("#080606")
worksheet.merge_range('A1:N1', title, title_format)
worksheet.merge_range('A2:N2', header, header_format)
worksheet.merge_range('A3:N3', status, header_format)
row_format = workbook.add_format()
row_format.set_font_size(8)
row_format.set_font_color("#000000")
base_index = 5
for index, key in enumerate(data.keys()):
index = base_index + index
worksheet.merge_range('A'+str(index)+':B'+str(index), key, row_format)
if data[key]["TYPE"] == "EXTRACTION":
worksheet.merge_range('C'+str(index)+':G'+str(index), f"TABLA DE EXTRACCIÓN: {data[key]['DESCRIPTION']}", row_format)
elif data[key]["TYPE"] == "TRANSFORMATION":
script = data[key]["DESCRIPTION"].split("|")[1]
worksheet.merge_range('C'+str(index)+':G'+str(index), f"SCRIPT DE TRANSFORMACIÓN: {script}", row_format)
elif data[key]["TYPE"] == "GENERATION":
worksheet.merge_range('C'+str(index)+':G'+str(index), f"ARCHIVO GENERADO DESDE LA TABLA: {data[key]['DESCRIPTION']}", row_format)
worksheet.merge_range('H'+str(index)+':I'+str(index), f"ESTADO: {data[key]['STATUS']}", row_format)
worksheet.merge_range('J'+str(index)+':N'+str(index), data[key]['MESSAGE'], row_format)
task.xcom_push(key="REPORT_PATH", value=excel_tmp_path)
except Exception as e:
logger.error(f"Error creando reporte. {e}")
def get_data_report(**kwargs) -> None:
try:
report_data = {}
task = kwargs['ti']
control_config = task.xcom_pull(task_ids="CONTROL-EXTRACTOR", key="CONTROL-CONFIG")
control_key, control = control_config[0], control_config[1]
if not control:
logger.info("Archivo de control no encontrado. No se actualizará ningún archivo de control")
else:
last_process = control[-1]
if "reset_by_user" in last_process.keys():
report_data["PROCESS_EXECUTION"] = ProcessStatusEnum.RESET.value
else:
total_tasks = [last_process["tasks"]]
current_status = last_process["status"]
control.reverse()
for process in control:
if process["status"] == ProcessStatusEnum.SUCCESS.value:
break
total_tasks.append(process["tasks"])
final_key_tasks, final_key_desc, final_key_message = {}, {}, {}
for tasks in total_tasks:
for key in tasks.keys():
this_status = tasks[key]["status"]
this_desc = tasks[key]["description"]
this_message = tasks[key]["message"]
if key in final_key_tasks.keys():
task_status = final_key_tasks[key]
if this_status == ProcessStatusEnum.SUCCESS.value and \
task_status == ProcessStatusEnum.FAIL.value:
final_key_tasks.update({key: this_status})
final_key_desc.update({key: this_desc})
final_key_message.update({key: ''})
else:
final_key_tasks.update({key: this_status})
final_key_desc.update({key: this_desc})
final_key_message.update({key: this_message})
for item in final_key_tasks.keys():
if item.lower().startswith("extract"):
type_task = "EXTRACTION"
elif item.lower().startswith("transform"):
type_task = "TRANSFORMATION"
else:
type_task = "GENERATION"
report_data.update({item: {"STATUS": final_key_tasks[item], "TYPE": type_task,
"DESCRIPTION": final_key_desc[item], 'MESSAGE': final_key_message[item]}})
report_data.update({"PROCESS_STATUS": current_status, "PROCESS_EXECUTION": last_process["date"]})
task.xcom_push(key="REPORT-DATA", value=report_data)
logger.info(f"Diccionario de datos para el reporte: {report_data}")
except Exception as e:
logger.error(f"Error general creando reporte. {e}")
def set_dag():
""" DAG that reset the last process Airflow have"""
import yaml
from yaml.loader import SafeLoader
# Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
# En desarrollo, cualquiera que apunte a su carpeta dags
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que informa del último proceso ejecutado",
schedule_interval=conf["inform_dag_schedule"], tags=["DAG BCOM - INFORM PROCESS"], catchup=True) as dag:
control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"]
control_extractor = PythonOperator(
task_id="CONTROL-EXTRACTOR",
python_callable=extract_last_control,
op_kwargs={'conn_id': control_s3["connection_id"], 'bucket': control_s3["bucket"],
'prefix': control_s3["prefix"], 'provider': conf["cloud_provider"], 'timezone': timezone},
trigger_rule="all_success"
)
create = PythonOperator(
task_id="GET-DATA-REPORT",
python_callable=get_data_report,
op_kwargs={},
trigger_rule="all_success"
)
tmp_dir = conf["outputs"]["tmp_path"]
report = PythonOperator(
task_id="CREATE-REPORT",
python_callable=create_report,
op_kwargs={'tmp_path': tmp_dir},
trigger_rule="all_success"
)
report_params = conf["report"]
upload = PythonOperator(
task_id="UPLOAD-REPORT",
python_callable=upload_report,
op_kwargs={'report_params': report_params, 'provider': conf["cloud_provider"], 'timezone': timezone},
trigger_rule="all_success"
)
control_extractor >> create >> report >> upload
return dag
globals()["0"] = set_dag()
from datetime import datetime
from typing import Any, Dict
from airflow import DAG
import json
from airflow.operators.python import PythonOperator
from components.Databases.Database import Database
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.S3Route import upload_file
from components.Control import extract_last_control
from components.Utils import delete_temp_dirs
import logging
logger = logging.getLogger()
DAG_NAME = "RESET_PROCESS"
# Change this path if it is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
DEFAULT_ARGS = {
'owner': 'BCOM',
"start_date": datetime(2023, 7, 28, 22, 9),
'depends_on_past': False,
'email': 'caguirre@bytesw.com',
'retries': 0,
'email_on_retry': False,
'email_on_failure': False
}
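# Marca el último proceso del archivo de control con "reset_by_user", vuelve a subir el archivo al bucket y limpia los XCom y las task instances registradas.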
def update_control(control_params: Dict[str, Any], provider: str, **kwargs) -> None:
try:
task = kwargs['ti']
control_config = task.xcom_pull(task_ids="CONTROL-EXTRACTOR", key="CONTROL-CONFIG")
control_key, control = control_config[0], control_config[1]
if not control:
logger.info("Archivo de control no encontrado. No se actualizará ningún archivo de control")
else:
last_process = control[-1]
last_process.update({'reset_by_user': True})
control.pop(-1)
control.append(last_process)
control = json.dumps(control).encode('utf-8')
load = upload_file(control, control_params["connection_id"], control_params["bucket"],
control_key, provider)
if load:
logger.info(f"Subido correctamente el archivo de control en bucket {control_params['bucket']} y"
f"key: {control_key}")
# Borrar las variables que hayan quedado
delete_all_xcom_tasks()
delete_task_instances()
except Exception as e:
logger.error(f"Error actualizando archivo de control. {e}")
def reset_process(intern_db, output_tmp_dir: str) -> None:
try:
# Borrando tablas
tablenames = intern_db.get_all_tablenames()
if len(tablenames) == 0:
logger.info("No se encontraron tablas para su limpieza")
else:
for tablename in tablenames:
tablename = tablename[0]
delete = delete_table(tablename, intern_db.engine)
if delete:
logger.info(f"Borrado correctamente la tabla {tablename}")
# Borrando archivos temporales que hayan quedado sueltos
delete = delete_temp_dirs(output_tmp_dir)
if delete:
logger.info("Se borraron todos los archivos temporales")
except Exception as e:
logger.error(f"Error procesando archivo de control. {e}")
def set_dag():
""" DAG that reset the last process Airflow have"""
import yaml
from yaml.loader import SafeLoader
# Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
# En desarrollo, cualquiera que apunte a su carpeta dags
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que resetea el último proceso ejecutado",
schedule_interval=conf["reset_dag_schedule"], tags=["DAG BCOM - RESET PROCESS"], catchup=True) as dag:
control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"]
control_extractor = PythonOperator(
task_id="CONTROL-EXTRACTOR",
python_callable=extract_last_control,
op_kwargs={'conn_id': control_s3["connection_id"], 'bucket': control_s3["bucket"],
'prefix': control_s3["prefix"], 'provider': conf["cloud_provider"], 'timezone': timezone},
trigger_rule="all_success"
)
# Intern Database configuration.
intern_params = conf["database"]["transformation"]
intern_db = Database(intern_params["type"], intern_params["host"], int(intern_params["port"]),
intern_params["username"], intern_params["password"], intern_params["database"],
intern_params["service"], intern_params["schema"])
intern_db.create_engine()
tmp_dir = conf["outputs"]["tmp_path"]
control_process = PythonOperator(
task_id="RESET-PROCESS",
python_callable=reset_process,
op_kwargs={'intern_db': intern_db, 'output_tmp_dir': tmp_dir},
trigger_rule="all_success"
)
update = PythonOperator(
task_id="UPDATE-CONTROL",
python_callable=update_control,
op_kwargs={'control_params': control_s3, 'provider': conf["cloud_provider"]},
trigger_rule="all_success"
)
control_extractor >> control_process >> update
return dag
globals()["0"] = set_dag()
......@@ -7,9 +7,9 @@ from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from components.Utils import update_sql_commands
from components.Xcom import save_commands_to_xcom, delete_task_instances
from components.S3Route import get_files_from_prefix, get_file_from_key
from components.Sensor import create_s3_sensor
from components.Xcom import save_commands_to_xcom
from components.S3Route import get_files_from_prefix, get_file_from_prefix
from components.Sensor import create_sensor
from components.Extractor import get_extract_task_group
from components.Transformation import get_transform_task_group
from components.Generation import get_generate_task_group
......@@ -22,11 +22,11 @@ import logging
logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_TRANSFORMACIONES3"
DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM"
# Change this path if it is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition.json"
JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition2.json"
DEFAULT_ARGS = {
'owner': 'BCOM',
......@@ -39,10 +39,10 @@ DEFAULT_ARGS = {
}
def cleaning(intern_conn, control_s3: Dict[str, Any]) -> TaskGroup:
def cleaning(intern_conn, control_s3: Dict[str, Any], provider: str, timezone: str) -> TaskGroup:
groups = None
try:
groups = get_cleaning_task_group(intern_conn, control_s3)
groups = get_cleaning_task_group(intern_conn, control_s3, provider, timezone)
except Exception as e:
logger.error(f"Error general de transformación de datos. {e}")
finally:
......@@ -50,20 +50,20 @@ def cleaning(intern_conn, control_s3: Dict[str, Any]) -> TaskGroup:
def generate_and_deploy_results(intern_conn, parameters: Dict[str, Any], timezone: str,
control_s3: Dict[str, Any]) -> TaskGroup:
control_s3: Dict[str, Any], provider: str) -> TaskGroup:
groups = None
try:
groups = get_generate_task_group(intern_conn, parameters, control_s3, timezone)
groups = get_generate_task_group(intern_conn, parameters, control_s3, timezone, provider)
except Exception as e:
logger.error(f"Error general de creación y despliegue de resultados. {e}")
finally:
return groups
def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any]) -> TaskGroup:
def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any], provider: str) -> TaskGroup:
groups = None
try:
groups = get_transform_task_group(intern_conn, timezone, control_s3)
groups = get_transform_task_group(intern_conn, timezone, control_s3, provider)
except Exception as e:
logger.error(f"Error general de transformación de datos. {e}")
finally:
......@@ -71,10 +71,10 @@ def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any]) -> Ta
def extraction(source_conn, intern_conn, timezone: str, control_s3: Dict[str, Any],
chunksize: int = 100000) -> TaskGroup:
provider: str, chunksize: int = 100000) -> TaskGroup:
groups = None
try:
groups = get_extract_task_group(source_conn, intern_conn, chunksize, timezone, control_s3)
groups = get_extract_task_group(source_conn, intern_conn, chunksize, timezone, control_s3, provider)
except Exception as e:
logger.error(f"Error general de extracción de datos. {e}")
finally:
......@@ -87,25 +87,27 @@ def save_procedure_json(json_path: str, task) -> None:
data = json.load(f)
logger.info(f"JSON-PROCEDURE {data}")
if data:
task.xcom_push(key="PROCEDURE-JSON", value=data)
task.xcom_push(key="EXTRACTION-DEFINITION-JSON", value=data)
except Exception as e:
logger.error(f"Error leyendo y guardando archivo descriptor de procedure. {e}")
def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task):
def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str, timezone: str):
try:
if not prefix.endswith("/"):
prefix += "/"
key = prefix + filename
logger.info(f"EXTRAYENDO ARCHIVO DE CONTROL DESDE {key}")
control = get_file_from_key(conn_id, bucket, key)
logger.info(f"BUSCANDO Y EXTRAYENDO ARCHIVO DE CONTROL DESDE {prefix}")
control, _ = get_file_from_prefix(conn_id, bucket, prefix, provider, timezone, task)
if control:
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data)
data = json.load(data)
if not data:
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}]
else:
data = json.load(data)
else:
logger.info(f"Json Procedure descargado: {control}")
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": []}]
logger.info(f"Json de control creado: {control}")
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}]
task.xcom_push(key="CONTROL-CONFIG", value=data)
except Exception as e:
logger.error(f"Error general de descarga de archivo de control. {e}")
......@@ -113,14 +115,14 @@ def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task)
def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str,
order_delimiter: str, procedure_mask: str, label_tablename: str, control_params: Dict[str, Any],
**kwargs):
provider: str, timezone: str, **kwargs):
try:
extract_control(control_params["connection_id"], control_params["bucket"], control_params["prefix"],
control_params["filename"], kwargs['ti'])
kwargs['ti'], provider, timezone)
save_procedure_json(JSON_PROCEDURE_PATH, kwargs['ti'])
start_time = time.time()
logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
scripts = get_files_from_prefix(conn_id, bucket, prefix)
scripts = get_files_from_prefix(conn_id, bucket, prefix, provider)
scripts = update_sql_commands(scripts, label_tablename)
save_commands_to_xcom(scripts, kwargs['ti'], source_mask, transform_mask, procedure_mask, order_delimiter)
logger.debug(f"Script cargados en Xcom: {scripts}")
......@@ -149,9 +151,10 @@ def set_dag():
wildcard_scripts = scripts_s3["prefix"] + "?*"
else:
wildcard_scripts = scripts_s3["prefix"] + "/?*"
sensor_scripts = create_s3_sensor("SCRIPTS-SENSOR", scripts_s3["connection_id"], scripts_s3["bucket"],
wildcard_scripts)
sensor_scripts = create_sensor("SCRIPTS-SENSOR", scripts_s3["connection_id"], scripts_s3["bucket"],
wildcard_scripts, conf["cloud_provider"])
control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"]
# Scripts extraction
extract_mask = conf["source_mask"]
transform_mask = conf["transformation_mask"]
......@@ -163,7 +166,8 @@ def set_dag():
op_kwargs={'conn_id': scripts_s3["connection_id"], 'bucket': scripts_s3["bucket"],
'prefix': scripts_s3["prefix"], 'source_mask': extract_mask, 'transform_mask': transform_mask,
'procedure_mask': procedure_mask, 'order_delimiter': order_delimiter,
'label_tablename': conf["label_multiple_select"], 'control_params': control_s3},
'label_tablename': conf["label_multiple_select"], 'control_params': control_s3,
'provider': conf["cloud_provider"], 'timezone': timezone},
trigger_rule="all_success"
)
......@@ -183,18 +187,17 @@ def set_dag():
# Creación de grupo de tasks para las extracciones
chunksize = conf["chunksize"]
timezone = conf["timezone"]
extractions = extraction(source_db, intern_db, timezone, control_s3, chunksize)
extractions = extraction(source_db, intern_db, timezone, control_s3, conf["cloud_provider"], chunksize)
# Creación de grupo de tasks para las transformaciones
transformations = transformation(intern_db, timezone, control_s3)
transformations = transformation(intern_db, timezone, control_s3, conf["cloud_provider"])
# Creación de grupo de tasks para la generación y despliegue de archivos resultados
outputs_conf = conf["outputs"]
result = generate_and_deploy_results(intern_db, outputs_conf, timezone, control_s3)
result = generate_and_deploy_results(intern_db, outputs_conf, timezone, control_s3, conf["cloud_provider"])
# Creación de tasks de limpiadores
cleaners = cleaning(intern_db, control_s3)
cleaners = cleaning(intern_db, control_s3, conf["cloud_provider"], timezone)
sensor_scripts >> script_extractor >> extractions >> transformations >> result >> cleaners
return dag
......
......@@ -6,7 +6,7 @@ import pandas as pd
import numpy as np
from components.S3Route import get_df_from_s3, get_base_date, save_df_to_s3, move_object_s3
from components.Sensor import create_s3_sensor
from components.Sensor import create_sensor
from components.Utils import get_modified_prefix, remove_invalid_rows, remove_fields, update_dict_with_catalogs
from airflow import DAG
......@@ -256,26 +256,26 @@ def set_dag_1():
catalogs_dict = update_dict_with_catalogs(catalogs_dict, conf, "no_promocion", s3_prefix)
# Define the sensor to verify if data exists or have been updated
s3_sensor_tacom = create_s3_sensor("S3_sensor_tacom_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
s3_tacom)
s3_sensor_tacom = create_sensor("S3_sensor_tacom_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
s3_tacom)
s3_sensor_promo = create_s3_sensor("S3_sensor_promo_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
s3_promo)
s3_sensor_promo = create_sensor("S3_sensor_promo_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
s3_promo)
s3_sensor_promo_catalog = create_s3_sensor("S3_sensor_promo_catalog_task", s3_conf["s3_conn_id"],
s3_conf["bucket"], catalogs_dict["s3_catalogo_promociones"])
s3_sensor_promo_catalog = create_sensor("S3_sensor_promo_catalog_task", s3_conf["s3_conn_id"],
s3_conf["bucket"], catalogs_dict["s3_catalogo_promociones"])
s3_sensor_3a2p = create_s3_sensor("S3_sensor_3a2p_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacion3pa2p"])
s3_sensor_3a2p = create_sensor("S3_sensor_3a2p_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacion3pa2p"])
s3_sensor_poid = create_s3_sensor("S3_sensor_poid_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacionpoidpaquete"])
s3_sensor_poid = create_sensor("S3_sensor_poid_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacionpoidpaquete"])
s3_sensor_paq = create_s3_sensor("S3_sensor_paq_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacion_paquetes"])
s3_sensor_paq = create_sensor("S3_sensor_paq_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacion_paquetes"])
s3_sensor_notpromo = create_s3_sensor("S3_sensor_notpromo_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_no_promocion"])
s3_sensor_notpromo = create_sensor("S3_sensor_notpromo_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_no_promocion"])
outputs = conf["s3_parameters"]["outputs"]
output_prefix = outputs["prefix"]
......@@ -310,5 +310,5 @@ def set_dag_1():
return dag
globals()["0"] = set_dag_1()
# globals()["0"] = set_dag_1()
......@@ -8,5 +8,7 @@ class DataTypeEnum(enum.Enum):
NUMBER = int
TEXT = str
DECIMAL = float
BOOLEAN = bool
DATETIME = Timestamp
DATE = pd.Timestamp
LONGTEXT = str
import enum
from sqlalchemy import Integer, String, Date, DateTime, DECIMAL
from sqlalchemy import Integer, String, Date, DateTime, DECIMAL, BOOLEAN, Text
class DataTypeOrmEnum(enum.Enum):
......@@ -8,4 +8,5 @@ class DataTypeOrmEnum(enum.Enum):
DECIMAL = DECIMAL
DATE = Date
DATETIME = DateTime
BOOLEAN = BOOLEAN
LONGTEXT = Text
......@@ -4,3 +4,4 @@ from enum import Enum
class ProcessStatusEnum(Enum):
SUCCESS = "success"
FAIL = "failed"
RESET = "reset"
from enum import Enum
class ProviderTypeEnum(Enum):
GOOGLE = "google"
AMAZON = "aws"
MINIO = "local"
[{
"procedure_identifier": "PROCEDURE_1",
[
{
"identifier": "TACOMVENTAS",
"fields": [
{
"name": "CD_EMPRESA",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "CD_FOLIO",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_CUENTA",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "NU_VENDEDOR",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "NU_ADDON",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NB_PAQUETE",
"datatype": "TEXT",
"maxLength": 200
},
{
"name": "CD_CLIENTE",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "NB_CLIENTE",
"datatype": "TEXT",
"maxLength": 200
},
{
"name": "FH_CONTRATACION",
"datatype": "DATE"
},
{
"name": "FH_ACTIVACION",
"datatype": "DATE"
},
{
"name": "FH_OPERACION",
"datatype": "DATE"
},
{
"name": "TP_SERVICIO",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "ST_CLIENTE",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "TP_PAGO",
"datatype": "TEXT",
"maxLength": 10
},
{
"name": "NB_USUACARGA",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "FH_CARGA",
"datatype": "DATE"
},
{
"name": "NU_ANIO",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NU_MES",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NU_SEMANA",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NU_COMISION",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "TP_PAGOANT",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "REGLA_APLICADA",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "AUX",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "PROMOCION",
"datatype": "TEXT",
"maxLength": 80
},
{
"name": "EMPLEADOEMBAJADOR__C",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CANAL_DE_VENTA__C",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "CANALREFERIDO__C",
"datatype": "TEXT",
"maxLength": 50
}
],
"indexes": [
"CD_PAQUETE", "NU_ADDON", "CD_CLIENTE"
]
},
{
"identifier": "PROMOCIONES_RESIDENCIAL",
"fields": [
{
"identifier": "CD_FOLIO",
"name": "EMPRESA",
"datatype": "TEXT",
"decimal_precision": 0,
"maxLength": 100
"maxLength": 250
},
{
"identifier": "CD_CUENTA",
"name": "CUENTA",
"datatype": "TEXT",
"decimal_precision": null,
"maxLength": 100
"maxLength": 250
},
{
"identifier": "CD_PAQUETE",
"name": "PLAN",
"datatype": "TEXT",
"decimal_precision": null,
"maxLength": 100
"maxLength": 250
},
{
"name": "NOMBRE_PRODUCTO",
"datatype": "TEXT",
"maxLength": 250
},
{
"name": "DESCR",
"datatype": "TEXT",
"maxLength": 250
},
{
"name": "TIPO_ADICION",
"datatype": "TEXT",
"maxLength": 250
},
{
"name": "RENTACONIMPUESTOS",
"datatype": "NUMBER"
},
{
"name": "RENTASINIMPUESTOS",
"datatype": "NUMBER"
},
{
"name": "QUANTITY",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "CREACION_PRODUCTO",
"datatype": "DATE"
},
{
"name": "INICIO_COBRO",
"datatype": "DATE"
},
{
"name": "FIN_COBRO",
"datatype": "DATE"
},
{
"name": "FIN_COMPRA",
"datatype": "DATE"
},
{
"name": "SERV_STATUS",
"datatype": "TEXT",
"maxLength": 20
},
{
"name": "POID_TYPE",
"datatype": "TEXT",
"maxLength": 200
},
{
"identifier": "NB_PAQUETE",
"name": "POID_PRODUCT",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "STATUS_PRODUCTO",
"datatype": "TEXT",
"decimal_precision": null,
"maxLength": 200
},
{
"name": "SERVICIO",
"datatype": "TEXT",
"maxLength": 200
},
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 100
}
],
"indexes": ["CD_PAQUETE"]
},
{
"identifier": "CATALOGO_PROMOCIONES",
"fields": [
{
"name": "NOMBRE_PRODUCTO",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"procedure_identifier": "procedure_prueba2",
"identifier": "RELACION_PROMOCION_3PA2P",
"fields": [
{
"identifier": "col1",
"datatype": "DATE",
"pattern": "%Y-%m-%d",
"decimal_precision": null,
"maxLength": null
"name": "TRESP",
"datatype": "TEXT",
"maxLength": 50
},
{
"identifier": "col2",
"datatype": "TIME",
"pattern": "%H:%M:%S",
"decimal_precision": null,
"maxLength": null
"name": "DOSP",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"identifier": "RELACION_POIDPAQUETE",
"fields": [
{
"name": "POID_PRODUCT",
"datatype": "TEXT",
"maxLength": 50
},
{
"identifier": "col3",
"datatype": "DATETIME",
"pattern": "%Y-%m-%d %H:%M:%S",
"decimal_precision": null,
"maxLength": null
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"identifier": "RELACION_PAQINI_PAQFIN",
"fields": [
{
"name": "COD_PAQ_INI",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "COD_PAQ_FIN",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"identifier": "PAQUETES_NOPROMOCION",
"fields": [
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"identifier": "PROCEDURE_1",
"fields": [
{
"name": "CD_FOLIO",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_CUENTA",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "NB_PAQUETE",
"datatype": "TEXT",
"maxLength": 200
}
]
}
......
[
{
"identifier": "TABLA1",
"fields": [
{
"name": "CD_CUENTA",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "DPP_FLAG",
"datatype": "NUMBER"
},
{
"name": "FECHA_FACTURA",
"datatype": "DATE"
},
{
"name": "FECHA_INICIAL_DPP",
"datatype": "DATE"
},
{
"name": "FECHA_LIMITE_DPP",
"datatype": "DATE"
},
{
"name": "FH_CARGA",
"datatype": "DATE"
},
{
"name": "ID_FACTURA",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "MONTO_AJUSTADO",
"datatype": "DECIMAL"
},
{
"name": "MONTO_FACTURA",
"datatype": "DECIMAL"
},
{
"name": "MONTO_PAGO",
"datatype": "DECIMAL"
},
{
"name": "SALDO_FACTURA",
"datatype": "DECIMAL"
}
]
}
]
\ No newline at end of file
FROM apache/airflow:2.5.3
FROM apache/airflow:slim-2.5.3
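# Imagen personalizada de Airflow 2.5.3 (variante slim) con el soporte de Kubernetes y las dependencias del proyecto instaladas desde requirements.txt.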
COPY requirements.txt /
RUN pip install --no-cache-dir "apache-airflow==2.5.3" "apache-airflow[kubernetes]==2.5.3" -r /requirements.txt
\ No newline at end of file
......@@ -77,7 +77,7 @@ data:
AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}'
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.4"
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.5"
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF: airflow-envvars-configmap
AIRFLOW__KUBERNETES_EXECUTOR__POD_TEMPLATE_FILE: /opt/airflow/templates/pod_template.yaml
......
apache-airflow[kubernetes]==2.5.3
openpyxl==3.1.2
XlsxWriter==3.1.2
pymysql==1.1.0
oracledb==1.3.2
apache-airflow-providers-google
apache-airflow-providers-amazon
apache-airflow-providers-postgres
apache-airflow-providers-oracle
\ No newline at end of file
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-sync-dags
namespace: bcom-airflow
spec:
selector:
matchLabels:
app: airflow-sync-dags
template:
metadata:
labels:
app: airflow-sync-dags
spec:
containers:
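# Sidecar que sincroniza periódicamente los DAGs desde el bucket GCS (GCS_DAGS_DIR) hacia el volumen compartido montado en /dags.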
- args:
- while true; gsutil rsync -d -r ${GCS_DAGS_DIR:-gs://prueba-rsync/carpeta} /dags;
do sleep ${SYNCHRONYZE_DAG_DIR:-30}; done;
command:
- /bin/bash
- -c
- --
name: sync-dags-gcloud
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:alpine
envFrom:
- configMapRef:
name: airflow-envvars-configmap
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
key: AWS_ACCESS_KEY
name: credentials
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
key: AWS_SECRET_KEY
name: credentials
volumeMounts:
- name: dags-host-volume
mountPath: /dags
volumes:
- name: dags-host-volume
persistentVolumeClaim:
claimName: airflow-dags-pvc