Commit b1377db9 authored by Cristian Aguirre

Update 04-07-23. Update DAG process. Initial process completed

parent 39a51800
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from components.Utils import select_multiple
from components.Xcom import delete_all_xcom_tasks
from components.DatabaseOperation.DatabaseTransformation import delete_table
import logging
logger = logging.getLogger()


def clean(command: str, intern_conn):
    engine = intern_conn.engine
    tablename = select_multiple(command)["tablename"]
    logger.info(f"Borrando tabla {tablename}")
    delete = delete_table(tablename, engine)
    if delete:
        logger.info(f"Borrado correctamente la tabla {tablename}")
    delete_all_xcom_tasks()
    logger.info(f"Borrado todas las variables xcom")


def get_generate_from_xcom(**kwargs):
    final_selects = []
    task = kwargs['ti']
    xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
    logger.debug(xcom_keys)
    for key in xcom_keys:
        if not key.startswith("SELECT"):
            continue
        xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
        logger.info(f"Trayendo comandos {xcom_outputs}")
        for select in xcom_outputs:
            final_selects.append(select)
    Variable.set(key='CLEANS', value=final_selects, serialize_json=True)


def get_cleaning_task_group(db_intern_conn) -> TaskGroup or None:
    group = None
    try:
        with TaskGroup(group_id="LimpiezaDelProceso", prefix_group_id=False) as group:
            init_task = PythonOperator(
                task_id="MASTER_CLEANING",
                python_callable=get_generate_from_xcom,
                trigger_rule='all_success'
            )
            cleaners = Variable.get('CLEANS', default_var=[], deserialize_json=True)
            if cleaners:
                tasks = PythonOperator.partial(
                    task_id="CLEANERS",
                    python_callable=clean,
                    op_kwargs={'intern_conn': db_intern_conn}
                ).expand(op_args=[[item] for item in cleaners])
                init_task >> tasks
    except Exception as e:
        logger.error(f"Error creando taskGroup de limpiadores. {e}")
    finally:
        return group
import pandas as pd
import logging
logger = logging.getLogger()


def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool = False) -> int:
    final_steps = 0
    try:
        if is_tablename:
            count_command = f"SELECT COUNT(*) FROM {sql_command}"
        else:
            count_command = f"SELECT COUNT(*) FROM ({sql_command}) BCOM"
        with connection.connect() as conn:
            result = conn.execute(count_command).first()
            if result:
                total_rows = int(result[0])
                logger.info(f"Total de filas: {total_rows}")
                steps = int(total_rows / chunksize)
                if steps >= final_steps:
                    final_steps = steps + 1
    except Exception as e:
        logger.error(f"Error calculando el total de N° de filas desde el comando: {sql_command}. {e}")
    finally:
        return final_steps


def get_iterator(command: str, chunksize: int, connection) -> iter:
    iterator = None
    try:
        connection = connection.execution_options(stream_results=True)
        iterator = pd.read_sql(command, connection, index_col=None, chunksize=chunksize)
        iterator = iter(iterator)
    except Exception as e:
        logger.error(f"Error trayendo iterator. {e}")
    finally:
        return iterator
import pandas as pd
import logging
logger = logging.getLogger()


def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
    save = True
    try:
        with connection.connect() as conn:
            df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=500)
    except Exception as e:
        logger.error(f"Error guardando resultados desde dataframe. {e}")
    finally:
        return save
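
Taken together, get_steps, get_iterator and save_from_dataframe implement a chunked copy from a source database into the intern database; the same loop appears later in the extraction step of Extractor.py. A minimal usage sketch, where the connection strings, query and table name are placeholders:

# Illustrative sketch only; engines, credentials and the query are placeholders.
from sqlalchemy import create_engine
from components.DatabaseOperation.DatabaseExtraction import get_steps, get_iterator
from components.DatabaseOperation.DatabaseLoad import save_from_dataframe

source_engine = create_engine("oracle+cx_oracle://user:password@host:21521/?service_name=ORCLPDB1")
intern_engine = create_engine("postgresql://user:password@host/prueba_bcom")
command = "select * from clientes"
chunksize = 50000

steps = get_steps(command, chunksize, source_engine)        # number of chunks to move
iterator = get_iterator(command, chunksize, source_engine)  # streaming pandas iterator
for step in range(steps):
    dataframe = next(iterator)
    save_from_dataframe(dataframe, "clientes", intern_engine)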
import logging
import time
from typing import List
logger = logging.getLogger()


def execute_transformations(commands: List[str], engine):
    try:
        with engine.connect() as connection:
            for command in commands:
                logger.debug(f"Ejecutando comando de transformación: {command}")
                _ = connection.execute(command)
    except Exception as e:
        logger.error(f"Error ejecutando comando de transformación. {e}")


def delete_table(tablename: str, engine) -> bool:
    delete = False
    try:
        command = f"DROP TABLE {tablename}"
        start_time = time.time()
        with engine.connect() as conn:
            try:
                _ = conn.execute(command)
            except Exception:
                logger.error(f"Tabla no encontrada")
            delete = True
        logger.debug(f"Duración de borrado: {time.time() - start_time}")
    except Exception as e:
        logger.error(f"Error borrando tabla {tablename}. {e}")
    finally:
        return delete
from typing import List, Tuple
from sqlalchemy import Table, Column, MetaData
from components.Model.InsumoModel import InsumoModel
from components.Databases.Mysql import Mysql
from components.Databases.Mariadb import Mariadb
from components.Databases.Postgres import Postgres
......
from enum import Enum
from sqlalchemy import INTEGER, String, Float, DATE, TIMESTAMP, NUMERIC, CHAR, TEXT, BLOB, JSON, BIGINT, \
    DATETIME, TIME, DECIMAL


class MysqlDataTypeORMEnum(Enum):
    VARCHAR = String
    TINY_INT = INTEGER
    INT = INTEGER
    TEXT = TEXT
    DECIMAL = DECIMAL
    SHORT = String
    TIMESTAMP = TIMESTAMP
    JSON = JSON
    BIGINT = BIGINT
    FLOAT = NUMERIC
    DATETIME = DATETIME
    DATE = DATE
    TIME = TIME
    DOUBLE = NUMERIC
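
For context, a minimal sketch of how these ORM enums are consumed when the dynamic models are built (see the Mysql, Oracle and Postgres hunks below); the import path and the field tuple layout (name, type name, length, scale) are assumed for illustration only:

# Illustrative only; in the real code the type name comes from MysqlDataTypeEnum(field[1]).name.
from sqlalchemy import Column
from enums.MysqlDataTypeORMEnum import MysqlDataTypeORMEnum  # assumed module path

field = ("MONTO", "DECIMAL", 10, 2)                       # hypothetical cursor description entry
orm_type = MysqlDataTypeORMEnum[field[1]].value           # -> sqlalchemy DECIMAL
data_type = orm_type(precision=field[2], scale=field[3])  # -> DECIMAL(10, 2)
column = Column(field[0], data_type)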
@@ -5,6 +5,7 @@ class OracleDataTypeEnum(Enum):
    NUMBER = "DB_TYPE_NUMBER"
    VARCHAR2 = "DB_TYPE_VARCHAR"
    DATE = "DB_TYPE_DATE"
    TIMESTAMP = "DB_TYPE_TIMESTAMP"
......
from enum import Enum
from sqlalchemy import Integer, String, Float, Date, TIMESTAMP, NUMERIC, CHAR, Text, Boolean, BLOB


class OracleDataTypeORMEnum(Enum):
    NUMBER = NUMERIC
    VARCHAR2 = String
    DATE = Date
    TIMESTAMP = TIMESTAMP
from enum import Enum
from sqlalchemy import Integer, String, Float, Date, TIMESTAMP, NUMERIC, CHAR, Text, Boolean, BLOB


class PostgresDataTypeORMEnum(Enum):
    VARCHAR = String
    INTEGER = Integer
    NUMERIC = NUMERIC
    TIMESTAMP = TIMESTAMP
    TEXT = Text
    BOOLEAN = Boolean
    BYTEA = BLOB
    CHAR = CHAR
    FLOAT = NUMERIC
    DATE = Date
    INTEGER8 = Integer
......
@@ -44,6 +44,9 @@ class Mysql:
                if field[2] != -1:
                    size = int(field[2] / 4)
                    try:
                        if not isinstance(field[3], type(None)) and field[3] > 0:
                            data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value(precision=field[2], scale=field[3])
                        else:
                            data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value(size)
                    except TypeError:
                        data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value()
......
@@ -40,17 +40,31 @@ class Oracle:
            for field in fields:
                logger.info(f"Attribute: {field}")
                name = field[0]
                # Default precision for Integer Oracle : 38
                if not isinstance(field[2], type(None)):
                    size = int(field[2] / 4)
                    try:
                        if not isinstance(field[3], type(None)) and field[3] > 0:
                            data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(precision=field[3], scale=field[3])
                        elif not isinstance(field[3], type(None)):
                            data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(precision=38, scale=field[3])
                        else:
                            data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(size)
                    except TypeError:
                        data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value()
                elif not isinstance(field[3], type(None)) and field[3] < 0:
                    data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(precision=38, scale=16)
                elif not isinstance(field[3], type(None)) and field[3] >= 0:
                    try:
                        data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(precision=38)
                    except TypeError:
                        data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value()
                else:
                    data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value()
                setattr(model, name, Column(data_type))
            model = model.__table__
        except Exception as e:
            logger.error(f"Error creando modelo dinámico en Oracle con nombre {tablename}. {type(e)}.{e}")
        finally:
            return model
@@ -44,6 +44,9 @@ class Postgres:
                name = field[0]
                if field[2] != -1:
                    try:
                        if not isinstance(field[3], type(None)) and field[3] > 0:
                            data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value(precision=field[2], scale=field[3])
                        else:
                            data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value(field[2])
                    except TypeError:
                        data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value()
......
from typing import List, Tuple
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Utils import select_multiple
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
from components.DatabaseOperation.DatabaseLoad import save_from_dataframe
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
@@ -11,63 +11,77 @@ import logging
logger = logging.getLogger()


def extract_from_source(command: str, source_conn, intern_conn, chunksize: int, **kwargs):
    source_engine = source_conn.engine
    command_for_create = command
    if source_conn.db_type == DatabaseTypeEnum.ORACLE.value:
        command_for_create += " OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY"
    else:
        command_words = command_for_create.split(" ")
        if command_words[-2].lower() != "limit":
            command_for_create += " limit 1"
    columns = []
    with source_engine.connect() as connection:
        final_command = command_for_create
        if final_command.replace(" ", "").lower().find("|select"):
            final_command = final_command[final_command.find("select"):]
        result = connection.execute(final_command)
        fields = result.cursor.description
        for field in fields:
            name, type_code, length, presicion = field[0], field[1], field[3], field[5]
            columns.append((name, type_code, length, presicion))
    logger.debug(f"Columnas procesadas: {columns}")
    multiple = select_multiple(command_for_create)
    if multiple["is_multiple"]:
        pass
    else:
        model = source_conn.create_model(multiple["tablename"], columns)
        create = intern_conn.create_table(model)
        logger.info(f"Creado correctamente la tabla {multiple['tablename']}. Creado?: {create}")
    # The total number of rows in the source has to be calculated
    if command.replace(" ", "").lower().find("|select"):
        command = command[command.find("select"):]
    steps = get_steps(command, chunksize, source_engine)
    # Fetch the iterator
    iterator = get_iterator(command, chunksize, source_engine)
    logger.info(f"Número de pasos para migrar datos: {steps}")
    for step in range(steps):
        dataframe = next(iterator)
        dataframe["INTERN_ID_BCOM"] = None
        logger.debug(dataframe)
        save = save_from_dataframe(dataframe, multiple["tablename"], intern_conn.engine)
        if save:
            logger.info(f"Guardado correctamente dataframe en el paso {step+1}")


def get_select_from_xcom(**kwargs):
    final_selects = []
    task = kwargs['ti']
    xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
    logger.debug(xcom_keys)
    for key in xcom_keys:
        if not key.startswith("SELECT"):
            continue
        xcom_selects = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
        logger.info(f"Trayendo comandos {xcom_selects}")
        for select in xcom_selects:
            final_selects.append(select)
    Variable.set(key='SELECTS', value=final_selects, serialize_json=True)


def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int) -> TaskGroup or None:
    group = None
    try:
        with TaskGroup(group_id="ExtraccionDeDatos", prefix_group_id=False) as group:
            init_task = PythonOperator(
                task_id="MASTER_EXTRACTOR",
                python_callable=get_select_from_xcom,
                trigger_rule='all_success'
            )
            selects = Variable.get('SELECTS', default_var=[], deserialize_json=True)
            if selects:
                tasks = PythonOperator.partial(
                    task_id="EXTRACTORS",
                    python_callable=extract_from_source,
                    op_kwargs={'source_conn': db_source_conn, 'intern_conn': db_intern_conn, 'chunksize': chunksize}
                ).expand(op_args=[[item] for item in selects])
                init_task >> tasks
@@ -76,12 +90,3 @@ def get_task_group(db_source_conn, db_intern_conn) -> TaskGroup or None:
    finally:
        return group


def create_table(db_type: str, tablename: str, fields: List[Tuple[str]]) -> bool:
    create = False
    try:
        pass
    except Exception as e:
        raise AssertionError(f"Error creando tabla {tablename}. {e}")
    finally:
        return create
from typing import Any, Dict
import os
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from components.S3Route import save_df_to_s3
from components.Utils import select_multiple, create_temp_file, delete_temp_dir
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
import logging
logger = logging.getLogger()


def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timezone: str, chunksize=10000):
    engine = intern_conn.engine
    tablename = select_multiple(command)["tablename"]
    logger.info(f"Generando resultado de la tabla {tablename}")
    # Create the temporary output file
    filename_mask = params["filename_mask"]
    file_type = params["file_type"]
    pattern = params["datetime_pattern"]
    tmp_file = create_temp_file(filename_mask, file_type, tablename, timezone, pattern)
    logger.info(tmp_file)
    steps = get_steps(tablename, chunksize, engine, True)
    iterator = get_iterator(tablename, chunksize, engine)
    logger.info(f"Total de pasos para generar archivo resultado: {steps}")
    for step in range(steps):
        logger.debug(f"STEP: {step}")
        header = True if step == 0 else False
        try:
            dataframe = next(iterator)
            dataframe = dataframe.drop("INTERN_ID_BCOM", axis=1)
            logger.debug(dataframe)
            dataframe.to_csv(tmp_file, index=False, mode='a', header=header)
        except StopIteration:
            break
    bucket = params["s3_params"]["bucket"]
    prefix = params["s3_params"]["prefix"]
    conn_id = params["s3_params"]["connection_id"]
    if not prefix.endswith("/"):
        prefix += "/"
    file_key = prefix + tmp_file[tmp_file.rfind("/")+1:]
    # Upload the file to S3
    logger.info(f"Tamaño del archivo a subir: {os.path.getsize(tmp_file)} bytes")
    save_df_to_s3(tmp_file, conn_id, bucket, file_key, in_memory=False)
    # Delete the temporary file once the upload finishes
    delete_temp_dir(tmp_file)


def get_generate_from_xcom(**kwargs):
    final_outputs = []
    task = kwargs['ti']
    xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
    logger.debug(xcom_keys)
    for key in xcom_keys:
        if not key.startswith("SELECT"):
            continue
        xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
        logger.info(f"Trayendo comandos {xcom_outputs}")
        for select in xcom_outputs:
            final_outputs.append(select)
    Variable.set(key='GENERATES', value=final_outputs, serialize_json=True)


def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], timezone) -> TaskGroup or None:
    group = None
    try:
        with TaskGroup(group_id="GeneracionyDespliegueDeResultados", prefix_group_id=False) as group:
            init_task = PythonOperator(
                task_id="MASTER_GENERATION",
                python_callable=get_generate_from_xcom,
                trigger_rule='all_success'
            )
            outputs = Variable.get('GENERATES', default_var=[], deserialize_json=True)
            if outputs:
                tasks = PythonOperator.partial(
                    task_id="GENERATORS",
                    python_callable=generate_and_deploy,
                    op_kwargs={'intern_conn': db_intern_conn, 'params': parameters, 'timezone': timezone}
                ).expand(op_args=[[item] for item in outputs])
                init_task >> tasks
    except Exception as e:
        logger.error(f"Error creando taskGroup de generadores. {e}")
    finally:
        return group
@@ -8,4 +8,4 @@ class InsumoModel(Base):
    __abstract__ = True
    INTERN_ID_BCOM = Column(Integer, primary_key=True, autoincrement=True)
@@ -90,21 +90,28 @@ def get_base_date(conn: str, bucket: str, key: str) -> datetime.date:
    return last_date


def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, delimiter: str = ",",
                  in_memory: bool = True):
    try:
        logger.info(f"SUBIENDO A NUBE KEY {key}")
        file_type = get_type_file(key)
        s3_hook = S3Hook(conn)
        if file_type == FileTypeEnum.EXCEL or file_type == FileTypeEnum.OLD_EXCEL:
            if in_memory:
                with BytesIO() as buffer:
                    with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
                        data.to_excel(writer, index=None)
                    s3_hook.load_bytes(buffer.getvalue(), key, bucket, True)
            else:
                pass
        elif file_type == FileTypeEnum.CSV or file_type == FileTypeEnum.TEXT:
            if in_memory:
                csv_buffer = BytesIO()
                data.to_csv(csv_buffer, header=True, index=False, sep=delimiter, na_rep='None')
                csv_buffer.seek(0)
                s3_hook.load_bytes(csv_buffer.getvalue(), key, bucket, True)
            else:
                s3_hook.load_file(data, key, bucket)
    except Exception as e:
        logger.error(f"Error guardando archivos a S3. key: {key}. {e}")
......
import pytz
from datetime import datetime
from dateutil.parser import parse
import logging
logger = logging.getLogger()


def datetime_by_tzone(tzone: str, pattern: str):
    offset = None
    # Some timezones come in the form 4:30 and are not found in the pytz timezone list (GMT)
    if ":" in tzone:
        offset = tzone.split(":")[1]
        tzone = tzone.split(":")[0]
        if "+" in tzone:
            tzone = tzone.replace(tzone[-1], str(int(tzone[-1]) + 1))
    timezones_list = pytz.all_timezones
    tzones = [x if tzone in x else None for x in timezones_list]
    tzones = list(filter(None, tzones))
    server_timezone = pytz.timezone(tzones[0])
    logger.debug("Zona Horaria : {}".format(server_timezone))
    server_time = server_timezone.localize(datetime.utcnow())
    current_time = parse(server_time.strftime('%Y-%m-%d %H:%M:%S.%f %Z'))
    if offset:
        offset = pytz.FixedOffset((current_time.utcoffset().total_seconds() / 60 + float(offset)) * -1)
        offset = offset.utcoffset(datetime.utcnow())
        current_time = datetime.utcnow() + offset
    else:
        current_time = current_time.replace(tzinfo=None) - current_time.utcoffset()
    current_time = parse(current_time.strftime(pattern))
    logger.debug("Hora actual: {}".format(current_time))
    return current_time
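
For reference, a minimal usage sketch of datetime_by_tzone with the values that appear in the project config further below ('GMT-5' and '%Y-%m-%d %H:%M:%S'); the result depends on the server clock:

# Illustrative only.
from components.Timezone import datetime_by_tzone

current = datetime_by_tzone('GMT-5', '%Y-%m-%d %H:%M:%S')
print(current)  # datetime localized according to the first pytz zone whose name contains 'GMT-5'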
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import logging
logger = logging.getLogger()


def transformations(xcom_commands: str, intern_conn):
    engine = intern_conn.engine
    script_name = xcom_commands[0]
    commands = xcom_commands[1]
    logger.info(f"Ejecutando transformaciones del script {script_name}")
    with engine.connect() as connection:
        for command in commands:
            logger.debug(f"Ejecutando comando de transformación: {command}")
            _ = connection.execute(command)


def get_trans_from_xcom(**kwargs):
    transforms_per_file = []
    final_transforms = []
    task = kwargs['ti']
    xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
    for key in xcom_keys:
        if not key.startswith("TRANSFORM"):
            continue
        xcom_transforms = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
        logger.info(f"Trayendo comandos {xcom_transforms}")
        for transform in xcom_transforms:
            final_transforms.append(transform)
        transforms_per_file.append((key, final_transforms))
    Variable.set(key='TRANSFORMS', value=transforms_per_file, serialize_json=True)


def get_transform_task_group(db_intern_conn) -> TaskGroup or None:
    group = None
    try:
        with TaskGroup(group_id="TransformacionDeDatos", prefix_group_id=False) as group:
            init_task = PythonOperator(
                task_id="MASTER_TRANSFORMATION",
                python_callable=get_trans_from_xcom,
                trigger_rule='all_success'
            )
            transforms = Variable.get('TRANSFORMS', default_var=[], deserialize_json=True)
            if transforms:
                tasks = PythonOperator.partial(
                    task_id="TRANSFORMATIONS",
                    python_callable=transformations,
                    op_kwargs={'intern_conn': db_intern_conn}
                ).expand(op_args=[[item] for item in transforms])
                init_task >> tasks
    except Exception as e:
        logger.error(f"Error creando taskGroup de transformación. {e}")
    finally:
        return group
from typing import List, Any, Dict, Tuple
import uuid
import os
import shutil
import pandas as pd
from enums.CatalogConfigurationEnum import CatalogConfigurationEnum
from enums.FileTypeEnum import FileTypeEnum
from enums.CommentsScriptEnum import CommentsScriptEnum
from components.Timezone import datetime_by_tzone
import logging
@@ -83,50 +86,92 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
    return data_dict


def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) -> List[Tuple[str, List[str]]]:
    result = []
    comments = [CommentsScriptEnum[item].value for item in CommentsScriptEnum._member_names_]
    try:
        for row in dataset:
            data = row[1].split(";")
            data = [item.replace("\r", "").replace(";", "").replace("\n", " ") for item in data]
            add_next = False
            final_data = []
            table_name = ""
            for item in data:
                final_item = item
                if item.lower().strip().find(label_tablename.lower().strip()+":") != -1:
                    init_index = item.lower().strip().index(label_tablename.lower().strip()+":")
                    table_name = item.replace(" ", "").strip()[init_index+5:].strip()
                    add_next = True
                elif item != "":
                    if add_next:
                        item = table_name + "|" + item
                        add_next = False
                    final_item = item.strip()
                    table_name = ""
                    if final_item.strip()[:2] in comments and ("update " in final_item.lower() or "delete " in final_item.lower()):
                        trans_index = final_item.lower().index("update")
                        final_item = final_item[trans_index:]
                    final_data.append(final_item)
            final_data = [item.replace("\t", "") for item in final_data if item != "" and ("select" in item or
                                                                                           "update" in item or
                                                                                           "delete" in item)]
            result.append((row[0], final_data))
        logger.info(f"Lista de comandos: {result}")
    except Exception as e:
        raise AssertionError(f"Error extrayendo comandos sql. {e}")
    finally:
        return result


def get_selects_from_xcom(task) -> List[str]:
    results = []
    try:
        selects = task.xcom_pull(key=None, task_ids="DAG-BCOM-EXTRACT-SCRIPTS")
        print(selects)
    except Exception as e:
        raise AssertionError(f"Error obteniendo Xcom. {e}")
    finally:
        return results


def select_multiple(command: str) -> Dict[str, Any]:
    response = {'is_multiple': False, 'tablename': ''}
    tablename = ""
    try:
        if command.lower().replace(" ", "").find("|select") != -1:
            response["is_multiple"] = True
            tablename = command[:command.index("|")].strip()
        init_index = command.lower().find("from")
        if init_index == -1:
            raise AssertionError("Query malformed")
        else:
            from_command = command[init_index+4:]
            tablename_base = from_command.strip().split(" ")
            if len(tablename_base) > 0 and tablename == "":
                tablename = tablename_base[0]
        response["tablename"] = tablename
    except Exception as e:
        raise AssertionError(f"Error validando si es múltiple select y nombre de tabla. {e}")
    finally:
        return response


def create_temp_file(filename_mask: str, file_type: str, tablename: str, timezone: str, pattern: str) -> str:
    """ Create an output result as a file with a mask in the name
    """
    fullpath = ""
    try:
        dir_name = str(uuid.uuid4())
        path_dir = "/tmp/" + dir_name
        os.mkdir(path_dir)
        current_datetime = str(datetime_by_tzone(timezone, pattern))
        filename_mask = filename_mask.replace("<source_name>", tablename)
        filename_mask = filename_mask.replace("<datetime>", current_datetime)
        filename = filename_mask + "." + file_type
        fullpath = path_dir + "/" + filename
        open(fullpath, mode='a').close()
    except Exception as e:
        logger.error(f"Error creando directorio y archivo temporal.{e}")
    finally:
        return fullpath


def delete_temp_dir(module_name: str) -> bool:
    drop = False
    try:
        if os.path.exists(module_name):
            directory = os.path.dirname(module_name)
            shutil.rmtree(directory, ignore_errors=True)
    except Exception as e:
        raise AssertionError(f"Error borrando modulo temporal. {e}")
    finally:
        return drop
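
A small illustrative run of the helpers above; the SQL text, mask, timezone and pattern values are hypothetical and simply mirror the config further below:

# Illustrative only; outputs depend on the current date and the generated UUID.
from components.Utils import select_multiple, create_temp_file, delete_temp_dir

# A command as emitted by update_sql_commands when a "TABLE:" label precedes a multi-table select
print(select_multiple("CLIENTES|select c.id, f.monto from clientes c join facturas f on c.id = f.cliente_id"))
# -> {'is_multiple': True, 'tablename': 'CLIENTES'}
print(select_multiple("select * from clientes"))
# -> {'is_multiple': False, 'tablename': 'clientes'}

tmp = create_temp_file("<source_name>_<datetime>", "txt", "CLIENTES", "GMT-5", "%Y-%m-%d %H:%M:%S")
print(tmp)  # e.g. /tmp/<uuid>/CLIENTES_<timestamp>.txt
delete_temp_dir(tmp)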
from airflow.utils.db import provide_session
from airflow.models import XCom
import logging
from typing import List, Tuple

logger = logging.getLogger()


def save_commands_to_xcom(dataset: List[Tuple[str, List[str]]], task, extract_mask: str, transform_mask: str,
                          order_delimiter: str) -> None:
    final_names_xcom = []
    try:
@@ -27,4 +28,30 @@ def save_to_xcom(dataset: List[Tuple[str, List[str]]], task, extract_mask: str,
            final_names_xcom.append(name)
        task.xcom_push(key="XCOM-EXTRACTION-NAMES", value=final_names_xcom)
    except Exception as e:
        raise AssertionError(f"Error guardando comandos en variables en xcom. {e}")


def save_names_to_xcom(value: str, task, task_name: str, key: str) -> None:
    final_names_xcom = []
    try:
        xcom_names = task.xcom_pull(task_ids=task_name, key=key)
        if not xcom_names:
            final_names_xcom.append(value)
        else:
            final_names_xcom = xcom_names
            final_names_xcom.append(value)
        task.xcom_push(key=key, value=final_names_xcom)
        logger.info(f"Guardado correctamente los nombres de las tablas")
    except Exception as e:
        logger.error(f"Error guardando nombres en variables xcom. {e}")


def delete_all_xcom_tasks() -> None:
    try:
        @provide_session
        def cleanup_xcom(session=None):
            session.query(XCom).filter(XCom.task_id == "SCRIPTS-EXTRACTORS").delete()
        cleanup_xcom()
    except Exception as e:
        logger.error(f"Error borrando todas las variables xcom del DAG actual. {e}")
@@ -8,7 +8,7 @@ app:
      port: 21521
      username: PRUEBABCOM2
      password: admin
      database: bd_tp_qa
      service: ORCLPDB1
      schema: public
  transformation:
@@ -20,6 +20,8 @@ app:
      database: prueba_bcom
      service:
      schema:
  chunksize: 50000
  label_multiple_select: TABLE
  source_mask: select
  transformation_mask: transform
  prefix_order_delimiter: .
@@ -28,11 +30,13 @@ app:
    bucket: prueba1234568
    prefix: bcom_scripts
    connection_id: conn_script
  timezone: 'GMT-5'
  outputs:
    filename_mask: <source_name>_<datetime>
    datetime_pattern: '%Y-%m-%d %H:%M:%S'
    file_type: txt
    s3_params:
      bucket: prueba1234568
      prefix: bcom_results
      connection_id: conn_script
from datetime import datetime
import time
from typing import Any, Dict

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

from components.Utils import update_sql_commands
from components.Xcom import save_commands_to_xcom
from components.S3Route import get_files_from_prefix
from components.Sensor import create_s3_sensor
from components.Extractor import get_extract_task_group
from components.Transformation import get_transform_task_group
from components.Generation import get_generate_task_group
from components.Cleaning import get_cleaning_task_group
from components.Databases.Database import Database

import logging

logger = logging.getLogger()

DAG_NAME = "BCOM_DAG_TRANSFORMACIONES2"

# TASK_NAMES = {'SENSORS': 'SCRIPTS_SENSORS', 'SCRIPT-EXTRACTION': 'SCRIPTS_EXTRACTOR', 'EXTRACTION': 'ExtraccionDeDatos',
#               'TRANSFORMATION': 'TransformacionDeDatos', ''}

DEFAULT_ARGS = {
    'owner': 'BCOM',
    "start_date": datetime(2023, 5, 25, 22, 9),
    'depends_on_past': False,
    'email': 'caguirre@bytesw.com',
    'retries': 0,
    'email_on_retry': False,
    'email_on_failure': False
}


def cleaning(intern_conn) -> TaskGroup:
    groups = None
    try:
        groups = get_cleaning_task_group(intern_conn)
    except Exception as e:
        logger.error(f"Error general de transformación de datos. {e}")
    finally:
        return groups


def generate_and_deploy_results(intern_conn, parameters: Dict[str, Any], timezone: str) -> TaskGroup:
    groups = None
    try:
        groups = get_generate_task_group(intern_conn, parameters, timezone)
    except Exception as e:
        logger.error(f"Error general de creación y despliegue de resultados. {e}")
    finally:
        return groups


def transformation(intern_conn) -> TaskGroup:
    groups = None
    try:
        groups = get_transform_task_group(intern_conn)
    except Exception as e:
        logger.error(f"Error general de transformación de datos. {e}")
    finally:
        return groups


def extraction(source_conn, intern_conn, chunksize: int = 100000) -> TaskGroup:
    groups = None
    try:
        groups = get_extract_task_group(source_conn, intern_conn, chunksize)
    except Exception as e:
        logger.error(f"Error general de extracción de datos. {e}")
    finally:
@@ -39,13 +76,13 @@ def extraction(source_conn, intern_conn) -> TaskGroup:
def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str,
                    order_delimiter: str, label_tablename: str, **kwargs):
    try:
        start_time = time.time()
        logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
        scripts = get_files_from_prefix(conn_id, bucket, prefix)
        scripts = update_sql_commands(scripts, label_tablename)
        save_commands_to_xcom(scripts, kwargs['ti'], source_mask, transform_mask, order_delimiter)
        logger.debug(f"Script cargados en Xcom: {scripts}")
        logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos")
    except Exception as e:
@@ -72,17 +109,17 @@ def set_dag():
        wildcard_scripts = scripts_s3["prefix"] + "?*"
    else:
        wildcard_scripts = scripts_s3["prefix"] + "/?*"
    sensor_scripts = create_s3_sensor("SCRIPTS-SENSOR", scripts_s3["connection_id"], scripts_s3["bucket"],
                                      wildcard_scripts)
    extract_mask = conf["source_mask"]
    transform_mask = conf["transformation_mask"]
    order_delimiter = conf["prefix_order_delimiter"]
    script_extractor = PythonOperator(
        task_id="SCRIPTS-EXTRACTOR",
        python_callable=extract_scripts,
        op_kwargs={'conn_id': scripts_s3["connection_id"], 'bucket': scripts_s3["bucket"],
                   'prefix': scripts_s3["prefix"], 'source_mask': extract_mask, 'transform_mask': transform_mask,
                   'order_delimiter': order_delimiter, 'label_tablename': conf["label_multiple_select"]},
        trigger_rule="all_success"
    )
    # Source Database configuration. Only we use 1 source
@@ -99,9 +136,22 @@ def set_dag():
                         intern_params["service"], intern_params["schema"])
    intern_db.create_engine()

    # Create the task group for the extractions
    chunksize = conf["chunksize"]
    extractions = extraction(source_db, intern_db, chunksize)

    # Create the task group for the transformations
    transformations = transformation(intern_db)

    # Create the task group for generating and deploying the result files
    outputs_conf = conf["outputs"]
    timezone = conf["timezone"]
    result = generate_and_deploy_results(intern_db, outputs_conf, timezone)

    # Create the cleaner tasks
    cleaners = cleaning(intern_db)

    sensor_scripts >> script_extractor >> extractions >> transformations >> result >> cleaners
    return dag
......