Commit 88948dcc authored by Cristian Aguirre

Update 10-07-23. Update DAG process. Add procedure functionality (In progress)

parent b1377db9
from typing import Dict, Any
import json
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from components.Utils import select_multiple
from components.Xcom import delete_all_xcom_tasks
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.S3Route import load_obj_to_s3
import logging
logger = logging.getLogger()
def validate_clean(control_params: Dict[str, Any], **kwargs) -> None:
ti = kwargs["ti"]
conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG")
conn = control_params["connection_id"]
bucket = control_params["bucket"]
prefix = control_params["prefix"]
if not prefix.endswith("/"):
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
def clean(command: str, intern_conn):
engine = intern_conn.engine
tablename = select_multiple(command)["tablename"]
......@@ -37,7 +55,7 @@ def get_generate_from_xcom(**kwargs):
Variable.set(key='CLEANS', value=final_selects, serialize_json=True)
def get_cleaning_task_group(db_intern_conn) -> TaskGroup or None:
def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any]) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="LimpiezaDelProceso", prefix_group_id=False) as group:
......@@ -54,7 +72,13 @@ def get_cleaning_task_group(db_intern_conn) -> TaskGroup or None:
op_kwargs={'intern_conn': db_intern_conn}
).expand(op_args=[[item] for item in cleaners])
init_task >> tasks
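# Note: trigger_rule='all_done' below lets VALIDATE_CLEANER run even if some mapped
# cleaners fail, so validate_clean can still upload the control file to S3.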
validate_task = PythonOperator(
task_id="VALIDATE_CLEANER",
python_callable=validate_clean,
op_kwargs={'control_params': control_s3},
trigger_rule='all_done'
)
init_task >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de limpiadores. {e}")
finally:
......
from components.Timezone import datetime_by_tzone
from enums.ProcessStatusEnum import ProcessStatusEnum
import logging
from typing import Dict, Any, List, Tuple
logger = logging.getLogger()
def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[str, Any]:
response = {'status': ProcessStatusEnum.SUCCESS.value, 'tasks': []}
try:
conf = conf[-1]
logger.info(f"Último proceso ejecutado: {conf}")
status = conf["status"]
if status == ProcessStatusEnum.FAIL.value:
response["status"] = ProcessStatusEnum.FAIL.value
success_tasks = conf["tasks"]
final_tasks = []
for key in success_tasks.keys():
if type_task.lower() not in key.lower():
continue
task = success_tasks[key]
task_status = task["status"]
if task_status == ProcessStatusEnum.SUCCESS.value:
command = task["description"]
result = (key, command)
final_tasks.append(result)
response["tasks"] = final_tasks
except Exception as e:
logger.error(f"Error obteniendo task fallidos desde control. {e}")
finally:
return response
def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str, Any],
timezone: str, delete_last: bool = False) -> List[Dict[str, Any]]:
try:
if delete_last:
print("CONF-TRANS:", conf)
last_tasks = conf[-1]["tasks"]
tasks.update(last_tasks)
print("LAST:", tasks)
conf.pop(-1)
current_datetime = str(datetime_by_tzone(timezone, '%Y-%m-%d %H:%M:%S'))
new_process = {"date": current_datetime, "status": status, "tasks": tasks}
conf.append(new_process)
except Exception as e:
logger.error(f"Error actualizando archivo de control. {e}")
finally:
return conf
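# Illustrative sketch (hypothetical values) of how these two helpers compose:
# given a control list whose last entry is
#   {"date": "2023-07-20 11:15:00", "status": "failed",
#    "tasks": {"TRANSFORMATION_1": {"description": "1.transformations1.sql",
#                                   "status": "success", "message": ""}}}
# get_tasks_from_control(conf, "transformation") returns
#   {"status": "failed", "tasks": [("TRANSFORMATION_1", "1.transformations1.sql")]}
# and update_new_process(conf, "success", new_tasks, "GMT-5", delete_last=True)
# merges the last entry's tasks into new_tasks, drops that entry, and appends a
# fresh dated record with the merged result.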
......@@ -39,7 +39,7 @@ class Mysql:
'__table_args__': {'extend_existing': True}
})
for field in fields:
logger.info(f"Attribute: {field}")
logger.debug(f"Attribute: {field}")
name = field[0]
if field[2] != -1:
size = int(field[2] / 4)
......
......@@ -38,7 +38,7 @@ class Oracle:
'__table_args__': {'extend_existing': True}
})
for field in fields:
logger.info(f"Attribute: {field}")
logger.debug(f"Attribute: {field}")
name = field[0]
# Default precision for Integer Oracle : 38
if not isinstance(field[2], type(None)):
......
......@@ -40,7 +40,7 @@ class Postgres:
'__table_args__': {'extend_existing': True}
})
for field in fields:
logger.info(f"Attribute: {field}")
logger.debug(f"Attribute: {field}")
name = field[0]
if field[2] != -1:
try:
......
This diff is collapsed.
from typing import Any, Dict
import os
import json
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from components.S3Route import save_df_to_s3
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.S3Route import save_df_to_s3, load_obj_to_s3
from components.Utils import select_multiple, create_temp_file, delete_temp_dir
from components.Control import get_tasks_from_control, update_new_process
from components.Xcom import delete_all_xcom_tasks
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
import logging
......@@ -13,6 +17,63 @@ import logging
logger = logging.getLogger()
def validate_generate(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
ti = kwargs["ti"]
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="GENERATORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_TRANSFORMATION", key="CONTROL-CONFIG")
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
for success_task in success_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=success_task)[0]
final_dict.update({success_task: task})
if not isinstance(failed_tasks, type(None)) and len(failed_tasks) > 0:
status = ProcessStatusEnum.FAIL.value
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, True)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
prefix = control_params["prefix"]
if not prefix.endswith("/"):
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
raise AssertionError(f"Ocurrieron errores en la etapa de generación")
elif status == ProcessStatusEnum.SUCCESS.value:
ti.xcom_push(key="CONTROL-CONFIG", value=conf)
def on_failure_generator(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('GENERATES', default_var=[], deserialize_json=True)
table = selects[ti.map_index]
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
task_result = {"description": table, "status": status, "message": exception}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="FAILED_TASKS", value=task_name)
def on_success_generator(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('GENERATES', default_var=[], deserialize_json=True)
table = selects[ti.map_index]
status = ProcessStatusEnum.SUCCESS.value
task_result = {"description": table, "status": status, "message": ""}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timezone: str, chunksize=10000):
engine = intern_conn.engine
tablename = select_multiple(command)["tablename"]
......@@ -22,7 +83,9 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
filename_mask = params["filename_mask"]
file_type = params["file_type"]
pattern = params["datetime_pattern"]
tmp_file = create_temp_file(filename_mask, file_type, tablename, timezone, pattern)
delimiter = params["delimiter"]
tmp_path = params["tmp_path"]
tmp_file = create_temp_file(tmp_path, filename_mask, file_type, tablename, timezone, pattern)
logger.info(tmp_file)
steps = get_steps(tablename, chunksize, engine, True)
iterator = get_iterator(tablename, chunksize, engine)
......@@ -34,7 +97,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
dataframe = next(iterator)
dataframe = dataframe.drop("INTERN_ID_BCOM", axis=1)
logger.debug(dataframe)
dataframe.to_csv(tmp_file, index=False, mode='a', header=header)
dataframe.to_csv(tmp_file, sep=delimiter, index=False, mode='a', header=header)
except StopIteration:
break
......@@ -53,21 +116,28 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
def get_generate_from_xcom(**kwargs):
final_outputs = []
task = kwargs['ti']
final_outputs = []
conf = task.xcom_pull(task_ids="VALIDATE_TRANSFORMATION", key="CONTROL-CONFIG")
tasks = get_tasks_from_control(conf, "generator")
success_tasks = tasks["tasks"]
success_tasks = [item[1] for item in success_tasks]
logger.info(f"GENERADORES QUE FUERON EXITOSOS (TABLAS): {success_tasks}")
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
if not key.startswith("SELECT"):
continue
xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo comandos {xcom_outputs}")
logger.info(f"Trayendo tablas {xcom_outputs}")
for select in xcom_outputs:
final_outputs.append(select)
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or select not in success_tasks:
final_outputs.append(select)
Variable.set(key='GENERATES', value=final_outputs, serialize_json=True)
def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], timezone) -> TaskGroup or None:
def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], control_s3: Dict[str, Any],
timezone: str) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="GeneracionyDespliegueDeResultados", prefix_group_id=False) as group:
......@@ -81,10 +151,18 @@ def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], timezone
tasks = PythonOperator.partial(
task_id="GENERATORS",
python_callable=generate_and_deploy,
on_failure_callback=on_failure_generator,
on_success_callback=on_success_generator,
op_kwargs={'intern_conn': db_intern_conn, 'params': parameters, 'timezone': timezone}
).expand(op_args=[[item] for item in outputs])
init_task >> tasks
validate_task = PythonOperator(
task_id="VALIDATE_GENERATOR",
python_callable=validate_generate,
op_kwargs={'control_params': control_s3, 'timezone': timezone},
trigger_rule='all_done'
)
init_task >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de generadores. {e}")
finally:
......
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer
from sqlalchemy import Column, BigInteger
Base = declarative_base()
......@@ -8,4 +8,4 @@ class InsumoModel(Base):
__abstract__ = True
INTERN_ID_BCOM = Column(Integer, primary_key=True, autoincrement=True)
INTERN_ID_BCOM = Column(BigInteger, primary_key=True, autoincrement=True)
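# Minimal usage sketch (hypothetical table and column names): concrete models extend the
# abstract base and inherit the BigInteger surrogate key, which avoids overflowing a
# 32-bit Integer counter on large staged tables; extend_existing mirrors the dynamic
# Mysql/Oracle/Postgres model builders in this repo.
from sqlalchemy import Column, String


class InsumoEjemplo(InsumoModel):
    __tablename__ = "EJEMPLO_TABLA"  # hypothetical name, for illustration only
    __table_args__ = {'extend_existing': True}
    CAMPO_1 = Column(String(50))  # hypothetical column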
import fnmatch
import datetime
from typing import Any, Dict, List, Tuple
import json
import pytz
from io import BytesIO, StringIO
import pandas as pd
......@@ -148,3 +148,27 @@ def get_files_from_prefix(conn: str, bucket: str, prefix: str) -> List[Tuple[str
logger.error(f"Error extrayendo archivos en memoria desde bucket {bucket} y prefix {prefix}. {e}")
finally:
return result
def get_file_from_key(conn: str, bucket: str, key: str) -> Any:
result = BytesIO()
try:
s3_hook = S3Hook(conn)
data = s3_hook.get_key(key, bucket)
data.download_fileobj(result)
except Exception as e:
logger.error(f"Error extrayendo archivo {key}. {e}")
finally:
return result
def load_obj_to_s3(obj, conn: str, bucket: str, key: str, replace=True) -> bool:
load = False
try:
s3_hook = S3Hook(conn)
s3_hook.load_bytes(obj, key, bucket, replace)
load = True
except Exception as e:
logger.error(f"Error subiendo archivo de control a bucket {bucket} y key {key}. {e}")
finally:
return load
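# Minimal round-trip sketch for the control file (values mirror dag_conf.yml but are
# illustrative here): download the current control JSON, fall back to an empty list,
# then re-upload the serialized version.
#
# raw = get_file_from_key("conn_script", "prueba1234568", "bcom_control/control_example.json")
# conf = json.loads(raw.getvalue().decode("UTF-8")) if raw.getvalue() else []
# payload = json.dumps(conf, indent=2, default=str).encode()
# load_obj_to_s3(payload, "conn_script", "prueba1234568", "bcom_control/control_example.json")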
import json
from typing import Dict, Any
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_obj_to_s3
from components.Xcom import delete_all_xcom_tasks
from enums.ProcessStatusEnum import ProcessStatusEnum
import logging
logger = logging.getLogger()
def validate_transform(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
ti = kwargs["ti"]
success_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
for success_task in success_tasks:
task = ti.xcom_pull(task_ids="TRANSFORMATIONS", key=success_task)[0]
final_dict.update({success_task: task})
if not isinstance(failed_tasks, type(None)) and len(failed_tasks) > 0:
status = ProcessStatusEnum.FAIL.value
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="TRANSFORMATIONS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, True)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
prefix = control_params["prefix"]
if not prefix.endswith("/"):
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
raise AssertionError(f"Ocurrieron errores en la etapa de transformación")
elif status == ProcessStatusEnum.SUCCESS.value:
ti.xcom_push(key="CONTROL-CONFIG", value=conf)
def on_failure_transform(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
transform = Variable.get('TRANSFORMS', default_var=[], deserialize_json=True)
script = transform[ti.map_index][0]
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
task_result = {"description": script, "status": status, "message": exception}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="FAILED_TASKS", value=task_name)
def on_success_transform(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
transform = Variable.get('TRANSFORMS', default_var=[], deserialize_json=True)
script = transform[ti.map_index][0]
status = ProcessStatusEnum.SUCCESS.value
task_result = {"description": script, "status": status, "message": ""}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def transformations(xcom_commands: str, intern_conn):
engine = intern_conn.engine
script_name = xcom_commands[0]
......@@ -14,27 +79,35 @@ def transformations(xcom_commands: str, intern_conn):
logger.info(f"Ejecutando transformaciones del script {script_name}")
with engine.connect() as connection:
for command in commands:
logger.debug(f"Ejecutando comando de transformación: {command}")
logger.info(f"Ejecutando comando de transformación: {command}")
_ = connection.execute(command)
def get_trans_from_xcom(**kwargs):
task = kwargs['ti']
transforms_per_file = []
final_transforms = []
task = kwargs['ti']
conf = task.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
tasks = get_tasks_from_control(conf, "transformation")
success_tasks = tasks["tasks"]
success_tasks = [item[1] for item in success_tasks]
logger.info(f"SCRIPTS QUE FUERON EXITOSOS: {success_tasks}")
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
if not key.startswith("TRANSFORM"):
continue
xcom_transforms = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo comandos {xcom_transforms}")
for transform in xcom_transforms:
final_transforms.append(transform)
transforms_per_file.append((key, final_transforms))
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or key not in success_tasks:
for transform in xcom_transforms:
final_transforms.append(transform)
transforms_per_file.append((key, final_transforms))
logger.info(f"Scripts para la transformación: {transforms_per_file}")
Variable.set(key='TRANSFORMS', value=transforms_per_file, serialize_json=True)
def get_transform_task_group(db_intern_conn) -> TaskGroup or None:
def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str, Any]) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="TransformacionDeDatos", prefix_group_id=False) as group:
......@@ -48,10 +121,18 @@ def get_transform_task_group(db_intern_conn) -> TaskGroup or None:
tasks = PythonOperator.partial(
task_id="TRANSFORMATIONS",
python_callable=transformations,
op_kwargs={'intern_conn': db_intern_conn}
op_kwargs={'intern_conn': db_intern_conn},
on_failure_callback=on_failure_transform,
on_success_callback=on_success_transform
).expand(op_args=[[item] for item in transforms])
init_task >> tasks
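# Note: unlike the cleaning group, this validator uses trigger_rule='none_skipped', so
# VALIDATE_TRANSFORMATION runs whether the mapped transformations succeed or fail, as
# long as none of them were skipped.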
validate_task = PythonOperator(
task_id="VALIDATE_TRANSFORMATION",
python_callable=validate_transform,
op_kwargs={'control_params': control_s3, 'timezone': timezone},
trigger_rule='none_skipped'
)
init_task >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de transformación. {e}")
finally:
......
......@@ -96,7 +96,11 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
add_next = False
final_data = []
table_name = ""
for item in data:
for index, item in enumerate(data):
if item.strip().startswith("--"):
continue
if item.lower().strip() == "end":
final_data[-1] = final_data[-1] + "; end;"
final_item = item
if item.lower().strip().find(label_tablename.lower().strip()+":") != -1:
init_index = item.lower().strip().index(label_tablename.lower().strip()+":")
......@@ -109,12 +113,17 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
final_item = item.strip()
table_name = ""
if final_item.strip()[:2] in comments and ("update " in final_item.lower() or "delete " in final_item.lower()):
trans_index = final_item.lower().index("update")
final_item = final_item[trans_index:]
trans_index = final_item.lower().find("update")
if trans_index != -1:
final_item = final_item[trans_index:]
delete_index = final_item.lower().find("delete")
if delete_index != -1:
final_item = final_item[delete_index:]
final_data.append(final_item)
final_data = [item.replace("\t", "") for item in final_data if item != "" and ("select" in item or
"update" in item or
"delete" in item)]
"delete" in item or
"begin" in item)]
result.append((row[0], final_data))
logger.info(f"Lista de comandos: {result}")
except Exception as e:
......@@ -145,13 +154,16 @@ def select_multiple(command: str) -> Dict[str, Any]:
return response
def create_temp_file(filename_mask: str, file_type: str, tablename: str, timezone: str, pattern: str) -> str:
def create_temp_file(tmp_path: str, filename_mask: str, file_type: str, tablename: str, timezone: str,
pattern: str) -> str:
""" Create an output result as a file with a mask in the name
"""
fullpath = ""
try:
dir_name = str(uuid.uuid4())
path_dir = "/tmp/" + dir_name
if not tmp_path.endswith("/"):
tmp_path += "/"
path_dir = tmp_path + dir_name
os.mkdir(path_dir)
current_datetime = str(datetime_by_tzone(timezone, pattern))
filename_mask = filename_mask.replace("<source_name>", tablename)
......
from airflow.utils.db import provide_session
from sqlalchemy import or_
from airflow.models import XCom
import logging
from typing import List, Tuple
......@@ -7,7 +8,7 @@ logger = logging.getLogger()
def save_commands_to_xcom(dataset: List[Tuple[str, List[str]]], task, extract_mask: str, transform_mask: str,
order_delimiter: str) -> None:
procedure_mask: str, order_delimiter: str) -> None:
final_names_xcom = []
try:
for data in dataset:
......@@ -24,6 +25,8 @@ def save_commands_to_xcom(dataset: List[Tuple[str, List[str]]], task, extract_ma
name = "SELECT|" + name
elif name.find(transform_mask) != -1:
name = "TRANSFORM|" + name
elif name.find(procedure_mask) != -1:
name = "PROCEDURE|" + name
task.xcom_push(key=name, value=data[1])
final_names_xcom.append(name)
task.xcom_push(key="XCOM-EXTRACTION-NAMES", value=final_names_xcom)
......@@ -31,7 +34,7 @@ def save_commands_to_xcom(dataset: List[Tuple[str, List[str]]], task, extract_ma
raise AssertionError(f"Error guardando comandos en variables en xcom. {e}")
def save_names_to_xcom(value: str, task, task_name: str, key: str) -> None:
def save_values_to_xcom(value: str, task, task_name: str, key: str) -> None:
final_names_xcom = []
try:
xcom_names = task.xcom_pull(task_ids=task_name, key=key)
......@@ -50,7 +53,10 @@ def delete_all_xcom_tasks() -> None:
try:
@provide_session
def cleanup_xcom(session=None):
session.query(XCom).filter(XCom.task_id == "SCRIPTS-EXTRACTORS").delete()
session.query(XCom).filter(or_(XCom.task_id == "SCRIPTS-EXTRACTORS", XCom.task_id == "EXTRACTORS",
XCom.task_id == "GENERATORS", XCom.task_id == "TRANSFORMATIONS",
XCom.task_id == "VALIDATE_EXTRACTION", XCom.task_id == "VALIDATE_GENERATOR",
XCom.task_id == "VALIDATE_TRANSFORMATION")).delete()
cleanup_xcom()
except Exception as e:
logger.error(f"Error borrando todas las variables xcom del DAG actual. {e}")
......
[{
"date": "2023-07-20 11:15:00",
"status": "success",
"tasks": {
"EXTRACTORS_1": {
"description": "select * from ABC",
"status": "success",
"message": ""
},
"EXTRACTORS_2": {
"description": "select * fromADA",
"status": "success",
"message": ""
},
"TRANSFORMATION_1": {
"description": "1.transformations1.sql",
"status": "success",
"message": ""
},
"GENERATOR_1": {
"description": "ABC_2022-07-20.txt",
"status": "success",
"message": ""
},
"CLEANER_1": {
"description": "ABC",
"status": "success",
"message": ""
}
}
}
]
......@@ -13,7 +13,7 @@ app:
schema: public
transformation:
type: mysql
host: 192.168.1.10
host: 192.168.1.11
port: 13306
username: root
password: root
......@@ -22,19 +22,28 @@ app:
schema:
chunksize: 50000
label_multiple_select: TABLE
source_mask: select
transformation_mask: transform
source_mask: select # Sufijo (S)
procedure_mask: procedure # S
transformation_mask: transform # S
prefix_order_delimiter: .
scripts:
s3_params:
bucket: prueba1234568
prefix: bcom_scripts
connection_id: conn_script
control:
s3_params:
connection_id: conn_script
bucket: prueba1234568
prefix: bcom_control
filename: control_example.json
timezone: 'GMT-5'
outputs:
filename_mask: <source_name>_<datetime>
datetime_pattern: '%Y-%m-%d %H:%M:%S'
file_type: txt
delimiter: '|'
tmp_path: /tmp
s3_params:
bucket: prueba1234568
prefix: bcom_results
......
from datetime import datetime
import time
from typing import Any, Dict
import json
from io import StringIO
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from components.Utils import update_sql_commands
from components.Xcom import save_commands_to_xcom
from components.S3Route import get_files_from_prefix
from components.S3Route import get_files_from_prefix, get_file_from_key
from components.Sensor import create_s3_sensor
from components.Extractor import get_extract_task_group
from components.Transformation import get_transform_task_group
from components.Generation import get_generate_task_group
from components.Cleaning import get_cleaning_task_group
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.Databases.Database import Database
import logging
......@@ -20,9 +22,7 @@ import logging
logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_TRANSFORMACIONES2"
# TASK_NAMES = {'SENSORS': 'SCRIPTS_SENSORS', 'SCRIPT-EXTRACTION': 'SCRIPTS_EXTRACTOR', 'EXTRACTION': 'ExtraccionDeDatos',
# 'TRANSFORMATION': 'TransformacionDeDatos', ''}
DAG_NAME = "BCOM_DAG_TRANSFORMACIONES3"
DEFAULT_ARGS = {
'owner': 'BCOM',
......@@ -35,54 +35,77 @@ DEFAULT_ARGS = {
}
def cleaning(intern_conn) -> TaskGroup:
def cleaning(intern_conn, control_s3: Dict[str, Any]) -> TaskGroup:
groups = None
try:
groups = get_cleaning_task_group(intern_conn)
groups = get_cleaning_task_group(intern_conn, control_s3)
except Exception as e:
logger.error(f"Error general de transformación de datos. {e}")
finally:
return groups
def generate_and_deploy_results(intern_conn, parameters: Dict[str, Any], timezone: str) -> TaskGroup:
def generate_and_deploy_results(intern_conn, parameters: Dict[str, Any], timezone: str,
control_s3: Dict[str, Any]) -> TaskGroup:
groups = None
try:
groups = get_generate_task_group(intern_conn, parameters, timezone)
groups = get_generate_task_group(intern_conn, parameters, control_s3, timezone)
except Exception as e:
logger.error(f"Error general de creación y despliegue de resultados. {e}")
finally:
return groups
def transformation(intern_conn) -> TaskGroup:
def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any]) -> TaskGroup:
groups = None
try:
groups = get_transform_task_group(intern_conn)
groups = get_transform_task_group(intern_conn, timezone, control_s3)
except Exception as e:
logger.error(f"Error general de transformación de datos. {e}")
finally:
return groups
def extraction(source_conn, intern_conn, chunksize: int = 100000) -> TaskGroup:
def extraction(source_conn, intern_conn, timezone: str, control_s3: Dict[str, Any],
chunksize: int = 100000) -> TaskGroup:
groups = None
try:
groups = get_extract_task_group(source_conn, intern_conn, chunksize)
groups = get_extract_task_group(source_conn, intern_conn, chunksize, timezone, control_s3)
except Exception as e:
logger.error(f"Error general de extracción de datos. {e}")
finally:
return groups
def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task):
try:
if not prefix.endswith("/"):
prefix += "/"
key = prefix + filename
logger.info(f"EXTRAYENDO ARCHIVO DE CONTROL DESDE {key}")
control = get_file_from_key(conn_id, bucket, key)
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data)
data = json.load(data)
if not data:
print("ACA", data)
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": []}]
task.xcom_push(key="CONTROL-CONFIG", value=data)
except Exception as e:
logger.error(f"Error general de descarga de archivo de control. {e}")
def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str,
order_delimiter: str, label_tablename: str, **kwargs):
order_delimiter: str, procedure_mask: str, label_tablename: str, control_params: Dict[str, Any],
**kwargs):
try:
extract_control(control_params["connection_id"], control_params["bucket"], control_params["prefix"],
control_params["filename"], kwargs['ti'])
start_time = time.time()
logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
scripts = get_files_from_prefix(conn_id, bucket, prefix)
scripts = update_sql_commands(scripts, label_tablename)
save_commands_to_xcom(scripts, kwargs['ti'], source_mask, transform_mask, order_delimiter)
save_commands_to_xcom(scripts, kwargs['ti'], source_mask, transform_mask, procedure_mask, order_delimiter)
logger.debug(f"Script cargados en Xcom: {scripts}")
logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos")
except Exception as e:
......@@ -100,7 +123,7 @@ def set_dag():
conf_path = "/root/airflow/dags/dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
print(data)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que extrae y transforma",
schedule_interval=conf["schedule"], tags=["DAG BCOM - SQL TRANSFORMATIONS"], catchup=True) as dag:
......@@ -111,17 +134,22 @@ def set_dag():
wildcard_scripts = scripts_s3["prefix"] + "/?*"
sensor_scripts = create_s3_sensor("SCRIPTS-SENSOR", scripts_s3["connection_id"], scripts_s3["bucket"],
wildcard_scripts)
control_s3 = conf["control"]["s3_params"]
# Scripts extraction
extract_mask = conf["source_mask"]
transform_mask = conf["transformation_mask"]
procedure_mask = conf["procedure_mask"]
order_delimiter = conf["prefix_order_delimiter"]
script_extractor = PythonOperator(
task_id="SCRIPTS-EXTRACTOR",
python_callable=extract_scripts,
op_kwargs={'conn_id': scripts_s3["connection_id"], 'bucket': scripts_s3["bucket"],
'prefix': scripts_s3["prefix"], 'source_mask': extract_mask, 'transform_mask': transform_mask,
'order_delimiter': order_delimiter, 'label_tablename': conf["label_multiple_select"]},
'procedure_mask': procedure_mask, 'order_delimiter': order_delimiter,
'label_tablename': conf["label_multiple_select"], 'control_params': control_s3},
trigger_rule="all_success"
)
# Source Database configuration. Only we use 1 source
source_params = conf["database"]["sources"]["source1"]
source_db = Database(source_params["type"], source_params["host"], int(source_params["port"]),
......@@ -138,18 +166,18 @@ def set_dag():
# Creación de grupo de tasks para las extracciones
chunksize = conf["chunksize"]
extractions = extraction(source_db, intern_db, chunksize)
timezone = conf["timezone"]
extractions = extraction(source_db, intern_db, timezone, control_s3, chunksize)
# Creación de grupo de tasks para las transformaciones
transformations = transformation(intern_db)
transformations = transformation(intern_db, timezone, control_s3)
# Creación de grupo de tasks para la generación y despliegue de archivos resultados
outputs_conf = conf["outputs"]
timezone = conf["timezone"]
result = generate_and_deploy_results(intern_db, outputs_conf, timezone)
result = generate_and_deploy_results(intern_db, outputs_conf, timezone, control_s3)
# Creación de tasks de limpiadores
cleaners = cleaning(intern_db)
cleaners = cleaning(intern_db, control_s3)
sensor_scripts >> script_extractor >> extractions >> transformations >> result >> cleaners
return dag
......
from enum import Enum
class ProcessStatusEnum(Enum):
SUCCESS = "success"
FAIL = "failed"