Commit 39a51800 authored by Cristian Aguirre's avatar Cristian Aguirre

Update 02-07-23. Update New Process. Task 3_2 in progress (dynamic table creation)

parent 4070e361
from typing import List, Tuple
from sqlalchemy import Table, Column, MetaData
from components.Model.InsumoModel import InsumoModel
from components.Databases.Mysql import Mysql
from components.Databases.Mariadb import Mariadb
from components.Databases.Postgres import Postgres
from components.Databases.Oracle import Oracle
from enums.DatabaseTypeEnum import DatabaseTypeEnum
import logging
import sys
import oracledb

# SQLAlchemy's "oracle" dialect expects cx_Oracle; expose python-oracledb
# under that name so the dialect uses it as a drop-in replacement.
oracledb.version = "8.3.0"
sys.modules["cx_Oracle"] = oracledb

logger = logging.getLogger()
metadata_obj = MetaData()
class Database:
    def __init__(self, db_type: str, host: str, port: int, user: str, password: str, database: str,
                 service: str, schema: str) -> None:
        self.db_type = db_type
        if db_type == DatabaseTypeEnum.MYSQL.value:
            self.factory = Mysql(host, port, user, password, database)
        elif db_type == DatabaseTypeEnum.MARIADB.value:
            self.factory = Mariadb(host, port, user, password, database)
        elif db_type == DatabaseTypeEnum.POSTGRES.value:
            self.factory = Postgres(host, port, user, password, database, schema)
        elif db_type == DatabaseTypeEnum.ORACLE.value:
            self.factory = Oracle(host, port, user, password, service)
        self.engine = None

    def create_engine(self) -> None:
        try:
            if self.engine is None:
                self.factory.create_engine()
                self.engine = self.factory.engine
        except Exception as e:
            logger.error(f"Error creando db engine. {e}")

    def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel") -> Table:
        # Note: a "return" inside a "finally" block would swallow the exception
        # raised in "except", so the result is returned directly from the try block.
        try:
            return self.factory.create_model(tablename, fields, modelName)
        except Exception as e:
            raise AssertionError(f"Error creando tabla dinámica con nombre {tablename}. {e}") from e

    def create_table(self, model) -> bool:
        save = False
        try:
            model.metadata.create_all(self.engine)
            save = True
        except Exception as e:
            logger.error(f"Error creando tabla {model}. {e}")
        return save
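A minimal usage sketch of the facade above (connection values, table name, and field tuples are illustrative, not taken from the repo config):

# Illustrative values; real ones come from the YAML config further down
db = Database("mysql", "192.168.1.10", 13306, "root", "root", "prueba_bcom", None, None)
db.create_engine()
# (name, type_code, length) tuples in the shape produced by cursor.description:
# type_code 253 = VARCHAR, 1020 bytes -> String(255) under utf8mb4; 3 = INT
fields = [("CODIGO", 3, -1), ("NOMBRE", 253, 1020)]
table = db.create_model("insumo_demo", fields)
created = db.create_table(table)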
from enum import Enum
class MysqlDataTypeEnum(Enum):
    VARCHAR = 253
    TINY_INT = 1
    INT = 3
    TEXT = 252
    DECIMAL = 0
    SHORT = 2
    TIMESTAMP = 7
    JSON = 245
    BIGINT = 8
    FLOAT = 4
    DATETIME = 12
    DATE = 10
    TIME = 11
    DOUBLE = 5
from enum import Enum
from sqlalchemy import Integer, String, Float, Date, TIMESTAMP, Numeric, CHAR, Text, Boolean, BLOB, JSON, BigInteger, \
    DateTime, Time, DECIMAL
class MysqlDataTypeORMEnum(Enum):
    VARCHAR = String
    TINY_INT = Integer
    INT = Integer
    TEXT = Text
    DECIMAL = DECIMAL
    SHORT = String
    TIMESTAMP = TIMESTAMP
    JSON = JSON
    BIGINT = BigInteger
    FLOAT = Float
    DATETIME = DateTime
    DATE = Date
    TIME = Time
    DOUBLE = Float
from enum import Enum
class OracleDataTypeEnum(Enum):
    NUMBER = "DB_TYPE_NUMBER"
    VARCHAR2 = "DB_TYPE_VARCHAR"
    DATE = "DB_TYPE_DATE"
from enum import Enum
from sqlalchemy import Integer, String, Float, Date, TIMESTAMP, Numeric, CHAR, Text, Boolean, BLOB
class OracleDataTypeORMEnum(Enum):
    NUMBER = Integer
    VARCHAR2 = String
    DATE = Date
from enum import Enum
class PostgresDataTypeEnum(Enum):
    VARCHAR = 1043
    INTEGER = 23
    NUMERIC = 1700
    TIMESTAMP = 1114
    TEXT = 25
    BOOLEAN = 16
    BYTEA = 17
    CHAR = 18
    FLOAT = 701
    DATE = 1082
    INTEGER8 = 20
from enum import Enum
from sqlalchemy import Integer, String, Float, Date, TIMESTAMP, Numeric, CHAR, Text, Boolean, BLOB
class PostgresDataTypeORMEnum(Enum):
    VARCHAR = String
    INTEGER = Integer
    NUMERIC = Numeric
    TIMESTAMP = TIMESTAMP
    TEXT = Text
    BOOLEAN = Boolean
    BYTEA = BLOB
    CHAR = CHAR
    FLOAT = Float
    DATE = Date
    INTEGER8 = Integer
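For reference, a sketch of how a type-code/ORM enum pair is combined when a cursor column is mapped (type code 1043 is Postgres VARCHAR; the length value is illustrative):

type_code, length = 1043, 120
# Look up the wire type by its code, then instantiate the matching SQLAlchemy type
orm_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(type_code).name].value(length)  # String(120)
# Types whose constructor rejects a size raise TypeError and fall back to .value()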
from sqlalchemy import create_engine
from enums.DatabaseDialectEnum import DatabaseDialectEnum
import logging

logger = logging.getLogger()


class Mariadb:
    def __init__(self, host: str, port: int, user: str, password: str, database: str) -> None:
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.engine = None

    def create_engine(self) -> None:
        try:
            dialect = DatabaseDialectEnum.MARIADB.value
            url = f"{dialect}://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}?charset=utf8mb4"
            self.engine = create_engine(url)
        except Exception as e:
            logger.error(f"Error creando engine de Mariadb. {e}")
from typing import List, Tuple
from sqlalchemy import create_engine
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from components.Model.InsumoModel import InsumoModel
from components.Databases.Enums.MysqlDataTypeEnum import MysqlDataTypeEnum
from components.Databases.Enums.MysqlDataTypeORMEnum import MysqlDataTypeORMEnum
from sqlalchemy import Table, Column, MetaData
import logging

logger = logging.getLogger()


class Mysql:
    def __init__(self, host: str, port: int, user: str, password: str, database: str) -> None:
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.engine = None

    def create_engine(self) -> None:
        try:
            dialect = DatabaseDialectEnum.MYSQL.value
            url = f"{dialect}://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}?charset=utf8mb4"
            self.engine = create_engine(url)
        except Exception as e:
            logger.error(f"Error creando engine de Mysql. {e}")

    def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel"):
        model = None
        try:
            # Build the declarative class at runtime on top of the abstract InsumoModel
            model = type(modelName, (InsumoModel,), {
                '__tablename__': tablename,
                '__table_args__': {'extend_existing': True}
            })
            for field in fields:
                logger.info(f"Attribute: {field}")
                name = field[0]
                if field[2] != -1:
                    # The cursor reports lengths in bytes; utf8mb4 uses up to 4
                    # bytes per character, so divide by 4 to get the char length
                    size = int(field[2] / 4)
                    try:
                        data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value(size)
                    except TypeError:
                        # The SQLAlchemy type takes no size argument (e.g. Integer)
                        data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value()
                else:
                    data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value()
                setattr(model, name, Column(data_type))
            model = model.__table__
        except Exception as e:
            logger.error(f"Error creando modelo dinámico en Mysql con nombre {tablename}. {e}")
        return model
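The dynamic-model pattern used by create_model, shown in isolation (a sketch with hypothetical names):

# type() builds a declarative subclass at runtime; extend_existing allows the
# same table name to be redefined on a later run
DemoModel = type("DemoModel", (InsumoModel,), {
    '__tablename__': "demo_table",
    '__table_args__': {'extend_existing': True}
})
setattr(DemoModel, "NOMBRE", Column(String(50)))
demo_table = DemoModel.__table__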
from typing import List, Tuple
from sqlalchemy import create_engine
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from components.Databases.Enums.OracleDataTypeEnum import OracleDataTypeEnum
from components.Databases.Enums.OracleDataTypeORMEnum import OracleDataTypeORMEnum
from components.Model.InsumoModel import InsumoModel
from sqlalchemy import Column
import logging

logger = logging.getLogger()


class Oracle:
    def __init__(self, host: str, port: int, user: str, password: str, service: str) -> None:
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.service = service
        self.engine = None

    def create_engine(self) -> None:
        try:
            dialect = DatabaseDialectEnum.ORACLE.value
            url = f"{dialect}://{self.user}:{self.password}@{self.host}:{self.port}?service_name={self.service}"
            self.engine = create_engine(url)
        except Exception as e:
            logger.error(f"Error creando engine de Oracle. {e}")

    def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel"):
        model = None
        try:
            model = type(modelName, (InsumoModel,), {
                '__tablename__': tablename,
                '__table_args__': {'extend_existing': True}
            })
            for field in fields:
                logger.info(f"Attribute: {field}")
                name = field[0]
                # oracledb exposes the column type as a DbType object, so the
                # enum is looked up by the type's name (e.g. "DB_TYPE_VARCHAR")
                if field[2] is not None:
                    size = int(field[2] / 4)
                    try:
                        data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value(size)
                    except TypeError:
                        data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value()
                else:
                    data_type = OracleDataTypeORMEnum[OracleDataTypeEnum(field[1].name).name].value()
                setattr(model, name, Column(data_type))
            model = model.__table__
        except Exception as e:
            logger.error(f"Error creando modelo dinámico en Oracle con nombre {tablename}. {e}")
        return model
from typing import List, Tuple
from sqlalchemy import create_engine
from components.Model.InsumoModel import InsumoModel
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from components.Databases.Enums.PostgresDataTypeEnum import PostgresDataTypeEnum
from components.Databases.Enums.PostgresDataTypeORMEnum import PostgresDataTypeORMEnum
import logging
from sqlalchemy import Table, Column, MetaData

logger = logging.getLogger()


class Postgres:
    def __init__(self, host: str, port: int, user: str, password: str, database: str, schema: str) -> None:
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.schema = schema
        self.engine = None

    def create_engine(self) -> None:
        try:
            dialect = DatabaseDialectEnum.POSTGRES.value
            url = f"{dialect}://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}" \
                  f"?options=-csearch_path={self.schema}"
            self.engine = create_engine(url)
        except Exception as e:
            logger.error(f"Error creando engine de Postgres. {e}")

    def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel"):
        model = None
        try:
            model = type(modelName, (InsumoModel,), {
                '__tablename__': tablename,
                '__table_args__': {'extend_existing': True}
            })
            for field in fields:
                logger.info(f"Attribute: {field}")
                name = field[0]
                # psycopg2 reports internal_size == -1 for variable-length types
                if field[2] != -1:
                    try:
                        data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value(field[2])
                    except TypeError:
                        data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value()
                else:
                    data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value()
                setattr(model, name, Column(data_type))
            model = model.__table__
        except Exception as e:
            logger.error(f"Error creando modelo dinámico en Postgres con nombre {tablename}. {type(e)}. {e}")
        return model
from typing import List, Tuple, Optional
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Utils import select_multiple
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import logging

logger = logging.getLogger()


def extract_from_source(command: str, source_conn, intern_conn):
    engine = source_conn.engine
    # Fetch a single row only: what matters here is the cursor metadata
    # (column names, type codes and lengths), not the data itself
    if source_conn.db_type == DatabaseTypeEnum.ORACLE.value:
        command += " OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY"
    else:
        command_words = command.split(" ")
        if command_words[-2].lower() != "limit":
            command += " limit 1"
    columns = []
    with engine.connect() as connection:
        result = connection.execute(command)
        fields = result.cursor.description
        for field in fields:
            name = field[0]
            type_code = field[1]
            length = field[3]
            column = (name, type_code, length)
            columns.append(column)
    logger.debug(f"Columnas procesadas: {columns}")
    multiple = select_multiple(command)
    if multiple["is_multiple"]:
        # TODO: multi-table (JOIN) selects are still pending
        pass
    else:
        model = source_conn.create_model(multiple["tablename"], columns)
        create = intern_conn.create_table(model)
        print(create)


def get_select_from_xcom(**kwargs):
    final_selects = []
    task = kwargs['ti']
    xcom_keys = task.xcom_pull(task_ids="DAG-BCOM-EXTRACT-SCRIPTS", key="XCOM-EXTRACTION-NAMES")
    logger.debug(xcom_keys)
    for key in xcom_keys:
        if not key.startswith("SELECT"):
            continue
        xcom_selects = task.xcom_pull(task_ids="DAG-BCOM-EXTRACT-SCRIPTS", key=key)
        logger.info(f"Trayendo comandos {xcom_selects}")
        for select in xcom_selects:
            final_selects.append(select)
    Variable.set(key='SELECTS', value=final_selects, serialize_json=True)


def get_task_group(db_source_conn, db_intern_conn) -> Optional[TaskGroup]:
    group = None
    try:
        with TaskGroup(group_id="ExtraccionDeDatos", prefix_group_id=False) as group:
            init_task = PythonOperator(
                task_id="MASTER_EXTRACTOR",
                python_callable=get_select_from_xcom
            )
            selects = Variable.get('SELECTS', default_var=[], deserialize_json=True)
            if selects:
                # Dynamic task mapping: one mapped EXTRACTORS task per select command
                tasks = PythonOperator.partial(
                    task_id="EXTRACTORS",
                    python_callable=extract_from_source,
                    op_kwargs={'source_conn': db_source_conn, 'intern_conn': db_intern_conn}
                ).expand(op_args=[[item] for item in selects])
                init_task >> tasks
    except Exception as e:
        logger.error(f"Error creando taskGroup de extracción. {e}")
    return group
def create_table(db_type: str, tablename: str, fields: List[Tuple[str]]) -> bool:
    # Stub: dynamic table creation pending (see Database.create_table)
    create = False
    try:
        pass
    except Exception as e:
        raise AssertionError(f"Error creando tabla {tablename}. {e}") from e
    return create
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer

Base = declarative_base()


class InsumoModel(Base):
    __abstract__ = True

    INTERN_UUID_BCOM = Column(Integer, primary_key=True)
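Every dynamically created model inherits from this abstract base, so each generated table gets the INTERN_UUID_BCOM surrogate primary key for free. A hand-written equivalent (hypothetical name):

class DemoInsumo(InsumoModel):
    __tablename__ = "demo_insumo"
# DemoInsumo.__table__ now carries INTERN_UUID_BCOM as its primary key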
import fnmatch
import datetime
from typing import Any, Dict, List, Tuple
import pytz
from io import BytesIO, StringIO
@@ -8,6 +8,7 @@ import pandas as pd
from components.Utils import get_type_file
from enums.FileTypeEnum import FileTypeEnum
from enums.ScriptFileTypeEnum import ScriptFileTypeEnum
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import logging
@@ -42,7 +43,6 @@ def get_df_from_s3(conn: str, bucket: str, key: str, delimiter: str, base_date:
def get_data_from_s3(conn: str, bucket: str, key: str, base_date: datetime.date, interval: str) -> Dict[str, Any]:
    result = {'filename': '', 'data': BytesIO()}
    try:
        if key.rfind("/") != -1:
            prefix = key[:key.rfind("/")+1]
@@ -109,7 +109,7 @@ def save_df_to_s3(df: pd.DataFrame, conn: str, bucket: str, key: str, delimiter:
        logger.error(f"Error guardando archivos a S3. key: {key}. {e}")
def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str) -> None:
    try:
        filename = source_key[source_key.rfind("/")+1:]
        output_key += filename
@@ -118,3 +118,26 @@ def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str):
        s3_hook.delete_objects(bucket, source_key)
    except Exception as e:
        logger.error(f"Error moviendo archivo desde {source_key} hacia {output_key} en bucket {bucket}. {e}")
def get_files_from_prefix(conn: str, bucket: str, prefix: str) -> List[Tuple[str, str]]:
    result = []
    allowed_filetypes = [ScriptFileTypeEnum[item].value for item in ScriptFileTypeEnum._member_names_]
    try:
        s3_hook = S3Hook(conn)
        files = s3_hook.list_keys(bucket, prefix)
        logger.debug(f"Archivos encontrados en el prefijo {prefix}: {files}")
        for file in files:
            if file.endswith("/") or file[file.rfind(".")+1:].lower() not in allowed_filetypes:
                continue
            data = s3_hook.get_key(file, bucket).get()['Body'].read().decode("utf-8")
            if file.find("/") == -1:
                filename = file
            else:
                filename = file[file.rfind("/")+1:]
            script = (filename, data)
            result.append(script)
    except Exception as e:
        logger.error(f"Error extrayendo archivos en memoria desde bucket {bucket} y prefix {prefix}. {e}")
    finally:
        return result
@@ -7,6 +7,7 @@ logger = logging.getLogger()
POKE_INTERVAL = 5
TIMEOUT = 60*1
VERIFY_SSL = False
def create_s3_sensor(task_id: str, connection: str, bucket: str, key: str) -> S3KeySensor:
@@ -18,7 +19,7 @@ def create_s3_sensor(task_id: str, connection: str, bucket: str, key: str) -> S3
        bucket_name=bucket,
        wildcard_match=True,
        aws_conn_id=connection,
        verify=VERIFY_SSL,
        poke_interval=POKE_INTERVAL,
        timeout=TIMEOUT
    )
...
from typing import List, Any, Dict, Tuple
import pandas as pd
from enums.CatalogConfigurationEnum import CatalogConfigurationEnum
from enums.FileTypeEnum import FileTypeEnum
from enums.CommentsScriptEnum import CommentsScriptEnum
import logging
@@ -77,6 +78,55 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
        if "delimiter" in catalog.keys():
            data_dict.update({catalog_name+'_delimiter': catalog["delimiter"]})
    except Exception as e:
        raise AssertionError(f"Error actualizando dict de catalogos. {e}")
    finally:
        return data_dict
def update_sql_commands(dataset: List[Tuple[str, str]]) -> List[Tuple[str, List[str]]]:
    result = []
    comments = [CommentsScriptEnum[item].value for item in CommentsScriptEnum._member_names_]
    try:
        for row in dataset:
            data = row[1].split("\n")
            data = [item.replace("\r", "").replace(";", "") for item in data]
            # Drop blank lines and comment lines; startswith handles both the
            # two-character "--" prefix and the single-character "#" prefix
            data = [item for item in data if item != "" and not any(item.startswith(comment) for comment in comments)]
            result.append((row[0], data))
    except Exception as e:
        raise AssertionError(f"Error extrayendo comandos sql. {e}") from e
    return result
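A worked example of the cleanup (illustrative script content):

scripts = [("1.select_insumos.sql", "-- comentario\nSELECT * FROM tabla;\n\n# nota\nSELECT 1;")]
update_sql_commands(scripts)
# -> [("1.select_insumos.sql", ["SELECT * FROM tabla", "SELECT 1"])]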
def get_selects_from_xcom(task) -> List[str]:
    # Work in progress: currently only pulls and prints the XCom values
    results = []
    try:
        selects = task.xcom_pull(key=None, task_ids="DAG-BCOM-EXTRACT-SCRIPTS")
        print(selects)
    except Exception as e:
        raise AssertionError(f"Error obteniendo Xcom. {e}") from e
    return results
def select_multiple(command: str) -> Dict[str, Any]:
    response = {'is_multiple': False, 'tablename': ''}
    try:
        if "join" in command.lower():
            response["is_multiple"] = True
        init_index = command.lower().find("from")
        if init_index == -1:
            raise AssertionError("Query malformed")
        # The base table is the first token after FROM
        from_command = command[init_index+4:]
        tablename_base = from_command.strip().split(" ")
        if len(tablename_base) == 0:
            raise AssertionError("Query malformed")
        response["tablename"] = tablename_base[0]
    except Exception as e:
        raise AssertionError(f"Error validando si es múltiple select y nombre de tabla. {e}") from e
    return response
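Illustrative inputs for the helper above:

select_multiple("SELECT * FROM insumos i")
# -> {'is_multiple': False, 'tablename': 'insumos'}
select_multiple("SELECT * FROM insumos i JOIN detalle d ON i.id = d.id")
# -> {'is_multiple': True, 'tablename': 'insumos'}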
import logging
from typing import List, Tuple

logger = logging.getLogger()


def save_to_xcom(dataset: List[Tuple[str, List[str]]], task, extract_mask: str, transform_mask: str,
                 order_delimiter: str) -> None:
    final_names_xcom = []
    try:
        for data in dataset:
            logger.info(f"Guardando Xcom en llave {data[0]}")
            name = data[0]
            base_name = name
            # When the order delimiter is ".", strip the file extension first
            if order_delimiter == ".":
                base_name = base_name[:base_name.rfind(".")]
            order = base_name.split(order_delimiter)
            if len(order) < 2:
                raise AssertionError(f"Script {name} no tiene prefijo de orden. Validar nombre de script")
            name += "_" + order[0]
            if name.find(extract_mask) != -1:
                name = "SELECT|" + name
            elif name.find(transform_mask) != -1:
                name = "TRANSFORM|" + name
            task.xcom_push(key=name, value=data[1])
            final_names_xcom.append(name)
        task.xcom_push(key="XCOM-EXTRACTION-NAMES", value=final_names_xcom)
    except Exception as e:
        raise AssertionError(f"Error guardando variables en xcom. {e}") from e
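The key construction, traced on an illustrative script name with extract_mask "select" and order_delimiter ".":

# name      = "1.select_insumos.sql"
# base_name = "1.select_insumos"        (extension stripped)
# order     = ["1", "select_insumos"]   -> order prefix "1"
# pushed key: "SELECT|1.select_insumos.sql_1"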
app:
  schedule: "@once"
  database:
    sources:
      source1:
        type: oracle
        host: 192.168.27.22
        port: 21521
        username: PRUEBABCOM2
        password: admin
        database: airflow
        service: ORCLPDB1
        schema: public
    transformation:
      type: mysql
      host: 192.168.1.10
      port: 13306
      username: root
      password: root
      database: prueba_bcom
      service:
      schema:
  source_mask: select
  transformation_mask: transform
  prefix_order_delimiter: .
  scripts:
    s3_params:
      bucket: prueba1234568
      prefix: bcom_scripts
      connection_id: conn_script
  outputs:
    filename_mask: <source_name>_<date>_<time>
    file_type: txt
    s3_params:
      bucket: ejemplo
      prefix: bcom/inputs
      connection_id: conn_result
from datetime import datetime, timedelta
from typing import Optional
import time
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from components.Utils import update_sql_commands, get_selects_from_xcom
from components.Xcom import save_to_xcom
from components.S3Route import get_files_from_prefix
from components.Sensor import create_s3_sensor
from components.Extractor import get_task_group
from components.Database import Database
import logging

logger = logging.getLogger()

DAG_NAME = "BCOM_DAG_TRANSFORMACIONES"

DEFAULT_ARGS = {
    'owner': 'BCOM',
    "start_date": datetime(2023, 5, 25, 22, 9),
    'depends_on_past': False,
    'email': 'caguirre@bytesw.com',
    'retries': 1,
    'email_on_retry': False,
    'email_on_failure': False
}


def extraction(source_conn, intern_conn) -> Optional[TaskGroup]:
    groups = None
    try:
        groups = get_task_group(source_conn, intern_conn)
    except Exception as e:
        logger.error(f"Error general de extracción de datos. {e}")
    return groups


def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str,
                    order_delimiter: str, **kwargs):
    try:
        start_time = time.time()
        logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
        scripts = get_files_from_prefix(conn_id, bucket, prefix)
        scripts = update_sql_commands(scripts)
        save_to_xcom(scripts, kwargs['ti'], source_mask, transform_mask, order_delimiter)
        logger.debug(f"Scripts cargados en Xcom: {scripts}")
        logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos")
    except Exception as e:
        raise AssertionError(f"Error general de descarga y guardado de scripts en variables Xcom. {e}") from e


def set_dag():
    """DAG that executes a series of SQL commands from scripts to build a result file and save it to a bucket (for now)."""
    import yaml
    from yaml.loader import SafeLoader

    # Change conf_path depending on the environment; in prod with k8s and
    # containers use /opt/airflow/dags/app_conf.yml.
    # In development, any path that points to your dags folder.
    conf_path = "/root/airflow/dags/dag_conf.yml"
    with open(conf_path) as f:
        data = yaml.load(f, Loader=SafeLoader)
        print(data)
    conf = data["app"]
    with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que extrae y transforma",
             schedule_interval=conf["schedule"], tags=["DAG BCOM - SQL TRANSFORMATIONS"], catchup=True) as dag:
        scripts_s3 = conf["scripts"]["s3_params"]
        if scripts_s3["prefix"].endswith("/"):
            wildcard_scripts = scripts_s3["prefix"] + "?*"
        else:
            wildcard_scripts = scripts_s3["prefix"] + "/?*"
        sensor_scripts = create_s3_sensor("sensor_for_scripts", scripts_s3["connection_id"], scripts_s3["bucket"],
                                          wildcard_scripts)
        extract_mask = conf["source_mask"]
        transform_mask = conf["transformation_mask"]
        order_delimiter = conf["prefix_order_delimiter"]
        script_extractor = PythonOperator(
            task_id="DAG-BCOM-EXTRACT-SCRIPTS",
            python_callable=extract_scripts,
            op_kwargs={'conn_id': scripts_s3["connection_id"], 'bucket': scripts_s3["bucket"],
                       'prefix': scripts_s3["prefix"], 'source_mask': extract_mask, 'transform_mask': transform_mask,
                       'order_delimiter': order_delimiter},
            trigger_rule="all_success"
        )

        # Source database configuration. Only one source is used for now
        source_params = conf["database"]["sources"]["source1"]
        source_db = Database(source_params["type"], source_params["host"], int(source_params["port"]),
                             source_params["username"], source_params["password"], source_params["database"],
                             source_params["service"], source_params["schema"])
        source_db.create_engine()

        # Intern database configuration
        intern_params = conf["database"]["transformation"]
        intern_db = Database(intern_params["type"], intern_params["host"], int(intern_params["port"]),
                             intern_params["username"], intern_params["password"], intern_params["database"],
                             intern_params["service"], intern_params["schema"])
        intern_db.create_engine()

        extractions = extraction(source_db, intern_db)
        sensor_scripts >> script_extractor >> extractions
    return dag


globals()["0"] = set_dag()
@@ -7,8 +7,7 @@ import numpy as np
from components.S3Route import get_df_from_s3, get_base_date, save_df_to_s3, move_object_s3
from components.Sensor import create_s3_sensor
from components.Utils import get_modified_prefix, remove_invalid_rows, remove_fields, update_dict_with_catalogs
from airflow import DAG
from airflow.operators.python import PythonOperator
@@ -25,7 +24,7 @@ PROMOCION_DEFAULT = "Promocion"
PROMOCIONES_NO_CONSIDERADAS_TV_CANALES = "Adicional|Soundbox|SOUNDBOX"
DEFAULT_ARGS = {
    'owner': 'BCOM',
    "start_date": datetime(2023, 5, 25, 22, 9),
    'depends_on_past': False,
    'email': 'caguirre@bytesw.com',
@@ -229,13 +228,13 @@ def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: D
def set_dag_1():
    """DAG that executes the process with the TACOMVENTAS and PROMOCIONES_RESIDENCIAL inputs"""
    import yaml
    from yaml.loader import SafeLoader
    # Change conf_path depending on the environment; in prod with k8s and containers use /opt/airflow/dags/app_conf.yml
    # In development, any path that points to your dags folder
    conf_path = "/root/airflow/dags/app_conf.yml"
    with open(conf_path) as f:
        data = yaml.load(f, Loader=SafeLoader)
    general_cnf = data["general"]
...
from enum import Enum
class CommentsScriptEnum(Enum):
    DASHES = "--"
    NUMERAL = "#"
from enum import Enum
class DatabaseDialectEnum(Enum):
    MYSQL = "mysql+pymysql"
    MARIADB = "mysql+pymysql"
    ORACLE = "oracle"
    POSTGRES = "postgresql+psycopg2"
from enum import Enum
class DatabaseTypeEnum(Enum):
    MYSQL = "mysql"
    MARIADB = "mariadb"
    ORACLE = "oracle"
    POSTGRES = "postgres"
from enum import Enum
class ScriptFileTypeEnum(Enum):
    TEXT = "txt"
    SQL = "sql"