Commit 3176aa51 authored by Cristian Aguirre's avatar Cristian Aguirre

Update 30-07-23. Add 2 new DAGs: INFORM_PROCESS and RESET_PROCESS

parent c22cefd9
......@@ -8,7 +8,7 @@ from airflow.decorators import task
from components.Utils import select_multiple
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.S3Route import load_obj_to_s3
from components.S3Route import load_control_to_s3
from enums.OperationTypeEnum import OperationTypeEnum
import logging
......@@ -16,7 +16,7 @@ import logging
logger = logging.getLogger()
def validate_clean(control_params: Dict[str, Any], provider: str, **kwargs) -> None:
def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG")
......@@ -27,7 +27,7 @@ def validate_clean(control_params: Dict[str, Any], provider: str, **kwargs) -> N
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key, provider)
loaded = load_control_to_s3(bytes(conf.encode()), conn, bucket, key, provider, timezone)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
......@@ -58,10 +58,14 @@ def get_cleaners_from_xcom(**kwargs):
final_selects.append(select)
logger.info(f"Final selects: {final_selects}")
Variable.set(key='CLEANS', value=final_selects, serialize_json=True)
return [[item] for item in final_selects]
if len(final_selects) > 0:
return [[item] for item in final_selects]
else:
return [[None]]
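
A minimal sketch of the skip pattern introduced in this commit, assuming the mapped operators cannot expand over an empty list: when there is nothing to process, a single [None] placeholder is returned and each mapped callable skips itself. The names below are illustrative, not the project's.

from airflow.exceptions import AirflowSkipException

def expansion_args(items):
    # Same shape as the return value above: one op_args list per mapped task instance,
    # or a lone [None] placeholder when there is nothing to process.
    return [[item] for item in items] if len(items) > 0 else [[None]]

def mapped_callable(item):
    # Mirrors extract_from_source / transformations / generate_and_deploy: skip on the placeholder.
    if item is None:
        raise AirflowSkipException("Nothing to process for this mapped task")
    return item
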
def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider: str) -> TaskGroup or None:
def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider: str,
timezone: str) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="LimpiezaDelProceso", prefix_group_id=False) as group:
......@@ -76,7 +80,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider
validate_task = PythonOperator(
task_id="VALIDATE_CLEANER",
python_callable=validate_clean,
op_kwargs={'control_params': control_s3, 'provider': provider},
op_kwargs={'control_params': control_s3, 'provider': provider, 'timezone': timezone},
trigger_rule='none_skipped'
)
cleaners >> tasks >> validate_task
......
from components.Timezone import datetime_by_tzone
from enums.ProcessStatusEnum import ProcessStatusEnum
import logging
from typing import Dict, Any, List, Tuple
import json
from io import StringIO
from typing import Dict, Any, List
from components.S3Route import get_file_from_prefix
logger = logging.getLogger()
def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[str, Any]:
response = {'status': ProcessStatusEnum.SUCCESS.value, 'tasks': []}
response = {'status': ProcessStatusEnum.SUCCESS.value, 'tasks': [], 'reset': False}
try:
conf = conf[-1]
logger.info(f"Último proceso ejecutado: {conf}")
status = conf["status"]
if "reset_by_user" in conf.keys():
response["reset"] = True
if status == ProcessStatusEnum.FAIL.value:
response["status"] = ProcessStatusEnum.FAIL.value
success_tasks = conf["tasks"]
......@@ -33,18 +38,45 @@ def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[s
def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str, Any],
timezone: str, delete_last: bool = False) -> List[Dict[str, Any]]:
timezone: str, task, delete_last: bool = False, frequency: str = "monthly") -> List[Dict[str, Any]]:
try:
print("PREV:", conf)
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
current_period = str(datetime_by_tzone(timezone, format_date))[:7]
processed_period = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="PROCESSED_PERIOD")
if delete_last:
print("CONF-TRANS:", conf)
last_tasks = conf[-1]["tasks"]
tasks.update(last_tasks)
print("LAST:", tasks)
conf.pop(-1)
current_datetime = str(datetime_by_tzone(timezone, '%Y-%m-%d %H:%M:%S'))
new_process = {"date": current_datetime, "status": status, "tasks": tasks}
conf.append(new_process)
if current_period == processed_period:
conf.append(new_process)
else:
conf = [new_process]
print("LAS:", conf)
except Exception as e:
logger.error(f"Error actualizando archivo de control. {e}")
finally:
return conf
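
A small illustration of the period logic added above: when the current period (from datetime_by_tzone) matches the PROCESSED_PERIOD pulled from XCom, the new entry is appended to the control history; otherwise the history restarts with only the new entry. The sample data is invented.

def append_or_reset(conf, new_process, current_period, processed_period):
    # Same decision as update_new_process: append within the same period, reset otherwise.
    if current_period == processed_period:
        conf.append(new_process)
        return conf
    return [new_process]

history = [{"date": "2023-07-01 10:00:00", "status": "success", "tasks": {}}]
entry = {"date": "2023-07-30 22:09:00", "status": "failed", "tasks": {"extract_tabla1": {}}}
print(append_or_reset(list(history), entry, "2023-07", "2023-07"))  # appended: two entries
print(append_or_reset(list(history), entry, "2023-08", "2023-07"))  # reset: only the new entry
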
def extract_last_control(conn_id: str, bucket: str, prefix: str, provider: str, timezone: str, **kwargs):
try:
task = kwargs['ti']
if not prefix.endswith("/"):
prefix += "/"
logger.info(f"BUSCANDO Y EXTRAYENDO ARCHIVO DE CONTROL DESDE {prefix}")
control, control_key = get_file_from_prefix(conn_id, bucket, prefix, provider, timezone, task)
if control:
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
if not str_data:
raise AssertionError("Archivo de control no tiene un formato correcto")
data = json.load(StringIO(str_data))
task.xcom_push(key="CONTROL-CONFIG", value=[control_key, data])
else:
raise AssertionError("Archivo de control no encontrado")
except Exception as e:
logger.error(f"Error general de descarga de archivo de control. {e}")
\ No newline at end of file
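
For illustration only: the CONTROL-CONFIG value pushed above is a two-element list, the object key of the control file plus the parsed control history. The key and contents below are made up.

import json
from io import BytesIO, StringIO

control = BytesIO(b'[{"status": "success", "tasks": {}}]')  # stand-in for get_file_from_prefix output
control_key = "bcom/control/control_2023-07.json"           # hypothetical key
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
data = json.load(StringIO(str_data))
xcom_value = [control_key, data]                            # pushed with key="CONTROL-CONFIG"
print(xcom_value)
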
......@@ -8,7 +8,7 @@ def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool =
final_steps = 0
try:
if is_tablename:
count_command = f'SELECT COUNT(*) FROM "{sql_command}"'
count_command = f'SELECT COUNT(*) FROM {sql_command}'
else:
count_command = f"SELECT COUNT(*) FROM ({sql_command}) BCOM"
with connection.connect() as conn:
......
......@@ -18,13 +18,13 @@ def execute_transformations(commands: List[str], engine):
def delete_table(tablename: str, engine) -> bool:
delete = False
try:
command = f'DROP TABLE "{tablename}"'
command = f'DROP TABLE {tablename}'
start_time = time.time()
with engine.connect() as conn:
try:
_ = conn.execute(command)
except Exception:
logger.error(f"Tabla no encontrada")
except Exception as e:
logger.error(f"Tabla no encontrada. {e}")
delete = True
logger.debug(f"Duración de borrado: {time.time() - start_time}")
except Exception as e:
......
......@@ -84,3 +84,12 @@ class Database:
logger.error(f"Error generando comando sql para procedure. Comando: {command}. {e}")
finally:
return response
def get_all_tablenames(self) -> List[str]:
tablenames = []
try:
tablenames = self.factory.get_all_tablenames()
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
from typing import List, Tuple
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from components.Model.InsumoModel import InsumoModel
from components.Databases.Enums.MysqlDataTypeEnum import MysqlDataTypeEnum
from components.Databases.Enums.MysqlDataTypeORMEnum import MysqlDataTypeORMEnum
from sqlalchemy import Table, Column, MetaData
from sqlalchemy import Column
import logging
logger = logging.getLogger()
......@@ -69,3 +70,14 @@ class Mysql:
logger.error(f"Error generando comando sql para procedure Mysql. Comando: {command}. {e}")
finally:
return response
def get_all_tablenames(self) -> List[str]:
tablenames = []
try:
command = f"SELECT table_name FROM information_schema.tables WHERE table_schema='{self.database}'"
with self.engine.connect() as conn:
tablenames = conn.execute(command).all()
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
......@@ -92,3 +92,14 @@ class Oracle:
logger.error(f"Error generando comando sql para procedure Oracle. Comando: {command}. {e}")
finally:
return response
def get_all_tablenames(self) -> List[str]:
tablenames = []
try:
command = f"SELECT table_name FROM all_tables WHERE OWNER='{self.user}'"
with self.engine.connect() as conn:
tablenames = conn.execute(command).all()
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
......@@ -70,3 +70,14 @@ class Postgres:
logger.error(f"Error generando comando sql para procedure Postgres. Comando: {command}. {e}")
finally:
return response
def get_all_tablenames(self) -> List[str]:
tablenames = []
try:
command = f"SELECT pg_tables.tablename FROM pg_catalog.pg_tables WHERE schemaname='{self.schema}'"
with self.engine.connect() as conn:
tablenames = conn.execute(command).all()
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
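
A brief note on the result shape, assuming SQLAlchemy's Result.all(): each get_all_tablenames variant returns a list of row tuples, so consumers unpack the first element to get the plain table name (as RESET_PROCESS does further below). The rows here are illustrative.

rows = [("insumo_clientes",), ("insumo_ventas",)]  # stand-in for conn.execute(command).all()
plain_names = [row[0] for row in rows]
print(plain_names)  # ['insumo_clientes', 'insumo_ventas']
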
......@@ -9,7 +9,7 @@ from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_st
from components.DatabaseOperation.DatabaseLoad import save_from_dataframe
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_obj_to_s3
from components.S3Route import load_control_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from enums.OperationTypeEnum import OperationTypeEnum
......@@ -40,7 +40,7 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider:
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="EXTRACTORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone)
conf = update_new_process(conf, status, final_dict, timezone, ti)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -49,7 +49,7 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider:
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key, provider)
loaded = load_control_to_s3(bytes(conf.encode()), conn, bucket, key, provider, timezone)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
......@@ -83,7 +83,9 @@ def on_success_extractor(context) -> None:
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def extract_from_source(command: str, source_conn, intern_conn, chunksize: int, **kwargs):
def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwargs):
if command is None:
raise AirflowSkipException
task = kwargs['ti']
extract_type = command[0].split("|")[0]
command = command[1]
......@@ -178,11 +180,14 @@ def get_select_from_xcom(**kwargs):
logger.info(f"Trayendo comandos {xcom_selects}")
for select in xcom_selects:
tablename = select_multiple(select)["tablename"]
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or tablename not in success_tasks:
if tasks["reset"] or tasks["status"] == ProcessStatusEnum.SUCCESS.value or tablename not in success_tasks:
final_selects.append((key, select, ))
logger.info(f"Comandos para la extracción: {final_selects}")
Variable.set(key='SELECTS', value=final_selects, serialize_json=True)
return [[item] for item in final_selects]
if len(final_selects) > 0:
return [[item] for item in final_selects]
else:
return [[None]]
def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int, timezone: str,
......@@ -192,6 +197,12 @@ def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int, timez
with TaskGroup(group_id="ExtraccionDeDatos", prefix_group_id=False) as group:
selects = get_select_from_xcom()
validate_task = PythonOperator(
task_id="VALIDATE_EXTRACTION",
python_callable=validate_extractor,
op_kwargs={'control_params': control_s3, 'timezone': timezone, 'provider': provider},
trigger_rule='all_done'
)
tasks = PythonOperator.partial(
task_id="EXTRACTORS",
python_callable=extract_from_source,
......@@ -200,12 +211,6 @@ def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int, timez
on_success_callback=on_success_extractor
).expand(op_args=selects)
validate_task = PythonOperator(
task_id="VALIDATE_EXTRACTION",
python_callable=validate_extractor,
op_kwargs={'control_params': control_s3, 'timezone': timezone, 'provider': provider},
trigger_rule='all_done'
)
selects >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de extracción. {e}")
......
......@@ -8,7 +8,7 @@ from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.S3Route import save_df_to_s3, load_obj_to_s3
from components.S3Route import save_df_to_s3, load_control_to_s3
from components.Utils import select_multiple, create_temp_file, delete_temp_dir
from components.Control import get_tasks_from_control, update_new_process
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
......@@ -37,7 +37,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, provider: s
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, True)
conf = update_new_process(conf, status, final_dict, timezone, ti, True)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -46,7 +46,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, provider: s
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key, provider)
loaded = load_control_to_s3(bytes(conf.encode()), conn, bucket, key, provider, timezone)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
......@@ -60,6 +60,7 @@ def on_failure_generator(context) -> None:
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('GENERATES', default_var=[], deserialize_json=True)
table = selects[ti.map_index]
table = select_multiple(table)["tablename"]
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
task_result = {"description": table, "status": status, "message": exception}
......@@ -72,6 +73,7 @@ def on_success_generator(context) -> None:
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('GENERATES', default_var=[], deserialize_json=True)
table = selects[ti.map_index]
table = select_multiple(table)["tablename"]
status = ProcessStatusEnum.SUCCESS.value
task_result = {"description": table, "status": status, "message": ""}
ti.xcom_push(key=task_name, value=task_result)
......@@ -80,6 +82,8 @@ def on_success_generator(context) -> None:
def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timezone: str,
provider: str, chunksize=10000):
if command is None:
raise AirflowSkipException
engine = intern_conn.engine
logger.debug(f"COMANDO: {command}")
tablename = select_multiple(command)["tablename"]
......@@ -137,11 +141,14 @@ def get_generate_from_xcom(**kwargs):
xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo tablas {xcom_outputs}")
for select in xcom_outputs:
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or select not in success_tasks:
if tasks["reset"] or tasks["status"] == ProcessStatusEnum.SUCCESS.value or select not in success_tasks:
final_outputs.append(select)
logger.info(f"Final outputs: {final_outputs}")
Variable.set(key='GENERATES', value=final_outputs, serialize_json=True)
return [[item] for item in final_outputs]
if len(final_outputs) > 0:
return [[item] for item in final_outputs]
else:
return [[None]]
def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], control_s3: Dict[str, Any],
......
......@@ -9,6 +9,7 @@ from components.Utils import get_type_file
from enums.FileTypeEnum import FileTypeEnum
from enums.ScriptFileTypeEnum import ScriptFileTypeEnum
from enums.ProviderTypeEnum import ProviderTypeEnum
from components.Timezone import datetime_by_tzone
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
......@@ -176,35 +177,94 @@ def get_files_from_prefix(conn: str, bucket: str, prefix: str, provider: str) ->
return result
def get_file_from_key(conn: str, bucket: str, key: str, provider: str) -> Any:
result = BytesIO()
def get_file_from_prefix(conn: str, bucket: str, key: str, provider: str, timezone: str, task,
frequency: str = "montly") -> Any:
result, key_result = BytesIO(), ''
try:
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
period = str(datetime_by_tzone(timezone, format_date))[:7]
logger.info(f"Periodo actual: {period}.")
files, file = [], key
cloud_gcp = False
if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
s3_hook = S3Hook(conn)
data = s3_hook.get_key(key, bucket)
data.download_fileobj(result)
files = s3_hook.list_keys(bucket, key)
elif provider == ProviderTypeEnum.GOOGLE.value:
cloud_gcp = True
gcp_hook = GoogleCloudStorageHook(conn)
result = gcp_hook.download(bucket, key)
print("RESULT:", result)
if not key.endswith("/"):
key += "/"
files = gcp_hook.list(bucket, prefix=key)
files_with_period = []
for file in files:
if file.endswith("/"):
continue
file_period = file[file.rfind("_") + 1:file.rfind(".")]
files_with_period.append((file, file_period))
files_with_period.sort(key=lambda x: x[1])
if len(files_with_period) > 0:
file = files_with_period[-1][0]
task.xcom_push(key="PROCESSED_PERIOD", value=files_with_period[-1][1])
logger.info(f"Descargando archivo de control: {file}")
if file != key:
if cloud_gcp:
result = gcp_hook.download(bucket, file)
result = BytesIO(result)
else:
data = s3_hook.get_key(file, bucket)
data.download_fileobj(result)
key_result = file
except Exception as e:
result = None
logger.error(f"Error extrayendo archivo {key}. {e}")
finally:
return result
return result, key_result
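
A minimal sketch of how the newest control file is chosen above: keys are assumed to end with _<period>.<extension>, the period substring is extracted, and the greatest period (lexicographically) wins. File names are hypothetical.

files = [
    "bcom/control/control_2023-05.json",
    "bcom/control/control_2023-07.json",
    "bcom/control/control_2023-06.json",
]
files_with_period = []
for file in files:
    if file.endswith("/"):
        continue
    file_period = file[file.rfind("_") + 1:file.rfind(".")]
    files_with_period.append((file, file_period))
files_with_period.sort(key=lambda x: x[1])
latest, processed_period = files_with_period[-1]
print(latest, processed_period)  # bcom/control/control_2023-07.json 2023-07
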
def load_control_to_s3(obj, conn: str, bucket: str, key: str, provider: str, timezone: str,
frequency: str = "montly") -> bool:
load = False
try:
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
period = str(datetime_by_tzone(timezone, format_date))[:7]
key = key.replace("<period>", period)
load = upload_file(obj, conn, bucket, key, provider)
except Exception as e:
logger.error(f"Error subiendo archivo de control a bucket {bucket} y key {key}. {e}")
finally:
return load
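
The <period> placeholder handling above, shown in isolation; the key template and period value are assumptions.

key_template = "bcom/control/control_<period>.json"  # hypothetical filename configured in the DAG
period = "2023-07"                                    # e.g. str(datetime_by_tzone(timezone, "%Y-%m"))[:7]
key = key_template.replace("<period>", period)
print(key)  # bcom/control/control_2023-07.json
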
def load_obj_to_s3(obj, conn: str, bucket: str, key: str, provider: str, replace=True) -> bool:
def load_report_to_s3(conn: str, bucket: str, key: str, filename: str, provider: str, timezone: str,
pattern: str) -> bool:
load = False
try:
current_datetime = str(datetime_by_tzone(timezone, pattern))
key = key.replace("<datetime>", current_datetime)
if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
s3_hook = S3Hook(conn)
s3_hook.load_bytes(obj, key, bucket, replace)
s3_hook.load_file(filename, key, bucket, True)
elif provider == ProviderTypeEnum.GOOGLE.value:
gcp_hook = GoogleCloudStorageHook(conn)
gcp_hook.upload(bucket, key, data=obj)
gcp_hook.upload(bucket, key, filename)
load = True
except Exception as e:
logger.error(f"Error subiendo archivo de control a bucket {bucket} y key {key}. {e}")
logger.error(f"Error subiendo reporte a bucket {bucket} y key {key}. {e}")
finally:
return load
def upload_file(obj, connection: str, bucket: str, key: str, provider: str) -> bool:
upload = False
try:
if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
s3_hook = S3Hook(connection)
s3_hook.load_bytes(obj, key, bucket, True)
elif provider == ProviderTypeEnum.GOOGLE.value:
gcp_hook = GoogleCloudStorageHook(connection)
gcp_hook.upload(bucket, key, data=obj)
upload = True
except Exception as e:
logger.error(f"Error subiendo archivo a {provider}. bucket: {bucket}, key: {key}. {e}")
finally:
return upload
......@@ -7,7 +7,7 @@ import logging
logger = logging.getLogger()
def datetime_by_tzone(tzone: str, pattern: str):
def datetime_by_tzone(tzone: str, pattern: str = "%Y-%m-%d"):
offset = None
# Algunos casos donde el timezone es de la forma 4:30 y no se encuentra en timezones de pytz (GMT)
if ":" in tzone:
......
......@@ -7,7 +7,7 @@ from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_obj_to_s3
from components.S3Route import load_control_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from enums.ProcessStatusEnum import ProcessStatusEnum
from enums.OperationTypeEnum import OperationTypeEnum
......@@ -34,7 +34,7 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, provider:
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="TRANSFORMATIONS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, True)
conf = update_new_process(conf, status, final_dict, timezone, ti, True)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -43,7 +43,7 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, provider:
prefix += "/"
key = prefix + control_params["filename"]
conf = json.dumps(conf, indent=2, default=str)
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key, provider)
loaded = load_control_to_s3(bytes(conf.encode()), conn, bucket, key, provider, timezone)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
......@@ -76,6 +76,8 @@ def on_success_transform(context) -> None:
def transformations(xcom_commands: str, intern_conn):
if xcom_commands is None:
raise AirflowSkipException
engine = intern_conn.engine
script_name = xcom_commands[0]
commands = xcom_commands[1]
......@@ -103,13 +105,16 @@ def get_trans_from_xcom(**kwargs):
continue
xcom_transforms = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo comandos {xcom_transforms}")
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or key not in success_tasks:
if tasks["reset"] or tasks["status"] == ProcessStatusEnum.SUCCESS.value or key not in success_tasks:
for transform in xcom_transforms:
final_transforms.append(transform)
transforms_per_file.append((key, final_transforms))
logger.info(f"Scripts para la transformación: {transforms_per_file}")
Variable.set(key='TRANSFORMS', value=transforms_per_file, serialize_json=True)
return [[item] for item in transforms_per_file]
if len(transforms_per_file) > 0:
return [[item] for item in transforms_per_file]
else:
return [[None]]
def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str, Any],
......
......@@ -173,7 +173,7 @@ def create_temp_file(tmp_path: str, filename_mask: str, file_type: str, tablenam
if not tmp_path.endswith("/"):
tmp_path += "/"
path_dir = tmp_path + dir_name
os.mkdir(path_dir)
os.makedirs(path_dir)
current_datetime = str(datetime_by_tzone(timezone, pattern))
filename_mask = filename_mask.replace("<source_name>", tablename)
filename_mask = filename_mask.replace("<datetime>", current_datetime)
......@@ -229,3 +229,21 @@ def generateModel(tablename: str, attributes: List[Dict[str, Any]], indexes: Lis
logger.error(f"Error creando modelo dinámico. {e}")
finally:
return model
def delete_temp_dirs(tmp_dir: str) -> bool:
delete = False
try:
dirs = list(os.listdir(tmp_dir))
for directory in dirs:
full_path = tmp_dir + "/" + directory
if os.path.isdir(full_path):
_ = delete_temp_dir(full_path)
logger.debug(f"Se borró el directorio {directory}")
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)
delete = True
except Exception as e:
logger.error(f"Error borrando archivos temporales en {tmp_dir}. {e}")
finally:
return delete
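
A self-contained sketch of the cleanup behaviour delete_temp_dirs implements, using a throwaway directory; shutil.rmtree stands in for the project's delete_temp_dir helper.

import os
import shutil
import tempfile

tmp_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(tmp_dir, "run_1"))
os.makedirs(os.path.join(tmp_dir, "run_2"))
for directory in os.listdir(tmp_dir):
    full_path = os.path.join(tmp_dir, directory)
    if os.path.isdir(full_path):
        shutil.rmtree(full_path)  # analogous to delete_temp_dir(full_path)
print(os.listdir(tmp_dir))        # [] -> only the leftover subdirectories were removed
shutil.rmtree(tmp_dir)
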
......@@ -55,13 +55,14 @@ def delete_all_xcom_tasks() -> None:
try:
@provide_session
def cleanup_xcom(session=None):
session.query(XCom).filter(or_(XCom.task_id == "SCRIPTS-EXTRACTORS", XCom.task_id == "EXTRACTORS",
session.query(XCom).filter(or_(XCom.task_id == "SCRIPTS-EXTRACTOR", XCom.task_id == "EXTRACTORS",
XCom.task_id == "GENERATORS", XCom.task_id == "TRANSFORMATIONS",
XCom.task_id == "VALIDATE_EXTRACTION", XCom.task_id == "VALIDATE_GENERATOR",
XCom.task_id == "VALIDATE_TRANSFORMATION")).delete()
XCom.task_id == "VALIDATE_TRANSFORMATION", XCom.task_id == "CONTROL-EXTRACTOR")).delete()
session.query(Variable).filter(or_(Variable.key == "SELECTS", Variable.key == "TRANSFORMS",
Variable.key == "GENERATES", Variable.key == "CLEANS")).delete()
delete_task_instances()
session.query(XCom).filter(XCom.dag_id == "BCOM_DAG_TRANSFORMACIONES4").delete()
cleanup_xcom()
except Exception as e:
logger.error(f"Error borrando todas las variables xcom del DAG actual. {e}")
......
from datetime import datetime
from typing import Any, Dict
from airflow import DAG
import pandas as pd
import os
import uuid
from airflow.operators.python import PythonOperator
from components.Control import extract_last_control
from components.S3Route import load_report_to_s3
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.Utils import delete_temp_dir
import logging
logger = logging.getLogger()
DAG_NAME = "INFORM_PROCESS"
# Change this path if it is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
DEFAULT_ARGS = {
'owner': 'BCOM',
"start_date": datetime(2023, 7, 29, 22, 9),
'depends_on_past': False,
'email': 'caguirre@bytesw.com',
'retries': 0,
'email_on_retry': False,
'email_on_failure': False
}
def upload_report(report_params: Dict[str, Any], provider: str, timezone: str, **kwargs) -> None:
try:
task = kwargs["ti"]
report_path = task.xcom_pull(task_ids="CREATE-REPORT", key="REPORT_PATH")
pattern = report_params["datetime_pattern"]
conn = report_params["s3_params"]["connection_id"]
bucket = report_params["s3_params"]["bucket"]
prefix = report_params["s3_params"]["prefix"]
key = report_params["s3_params"]["filename"]
if not prefix.endswith("/"):
key = prefix + "/" + key
else:
key = prefix + key
upload = load_report_to_s3(conn, bucket, key, report_path, provider, timezone, pattern)
if upload:
logger.info(f"Se subio correctamente el reporte a {key}. bucket: {bucket}")
delete_temp_dir(report_path)
except Exception as e:
logger.error(f"Error subiendo reporte a . {e}")
def create_report(tmp_path: str, **kwargs) -> None:
try:
task = kwargs["ti"]
data = task.xcom_pull(task_ids="GET-DATA-REPORT", key="REPORT-DATA")
status, execution_date = data["PROCESS_STATUS"], data["PROCESS_EXECUTION"]
_, _ = data.pop("PROCESS_STATUS"), data.pop("PROCESS_EXECUTION")
dir_name = str(uuid.uuid4())
if not tmp_path.endswith("/"):
tmp_path += "/"
path_dir = tmp_path + dir_name
os.makedirs(path_dir)
excel_tmp_path = path_dir + "/tmp_excel.xlsx"
with pd.ExcelWriter(excel_tmp_path, engine="xlsxwriter") as writer:
workbook = writer.book
worksheet = workbook.add_worksheet("report")
worksheet.set_zoom(90)
title = "Reporte de último proceso ejecutado"
title_format = workbook.add_format()
title_format.set_font_size(20)
title_format.set_font_color("#333333")
header = f"Reporte ejecutado el día {execution_date}"
if status == ProcessStatusEnum.SUCCESS.value:
status = "EXITOSO"
elif status == ProcessStatusEnum.FAIL.value:
status = "FALLIDO"
elif status == ProcessStatusEnum.RESET.value:
status = "RESETEADO POR EL USUARIO"
status = f"Estado de último proceso ejecutado: {status}"
header_format = workbook.add_format()
header_format.set_font_size(10)
header_format.set_font_color("#080606")
worksheet.merge_range('A1:N1', title, title_format)
worksheet.merge_range('A2:N2', header, header_format)
worksheet.merge_range('A3:N3', status, header_format)
row_format = workbook.add_format()
row_format.set_font_size(8)
row_format.set_font_color("#000000")
base_index = 5
for index, key in enumerate(data.keys()):
index = base_index + index
worksheet.merge_range('A'+str(index)+':B'+str(index), key, row_format)
if data[key]["TYPE"] == "EXTRACTION":
worksheet.merge_range('C'+str(index)+':G'+str(index), f"TABLA DE EXTRACCIÓN: {data[key]['DESCRIPTION']}", row_format)
elif data[key]["TYPE"] == "TRANSFORMATION":
script = data[key]["DESCRIPTION"].split("|")[1]
worksheet.merge_range('C'+str(index)+':G'+str(index), f"SCRIPT DE TRANSFORMACIÓN: {script}", row_format)
elif data[key]["TYPE"] == "GENERATION":
worksheet.merge_range('C'+str(index)+':G'+str(index), f"ARCHIVO GENERADO DESDE LA TABLA: {data[key]['DESCRIPTION']}", row_format)
worksheet.merge_range('H'+str(index)+':I'+str(index), f"ESTADO: {data[key]['STATUS']}", row_format)
worksheet.merge_range('J'+str(index)+':N'+str(index), data[key]['MESSAGE'], row_format)
task.xcom_push(key="REPORT_PATH", value=excel_tmp_path)
except Exception as e:
logger.error(f"Error creando reporte. {e}")
def get_data_report(**kwargs) -> None:
try:
report_data = {}
task = kwargs['ti']
control_config = task.xcom_pull(task_ids="CONTROL-EXTRACTOR", key="CONTROL-CONFIG")
control_key, control = control_config[0], control_config[1]
if not control:
logger.info("Archivo de control no encontrado. No se actualizará ningún archivo de control")
else:
last_process = control[-1]
if "reset_by_user" in last_process.keys():
report_data["PROCESS_EXECUTION"] = ProcessStatusEnum.RESET.value
else:
total_tasks = [last_process["tasks"]]
current_status = last_process["status"]
control.reverse()
for process in control:
if process["status"] == ProcessStatusEnum.SUCCESS.value:
break
total_tasks.append(process["tasks"])
final_key_tasks, final_key_desc, final_key_message = {}, {}, {}
for tasks in total_tasks:
for key in tasks.keys():
this_status = tasks[key]["status"]
this_desc = tasks[key]["description"]
this_message = tasks[key]["message"]
if key in final_key_tasks.keys():
task_status = final_key_tasks[key]
if this_status == ProcessStatusEnum.SUCCESS.value and \
task_status == ProcessStatusEnum.FAIL.value:
final_key_tasks.update({key: this_status})
final_key_desc.update({key: this_desc})
final_key_message.update({key: ''})
else:
final_key_tasks.update({key: this_status})
final_key_desc.update({key: this_desc})
final_key_message.update({key: this_message})
for item in final_key_tasks.keys():
if item.lower().startswith("extract"):
type_task = "EXTRACTION"
elif item.lower().startswith("transform"):
type_task = "TRANSFORMATION"
else:
type_task = "GENERATION"
report_data.update({item: {"STATUS": final_key_tasks[item], "TYPE": type_task,
"DESCRIPTION": final_key_desc[item], 'MESSAGE': final_key_message[item]}})
report_data.update({"PROCESS_STATUS": current_status, "PROCESS_EXECUTION": last_process["date"]})
task.xcom_push(key="REPORT-DATA", value=report_data)
logger.info(f"Diccionario de datos para el reporte: {report_data}")
except Exception as e:
logger.error(f"Error general creando reporte. {e}")
def set_dag():
""" DAG that reset the last process Airflow have"""
import yaml
from yaml.loader import SafeLoader
# Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
# En desarrollo, cualquiera que apunte a su carpeta dags
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que informa del último proceso ejecutado",
schedule_interval=conf["inform_dag_schedule"], tags=["DAG BCOM - INFORM PROCESS"], catchup=True) as dag:
control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"]
control_extractor = PythonOperator(
task_id="CONTROL-EXTRACTOR",
python_callable=extract_last_control,
op_kwargs={'conn_id': control_s3["connection_id"], 'bucket': control_s3["bucket"],
'prefix': control_s3["prefix"], 'provider': conf["cloud_provider"], 'timezone': timezone},
trigger_rule="all_success"
)
create = PythonOperator(
task_id="GET-DATA-REPORT",
python_callable=get_data_report,
op_kwargs={},
trigger_rule="all_success"
)
tmp_dir = conf["outputs"]["tmp_path"]
report = PythonOperator(
task_id="CREATE-REPORT",
python_callable=create_report,
op_kwargs={'tmp_path': tmp_dir},
trigger_rule="all_success"
)
report_params = conf["report"]
upload = PythonOperator(
task_id="UPLOAD-REPORT",
python_callable=upload_report,
op_kwargs={'report_params': report_params, 'provider': conf["cloud_provider"], 'timezone': timezone},
trigger_rule="all_success"
)
control_extractor >> create >> report >> upload
return dag
globals()["0"] = set_dag()
from datetime import datetime
from typing import Any, Dict
from airflow import DAG
import json
from airflow.operators.python import PythonOperator
from components.Databases.Database import Database
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.S3Route import upload_file
from components.Control import extract_last_control
from components.Utils import delete_temp_dirs
import logging
logger = logging.getLogger()
DAG_NAME = "RESET_PROCESS"
# Change this path if it is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
DEFAULT_ARGS = {
'owner': 'BCOM',
"start_date": datetime(2023, 7, 28, 22, 9),
'depends_on_past': False,
'email': 'caguirre@bytesw.com',
'retries': 0,
'email_on_retry': False,
'email_on_failure': False
}
def update_control(control_params: Dict[str, Any], provider: str, **kwargs) -> None:
try:
task = kwargs['ti']
control_config = task.xcom_pull(task_ids="CONTROL-EXTRACTOR", key="CONTROL-CONFIG")
control_key, control = control_config[0], control_config[1]
if not control:
logger.info("Archivo de control no encontrado. No se actualizará ningún archivo de control")
else:
last_process = control[-1]
last_process.update({'reset_by_user': True})
control.pop(-1)
control.append(last_process)
control = json.dumps(control).encode('utf-8')
load = upload_file(control, control_params["connection_id"], control_params["bucket"],
control_key, provider)
if load:
logger.info(f"Subido correctamente el archivo de control en bucket {control_params['bucket']} y"
f"key: {control_key}")
# Borrar las variables que hayan quedado
delete_all_xcom_tasks()
delete_task_instances()
except Exception as e:
logger.error(f"Error actualizando archivo de control. {e}")
def reset_process(intern_db, output_tmp_dir: str) -> None:
try:
# Borrando tablas
tablenames = intern_db.get_all_tablenames()
if len(tablenames) == 0:
logger.info("No se encontraron tablas para su limpieza")
else:
for tablename in tablenames:
tablename = tablename[0]
delete = delete_table(tablename, intern_db.engine)
if delete:
logger.info(f"Borrado correctamente la tabla {tablename}")
# Borrando archivos temporales que hayan quedado sueltos
delete = delete_temp_dirs(output_tmp_dir)
if delete:
logger.info("Se borraron todos los archivos temporales")
except Exception as e:
logger.error(f"Error procesando archivo de control. {e}")
def set_dag():
""" DAG that reset the last process Airflow have"""
import yaml
from yaml.loader import SafeLoader
# Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
# En desarrollo, cualquiera que apunte a su carpeta dags
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que resetea el último proceso ejecutado",
schedule_interval=conf["reset_dag_schedule"], tags=["DAG BCOM - RESET PROCESS"], catchup=True) as dag:
control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"]
control_extractor = PythonOperator(
task_id="CONTROL-EXTRACTOR",
python_callable=extract_last_control,
op_kwargs={'conn_id': control_s3["connection_id"], 'bucket': control_s3["bucket"],
'prefix': control_s3["prefix"], 'provider': conf["cloud_provider"], 'timezone': timezone},
trigger_rule="all_success"
)
# Intern Database configuration.
intern_params = conf["database"]["transformation"]
intern_db = Database(intern_params["type"], intern_params["host"], int(intern_params["port"]),
intern_params["username"], intern_params["password"], intern_params["database"],
intern_params["service"], intern_params["schema"])
intern_db.create_engine()
tmp_dir = conf["outputs"]["tmp_path"]
control_process = PythonOperator(
task_id="RESET-PROCESS",
python_callable=reset_process,
op_kwargs={'intern_db': intern_db, 'output_tmp_dir': tmp_dir},
trigger_rule="all_success"
)
update = PythonOperator(
task_id="UPDATE-CONTROL",
python_callable=update_control,
op_kwargs={'control_params': control_s3, 'provider': conf["cloud_provider"]},
trigger_rule="all_success"
)
control_extractor >> control_process >> update
return dag
globals()["0"] = set_dag()
......@@ -8,7 +8,7 @@ from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from components.Utils import update_sql_commands
from components.Xcom import save_commands_to_xcom
from components.S3Route import get_files_from_prefix, get_file_from_key
from components.S3Route import get_files_from_prefix, get_file_from_prefix
from components.Sensor import create_sensor
from components.Extractor import get_extract_task_group
from components.Transformation import get_transform_task_group
......@@ -22,10 +22,10 @@ import logging
logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_TRANSFORMACIONES3"
DAG_NAME = "BCOM_DAG_TRANSFORMACIONES4"
# Change this path if it is deployed in prod or dev
MAIN_PATH = "/opt/airflow/dags/"
MAIN_PATH = "/root/airflow/dags/"
JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition2.json"
DEFAULT_ARGS = {
......@@ -39,10 +39,10 @@ DEFAULT_ARGS = {
}
def cleaning(intern_conn, control_s3: Dict[str, Any], provider: str) -> TaskGroup:
def cleaning(intern_conn, control_s3: Dict[str, Any], provider: str, timezone: str) -> TaskGroup:
groups = None
try:
groups = get_cleaning_task_group(intern_conn, control_s3, provider)
groups = get_cleaning_task_group(intern_conn, control_s3, provider, timezone)
except Exception as e:
logger.error(f"Error general de transformación de datos. {e}")
finally:
......@@ -92,20 +92,22 @@ def save_procedure_json(json_path: str, task) -> None:
logger.error(f"Error leyendo y guardando archivo descriptor de procedure. {e}")
def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task, provider: str):
def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str, timezone: str):
try:
if not prefix.endswith("/"):
prefix += "/"
key = prefix + filename
logger.info(f"EXTRAYENDO ARCHIVO DE CONTROL DESDE {key}")
control = get_file_from_key(conn_id, bucket, key, provider)
logger.info(f"BUSCANDO Y EXTRAYENDO ARCHIVO DE CONTROL DESDE {prefix}")
control, _ = get_file_from_prefix(conn_id, bucket, prefix, provider, timezone, task)
if control:
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data)
data = json.load(data)
if not str_data:
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}]
else:
data = json.load(data)
else:
logger.info(f"Json Procedure descargado: {control}")
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": []}]
logger.info(f"Json de control creado: {control}")
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}]
task.xcom_push(key="CONTROL-CONFIG", value=data)
except Exception as e:
logger.error(f"Error general de descarga de archivo de control. {e}")
......@@ -113,10 +115,10 @@ def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task,
def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str,
order_delimiter: str, procedure_mask: str, label_tablename: str, control_params: Dict[str, Any],
provider: str, **kwargs):
provider: str, timezone: str, **kwargs):
try:
extract_control(control_params["connection_id"], control_params["bucket"], control_params["prefix"],
control_params["filename"], kwargs['ti'], provider)
kwargs['ti'], provider, timezone)
save_procedure_json(JSON_PROCEDURE_PATH, kwargs['ti'])
start_time = time.time()
logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
......@@ -152,6 +154,7 @@ def set_dag():
sensor_scripts = create_sensor("SCRIPTS-SENSOR", scripts_s3["connection_id"], scripts_s3["bucket"],
wildcard_scripts, conf["cloud_provider"])
control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"]
# Scripts extraction
extract_mask = conf["source_mask"]
transform_mask = conf["transformation_mask"]
......@@ -164,7 +167,7 @@ def set_dag():
'prefix': scripts_s3["prefix"], 'source_mask': extract_mask, 'transform_mask': transform_mask,
'procedure_mask': procedure_mask, 'order_delimiter': order_delimiter,
'label_tablename': conf["label_multiple_select"], 'control_params': control_s3,
'provider': conf["cloud_provider"]},
'provider': conf["cloud_provider"], 'timezone': timezone},
trigger_rule="all_success"
)
......@@ -184,7 +187,6 @@ def set_dag():
# Creación de grupo de tasks para las extracciones
chunksize = conf["chunksize"]
timezone = conf["timezone"]
extractions = extraction(source_db, intern_db, timezone, control_s3, conf["cloud_provider"], chunksize)
# Creación de grupo de tasks para las transformaciones
......@@ -195,7 +197,7 @@ def set_dag():
result = generate_and_deploy_results(intern_db, outputs_conf, timezone, control_s3, conf["cloud_provider"])
# Creación de tasks de limpiadores
cleaners = cleaning(intern_db, control_s3, conf["cloud_provider"])
cleaners = cleaning(intern_db, control_s3, conf["cloud_provider"], timezone)
sensor_scripts >> script_extractor >> extractions >> transformations >> result >> cleaners
return dag
......
......@@ -310,5 +310,5 @@ def set_dag_1():
return dag
globals()["0"] = set_dag_1()
# globals()["0"] = set_dag_1()
import enum
from sqlalchemy import Integer, String, Date, DateTime, DECIMAL, BOOLEAN, TEXT
from sqlalchemy import Integer, String, Date, DateTime, DECIMAL, BOOLEAN, Text
class DataTypeOrmEnum(enum.Enum):
......@@ -9,4 +9,4 @@ class DataTypeOrmEnum(enum.Enum):
DATE = Date
DATETIME = DateTime
BOOLEAN = BOOLEAN
LONGTEXT = TEXT
LONGTEXT = Text
......@@ -4,3 +4,4 @@ from enum import Enum
class ProcessStatusEnum(Enum):
SUCCESS = "success"
FAIL = "failed"
RESET = "reset"
[
{
"identifier": "TABLA1",
"identifier": "tabla2",
"fields": [
{
"name": "columna1",
"name": "column1",
"datatype": "TEXT",
"maxLength": 50
"maxLength": 100
},
{
"name": "columna2",
"name": "column2",
"datatype": "NUMBER"
},
{
"name": "columna3",
"datatype": "BOOLEAN"
"name": "column3",
"datatype": "NUMBER"
},
{
"name": "columna4",
"datatype": "NUMBER"
"name": "column4",
"datatype": "DATE"
},
{
"name": "columna5",
"datatype": "DECIMAL"
"name": "column5",
"datatype": "DECIMAL",
"decimal_precision": 5
}
]
}
......