Commit 55785c2d authored by Cristian Aguirre's avatar Cristian Aguirre

Update 21-07-23: update DAG process (version 1).

parent 88948dcc
......@@ -3,10 +3,13 @@ import json
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from components.Utils import select_multiple
from components.Xcom import delete_all_xcom_tasks
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.S3Route import load_obj_to_s3
from enums.OperationTypeEnum import OperationTypeEnum
import logging
......@@ -14,6 +17,7 @@ logger = logging.getLogger()
def validate_clean(control_params: Dict[str, Any], **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG")
conn = control_params["connection_id"]
......@@ -26,6 +30,7 @@ def validate_clean(control_params: Dict[str, Any], **kwargs) -> None:
loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
def clean(command: str, intern_conn):
......@@ -35,50 +40,46 @@ def clean(command: str, intern_conn):
delete = delete_table(tablename, engine)
if delete:
logger.info(f"Borrado correctamente la tabla {tablename}")
delete_all_xcom_tasks()
logger.info(f"Borrado todas las variables xcom")
def get_generate_from_xcom(**kwargs):
@task(task_id="MASTER_CLEANING", trigger_rule='none_skipped')
def get_cleaners_from_xcom(**kwargs):
final_selects = []
task = kwargs['ti']
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
if not key.startswith("SELECT"):
if not key.startswith(OperationTypeEnum.SELECT.value) and not key.startswith(OperationTypeEnum.PROCEDURE.value):
continue
xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo comandos {xcom_outputs}")
for select in xcom_outputs:
final_selects.append(select)
logger.info(f"Final selects: {final_selects}")
Variable.set(key='CLEANS', value=final_selects, serialize_json=True)
return [[item] for item in final_selects]
def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any]) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="LimpiezaDelProceso", prefix_group_id=False) as group:
init_task = PythonOperator(
task_id="MASTER_CLEANING",
python_callable=get_generate_from_xcom,
trigger_rule='all_success'
)
cleaners = Variable.get('CLEANS', default_var=[], deserialize_json=True)
if cleaners:
cleaners = get_cleaners_from_xcom()
tasks = PythonOperator.partial(
task_id="CLEANERS",
python_callable=clean,
op_kwargs={'intern_conn': db_intern_conn}
).expand(op_args=[[item] for item in cleaners])
).expand(op_args=cleaners)
validate_task = PythonOperator(
task_id="VALIDATE_CLEANER",
python_callable=validate_clean,
op_kwargs={'control_params': control_s3},
trigger_rule='all_done'
trigger_rule='none_skipped'
)
init_task >> tasks >> validate_task
cleaners >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de limpiadores. {e}")
finally:
......
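Review note: this hunk turns MASTER_CLEANING into a TaskFlow `@task` whose return value (a list of `op_args` lists) feeds `PythonOperator.partial(...).expand(op_args=...)` directly, instead of reading the CLEANS Variable at DAG-parse time. A minimal, self-contained sketch of that dynamic task mapping pattern, assuming Airflow 2.x; the DAG id, task ids and callables here are illustrative, not the project's:

```python
# Minimal sketch of dynamic task mapping over a @task result (Airflow 2.x).
# Names (demo_dag, LISTER, PRINTER, printer) are illustrative only.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator


def printer(item):
    print(f"processing {item}")


with DAG("demo_dag", start_date=datetime(2023, 1, 1), schedule=None, catchup=False):

    @task(task_id="LISTER", trigger_rule="none_skipped")
    def lister():
        # Return one op_args list per mapped task instance.
        return [["a"], ["b"], ["c"]]

    items = lister()

    mapped = PythonOperator.partial(
        task_id="PRINTER",
        python_callable=printer,
    ).expand(op_args=items)

    # The XCom dependency is implicit, but it can also be declared explicitly:
    items >> mapped
```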
......@@ -16,9 +16,10 @@ def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool =
if result:
total_rows = int(result[0])
logger.info(f"Total de filas: {total_rows}")
steps = int(total_rows/chunksize)
if steps >= final_steps:
final_steps = steps + 1
if total_rows == chunksize:
final_steps = 1
else:
final_steps = int(total_rows/chunksize) + 1
except Exception as e:
logger.error(f"Error calculando el total de N° de filas desde el comando: {sql_command}. {e}")
finally:
......
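Review note: the rewritten `get_steps` drops the `steps >= final_steps` comparison; the chunk count is now 1 when `total_rows == chunksize`, otherwise `total_rows // chunksize + 1`. A standalone check of that arithmetic (the helper name is made up):

```python
# Standalone sketch of the new chunk-count rule in get_steps (helper name is illustrative).
def chunk_steps(total_rows: int, chunksize: int) -> int:
    if total_rows == chunksize:
        return 1
    return total_rows // chunksize + 1


assert chunk_steps(8000, 8000) == 1    # exactly one full chunk
assert chunk_steps(8001, 8000) == 2    # one full chunk plus a remainder
assert chunk_steps(7999, 8000) == 1    # a single partial chunk
assert chunk_steps(16000, 8000) == 3   # exact multiples above 1x still get an extra (possibly empty) step
```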
......@@ -31,6 +31,22 @@ class Database:
self.factory = Oracle(host, port, user, password, service)
self.engine = None
def get_basic_connection(self):
connection = None
try:
connection = self.factory.get_basic_connection()
except Exception as e:
logger.error(f"Error trayendo básica conexión. {e}")
finally:
return connection
def close_basic_connection(self) -> None:
try:
self.factory.connection.close()
self.factory.connection = None
except Exception as e:
logger.error(f"Error cerrando básica conexión. {e}")
def create_engine(self) -> None:
try:
if isinstance(self.engine, type(None)):
......
from typing import List, Tuple
from sqlalchemy import create_engine
import oracledb
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from components.Databases.Enums.OracleDataTypeEnum import OracleDataTypeEnum
......@@ -20,13 +21,25 @@ class Oracle:
self.user = user
self.password = password
self.service = service
self.arraysize = 1000
self.engine = None
self.connection = None
def get_basic_connection(self):
try:
if isinstance(self.connection, type(None)):
self.connection = oracledb.connect(user=self.user, password=self.password, host=self.host,
port=self.port, service_name=self.service)
except Exception as e:
logger.error(f"Error obteniendo conexion básica de Oracle. {e}")
finally:
return self.connection
def create_engine(self) -> None:
try:
dialect = DatabaseDialectEnum.ORACLE.value
url = f"{dialect}://{self.user}:{self.password}@{self.host}:{str(self.port)}?service_name={self.service}"
self.engine = create_engine(url)
self.engine = create_engine(url, arraysize=self.arraysize)
except Exception as e:
logger.error(f"Error creando engine de Oracle. {e}")
......
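Review note: `get_basic_connection`/`close_basic_connection` lazily open and tear down a raw python-oracledb connection next to the SQLAlchemy engine (now created with `arraysize=1000` to enlarge fetch batches). A hedged usage sketch of the same `oracledb.connect` call; host, credentials and service name are placeholders:

```python
# Sketch of the raw python-oracledb connection wrapped by Oracle.get_basic_connection.
# Connection details below are placeholders, not real credentials.
import oracledb

connection = oracledb.connect(
    user="PRUEBABCOM2",
    password="***",
    host="192.168.27.22",
    port=21521,
    service_name="ORCLPDB1",
)
try:
    with connection.cursor() as cursor:
        cursor.arraysize = 1000          # mirrors the arraysize passed to create_engine
        cursor.execute("SELECT 1 FROM DUAL")
        print(cursor.fetchone())
finally:
    connection.close()
```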
......@@ -22,6 +22,7 @@ class Postgres:
self.database = database
self.schema = schema
self.engine = None
self.DEFAULT_VAR_LENGHT = 100
def create_engine(self) -> None:
try:
......@@ -51,7 +52,7 @@ class Postgres:
except TypeError:
data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value()
else:
data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value()
data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value(self.DEFAULT_VAR_LENGHT)
setattr(model, name, Column(data_type))
model = model.__table__
except Exception as e:
......
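Review note: `DEFAULT_VAR_LENGHT` gives unsized text fields an explicit length. Assuming the text entry of `PostgresDataTypeORMEnum` resolves to SQLAlchemy's `String`, the fallback call amounts to:

```python
# Assumption: PostgresDataTypeORMEnum's text entry is sqlalchemy.String, so
# value(self.DEFAULT_VAR_LENGHT) builds a bounded VARCHAR(100) column.
from sqlalchemy import Column, String

DEFAULT_VAR_LENGHT = 100                      # same constant (spelling included) as in the diff
column = Column(String(DEFAULT_VAR_LENGHT))   # VARCHAR(100) instead of an unsized VARCHAR
```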
This diff is collapsed.
......@@ -4,13 +4,16 @@ import json
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.S3Route import save_df_to_s3, load_obj_to_s3
from components.Utils import select_multiple, create_temp_file, delete_temp_dir
from components.Control import get_tasks_from_control, update_new_process
from components.Xcom import delete_all_xcom_tasks
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
from enums.OperationTypeEnum import OperationTypeEnum
import logging
......@@ -18,6 +21,7 @@ logger = logging.getLogger()
def validate_generate(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="GENERATORS", key="FAILED_TASKS")
......@@ -46,7 +50,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, **kwargs) -
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
raise AssertionError(f"Ocurrieron errores en la etapa de generación")
raise AirflowSkipException(f"Ocurrieron errores en la etapa de generación")
elif status == ProcessStatusEnum.SUCCESS.value:
ti.xcom_push(key="CONTROL-CONFIG", value=conf)
......@@ -76,9 +80,9 @@ def on_success_generator(context) -> None:
def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timezone: str, chunksize=10000):
engine = intern_conn.engine
logger.debug(f"COMANDO: {command}")
tablename = select_multiple(command)["tablename"]
logger.info(f"Generando resultado de la tabla {tablename}")
# Creamos el archivo temporal
filename_mask = params["filename_mask"]
file_type = params["file_type"]
......@@ -95,7 +99,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
header = True if step == 0 else False
try:
dataframe = next(iterator)
dataframe = dataframe.drop("INTERN_ID_BCOM", axis=1)
dataframe = dataframe.drop("INTERN_ID_BCOM", axis=1, errors='ignore')
logger.debug(dataframe)
dataframe.to_csv(tmp_file, sep=delimiter, index=False, mode='a', header=header)
except StopIteration:
......@@ -115,6 +119,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
delete_temp_dir(tmp_file)
@task(task_id="MASTER_GENERATION", trigger_rule='none_skipped')
def get_generate_from_xcom(**kwargs):
task = kwargs['ti']
final_outputs = []
......@@ -126,14 +131,16 @@ def get_generate_from_xcom(**kwargs):
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
if not key.startswith("SELECT"):
if not key.startswith(OperationTypeEnum.SELECT.value) and not key.startswith(OperationTypeEnum.PROCEDURE.value):
continue
xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo tablas {xcom_outputs}")
for select in xcom_outputs:
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or select not in success_tasks:
final_outputs.append(select)
logger.info(f"Final outputs: {final_outputs}")
Variable.set(key='GENERATES', value=final_outputs, serialize_json=True)
return [[item] for item in final_outputs]
def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], control_s3: Dict[str, Any],
......@@ -141,28 +148,23 @@ def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], control_
group = None
try:
with TaskGroup(group_id="GeneracionyDespliegueDeResultados", prefix_group_id=False) as group:
init_task = PythonOperator(
task_id="MASTER_GENERATION",
python_callable=get_generate_from_xcom,
trigger_rule='all_success'
)
outputs = Variable.get('GENERATES', default_var=[], deserialize_json=True)
if outputs:
outputs = get_generate_from_xcom()
tasks = PythonOperator.partial(
task_id="GENERATORS",
python_callable=generate_and_deploy,
on_failure_callback=on_failure_generator,
on_success_callback=on_success_generator,
op_kwargs={'intern_conn': db_intern_conn, 'params': parameters, 'timezone': timezone}
).expand(op_args=[[item] for item in outputs])
).expand(op_args=outputs)
validate_task = PythonOperator(
task_id="VALIDATE_GENERATOR",
python_callable=validate_generate,
op_kwargs={'control_params': control_s3, 'timezone': timezone},
trigger_rule='all_done'
trigger_rule='none_skipped'
)
init_task >> tasks >> validate_task
outputs >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de generadores. {e}")
finally:
......
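Review note: this hunk (and the matching transformation hunk below) replaces `AssertionError` with `AirflowSkipException` and `all_done` with `none_skipped`, so a failed stage marks its validator as skipped and the skip propagates downstream instead of raising a hard task failure. A minimal sketch of that interaction, assuming Airflow 2.x; DAG id and callables are illustrative:

```python
# Minimal sketch: a skip raised in one task propagates to a 'none_skipped' downstream task.
from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator


def validator():
    errors_found = True   # stand-in for "FAILED_TASKS pulled from XCom is non-empty"
    if errors_found:
        raise AirflowSkipException("Ocurrieron errores en la etapa anterior")


def next_stage():
    print("only runs when no upstream task was skipped")


with DAG("skip_demo", start_date=datetime(2023, 1, 1), schedule=None, catchup=False):
    validate = PythonOperator(task_id="VALIDATE", python_callable=validator)
    downstream = PythonOperator(
        task_id="NEXT_STAGE",
        python_callable=next_stage,
        trigger_rule="none_skipped",   # a skipped upstream task skips this one as well
    )
    validate >> downstream
```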
......@@ -157,6 +157,7 @@ def get_file_from_key(conn: str, bucket: str, key: str) -> Any:
data = s3_hook.get_key(key, bucket)
data.download_fileobj(result)
except Exception as e:
result = None
logger.error(f"Error extrayendo archivo {key}. {e}")
finally:
return result
......
import json
from typing import Dict, Any
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_obj_to_s3
from components.Xcom import delete_all_xcom_tasks
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from enums.ProcessStatusEnum import ProcessStatusEnum
from enums.OperationTypeEnum import OperationTypeEnum
import logging
......@@ -16,6 +18,7 @@ logger = logging.getLogger()
def validate_transform(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
success_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="FAILED_TASKS")
......@@ -44,7 +47,7 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, **kwargs)
if loaded:
logger.info(f"Cargado correctamente el archivo de control en {key}")
delete_all_xcom_tasks()
raise AssertionError(f"Ocurrieron errores en la etapa de transformación")
raise AirflowSkipException(f"Ocurrieron errores en la etapa de transformación")
elif status == ProcessStatusEnum.SUCCESS.value:
ti.xcom_push(key="CONTROL-CONFIG", value=conf)
......@@ -79,14 +82,14 @@ def transformations(xcom_commands: str, intern_conn):
logger.info(f"Ejecutando transformaciones del script {script_name}")
with engine.connect() as connection:
for command in commands:
logger.info(f"Ejecutando comando de transformación: {command}")
logger.debug(f"Ejecutando comando de transformación: {command}")
_ = connection.execute(command)
@task(task_id="MASTER_TRANSFORMATION", trigger_rule='none_skipped')
def get_trans_from_xcom(**kwargs):
task = kwargs['ti']
transforms_per_file = []
final_transforms = []
conf = task.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
tasks = get_tasks_from_control(conf, "transformation")
success_tasks = tasks["tasks"]
......@@ -95,7 +98,8 @@ def get_trans_from_xcom(**kwargs):
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
if not key.startswith("TRANSFORM"):
final_transforms = []
if not key.startswith(OperationTypeEnum.TRANSFORM.value):
continue
xcom_transforms = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo comandos {xcom_transforms}")
......@@ -105,26 +109,22 @@ def get_trans_from_xcom(**kwargs):
transforms_per_file.append((key, final_transforms))
logger.info(f"Scripts para la transformación: {transforms_per_file}")
Variable.set(key='TRANSFORMS', value=transforms_per_file, serialize_json=True)
return [[item] for item in transforms_per_file]
def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str, Any]) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="TransformacionDeDatos", prefix_group_id=False) as group:
init_task = PythonOperator(
task_id="MASTER_TRANSFORMATION",
python_callable=get_trans_from_xcom,
trigger_rule='all_success'
)
transforms = Variable.get('TRANSFORMS', default_var=[], deserialize_json=True)
if transforms:
transforms = get_trans_from_xcom()
tasks = PythonOperator.partial(
task_id="TRANSFORMATIONS",
python_callable=transformations,
op_kwargs={'intern_conn': db_intern_conn},
on_failure_callback=on_failure_transform,
on_success_callback=on_success_transform
).expand(op_args=[[item] for item in transforms])
).expand(op_args=transforms)
validate_task = PythonOperator(
task_id="VALIDATE_TRANSFORMATION",
......@@ -132,7 +132,7 @@ def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str
op_kwargs={'control_params': control_s3, 'timezone': timezone},
trigger_rule='none_skipped'
)
init_task >> tasks >> validate_task
transforms >> tasks >> validate_task
except Exception as e:
logger.error(f"Error creando taskGroup de transformación. {e}")
finally:
......
......@@ -3,8 +3,13 @@ import uuid
import os
import shutil
import pandas as pd
from sqlalchemy import Column
from sqlalchemy.exc import InvalidRequestError
from enums.CatalogConfigurationEnum import CatalogConfigurationEnum
from enums.FileTypeEnum import FileTypeEnum
from enums.DataTypeEnum import DataTypeEnum
from enums.DataTypeOrmEnum import DataTypeOrmEnum
from components.Model.InsumoModel import InsumoModel
from enums.CommentsScriptEnum import CommentsScriptEnum
from components.Timezone import datetime_by_tzone
......@@ -96,9 +101,9 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
add_next = False
final_data = []
table_name = ""
for index, item in enumerate(data):
if item.strip().startswith("--"):
continue
for item in data:
# if item.strip().startswith("--") and label_tablename.strip()+":" not in item:
# continue
if item.lower().strip() == "end":
final_data[-1] = final_data[-1] + "; end;"
final_item = item
......@@ -112,18 +117,34 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
add_next = False
final_item = item.strip()
table_name = ""
if final_item.strip()[:2] in comments and ("update " in final_item.lower() or "delete " in final_item.lower()):
if final_item.strip()[:2] in comments and ("update " in final_item.lower() or "delete " in final_item.lower() or
"alter" in final_item.lower() or "create" in final_item.lower() or
"drop" in final_item.lower()):
trans_index = final_item.lower().find("update")
if trans_index != -1:
final_item = final_item[trans_index:]
delete_index = final_item.lower().find("delete")
if delete_index != -1:
final_item = final_item[delete_index:]
alter_index = final_item.lower().find("alter")
if alter_index != -1:
final_item = final_item[alter_index:]
create_index = final_item.lower().find("create")
if create_index != -1:
final_item = final_item[create_index:]
drop_index = final_item.lower().find("drop")
if drop_index != -1:
final_item = final_item[drop_index:]
final_item = final_item.replace("%", "%%")
final_data.append(final_item)
final_data = [item.replace("\t", "") for item in final_data if item != "" and ("select" in item or
"update" in item or
"delete" in item or
"begin" in item)]
final_data = [item.replace("\t", "") for item in final_data if item != "" and ("select" in item.lower() or
"update" in item.lower() or
"delete" in item.lower() or
"begin" in item.lower() or
"alter" in item.lower() or
"create" in item.lower() or
"drop" in item.lower() or
"commit" in item.lower())]
result.append((row[0], final_data))
logger.info(f"Lista de comandos: {result}")
except Exception as e:
......@@ -135,8 +156,15 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
def select_multiple(command: str) -> Dict[str, Any]:
response = {'is_multiple': False, 'tablename': ''}
tablename = ""
no_procedure_init = "|select"
procedure_init = ["|begin"]
try:
if command.lower().replace(" ", "").find("|select") != -1:
if command.lower().replace(" ", "").find(procedure_init[0]) != -1:
response["is_multiple"] = True
tablename = command[:command.index("|")].strip()
response["tablename"] = tablename
else:
if command.lower().replace(" ", "").find(no_procedure_init) != -1:
response["is_multiple"] = True
tablename = command[:command.index("|")].strip()
init_index = command.lower().find("from")
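Review note: `select_multiple` now also treats a `tablename|begin ...` procedure block as a multiple-output command, taking the table name from the text before the first `|`, just as it does for `tablename|select ...`. A simplified illustration of the prefix handling (helper name and sample strings are made up; the real function goes on to parse the FROM clause for the plain-select case):

```python
# Simplified illustration of the tablename|command convention handled by select_multiple.
def split_prefixed_command(command: str):
    compact = command.lower().replace(" ", "")
    if compact.find("|begin") != -1 or compact.find("|select") != -1:
        return {"is_multiple": True, "tablename": command[:command.index("|")].strip()}
    return {"is_multiple": False, "tablename": ""}


print(split_prefixed_command("TABLE_CLIENTES | SELECT * FROM clientes"))
# {'is_multiple': True, 'tablename': 'TABLE_CLIENTES'}
print(split_prefixed_command("PROCEDURE_1 | BEGIN proc_prueba; END;"))
# {'is_multiple': True, 'tablename': 'PROCEDURE_1'}
```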
......@@ -187,3 +215,27 @@ def delete_temp_dir(module_name: str) -> bool:
raise AssertionError(f"Error borrando modulo temporal. {e}")
finally:
return drop
def generateModel(tablename: str, attributes: List[Dict[str, Any]], modelName: str = "TableModel"):
model = type(modelName, (InsumoModel,), {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
try:
for attribute in attributes:
logger.debug(f"attribute: {attribute}")
if attribute["datatype"] == DataTypeEnum.TEXT.name and "maxLength" in attribute.keys():
setattr(model, attribute["identifier"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(attribute["maxLength"])))
else:
setattr(model, attribute["identifier"],
Column(DataTypeOrmEnum[attribute["datatype"]].value))
model = model.__table__
except InvalidRequestError as e:
logger.debug(f"InvalidRequestError. {e}")
except Exception as e:
logger.error(f"Error creando modelo dinámico. {e}")
finally:
return model
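Review note: `generateModel` builds a SQLAlchemy declarative model at runtime from field descriptors shaped like the `fields` entries in procedure_definition.json. A hedged usage sketch, assuming `InsumoModel` is a declarative base subclass that supplies the primary key and that `DataTypeOrmEnum` maps as in the new enum file:

```python
# Hypothetical call to the new generateModel helper; attribute dicts mirror
# the "fields" entries of procedure_definition.json.
from components.Utils import generateModel

attributes = [
    {"identifier": "CD_EMPRESA", "datatype": "NUMBER", "maxLength": None},
    {"identifier": "CD_FOLIO", "datatype": "TEXT", "maxLength": 100},
    {"identifier": "FH_CONTRATACION", "datatype": "DATE", "maxLength": None},
]

table = generateModel("procedure_1", attributes)
# table is the generated __table__: NUMBER -> Integer, TEXT with maxLength -> String(100),
# DATE -> Date. It can then be created on the intern engine, e.g. table.create(engine).
```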
from airflow.utils.db import provide_session
from sqlalchemy import or_
from airflow.models import XCom
from airflow.models import XCom, TaskInstance, Variable
import logging
from typing import List, Tuple
from enums.OperationTypeEnum import OperationTypeEnum
logger = logging.getLogger()
......@@ -22,11 +24,11 @@ def save_commands_to_xcom(dataset: List[Tuple[str, List[str]]], task, extract_ma
raise AssertionError(f"Script {name} no tiene prefijo de orden. Validar nombre de script")
name += "_" + order[0]
if name.find(extract_mask) != -1:
name = "SELECT|" + name
name = OperationTypeEnum.SELECT.value + "|" + name
elif name.find(transform_mask) != -1:
name = "TRANSFORM|" + name
name = OperationTypeEnum.TRANSFORM.value + "|" + name
elif name.find(procedure_mask) != -1:
name = "PROCEDURE|" + name
name = OperationTypeEnum.PROCEDURE.value + "|" + name
task.xcom_push(key=name, value=data[1])
final_names_xcom.append(name)
task.xcom_push(key="XCOM-EXTRACTION-NAMES", value=final_names_xcom)
......@@ -57,7 +59,19 @@ def delete_all_xcom_tasks() -> None:
XCom.task_id == "GENERATORS", XCom.task_id == "TRANSFORMATIONS",
XCom.task_id == "VALIDATE_EXTRACTION", XCom.task_id == "VALIDATE_GENERATOR",
XCom.task_id == "VALIDATE_TRANSFORMATION")).delete()
session.query(Variable).filter(or_(Variable.key == "SELECTS", Variable.key == "TRANSFORMS",
Variable.key == "GENERATES", Variable.key == "CLEANS")).delete()
delete_task_instances()
cleanup_xcom()
except Exception as e:
logger.error(f"Error borrando todas las variables xcom del DAG actual. {e}")
def delete_task_instances() -> None:
try:
@provide_session
def cleanup_taskinstances(session=None):
session.query(TaskInstance).filter(TaskInstance.state == "removed").delete()
cleanup_taskinstances()
except Exception as e:
logger.error(f"Error borrando TaskInstances. {e}")
......@@ -3,24 +3,24 @@ app:
database:
sources:
source1:
type: oracle
host: 192.168.27.22
port: 21521
username: PRUEBABCOM2
password: admin
type: mysql
host: 192.168.21.52
port: 13306
username: root
password: root
database: bd_tp_qa
service: ORCLPDB1
schema: public
transformation:
type: mysql
host: 192.168.1.11
host: 192.168.1.9
port: 13306
username: root
password: root
database: prueba_bcom
service:
schema:
chunksize: 50000
chunksize: 8000
label_multiple_select: TABLE
source_mask: select # Sufijo (S)
procedure_mask: procedure # S
......
......@@ -7,7 +7,7 @@ from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from components.Utils import update_sql_commands
from components.Xcom import save_commands_to_xcom
from components.Xcom import save_commands_to_xcom, delete_task_instances
from components.S3Route import get_files_from_prefix, get_file_from_key
from components.Sensor import create_s3_sensor
from components.Extractor import get_extract_task_group
......@@ -24,6 +24,10 @@ logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_TRANSFORMACIONES3"
# Change this path if is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition.json"
DEFAULT_ARGS = {
'owner': 'BCOM',
"start_date": datetime(2023, 5, 25, 22, 9),
......@@ -77,6 +81,17 @@ def extraction(source_conn, intern_conn, timezone: str, control_s3: Dict[str, An
return groups
def save_procedure_json(json_path: str, task) -> None:
try:
with open(json_path) as f:
data = json.load(f)
logger.info(f"JSON-PROCEDURE {data}")
if data:
task.xcom_push(key="PROCEDURE-JSON", value=data)
except Exception as e:
logger.error(f"Error leyendo y guardando archivo descriptor de procedure. {e}")
def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task):
try:
if not prefix.endswith("/"):
......@@ -84,11 +99,12 @@ def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task)
key = prefix + filename
logger.info(f"EXTRAYENDO ARCHIVO DE CONTROL DESDE {key}")
control = get_file_from_key(conn_id, bucket, key)
if control:
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data)
data = json.load(data)
if not data:
print("ACA", data)
else:
logger.info(f"Json Procedure descargado: {control}")
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": []}]
task.xcom_push(key="CONTROL-CONFIG", value=data)
except Exception as e:
......@@ -101,6 +117,7 @@ def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, tr
try:
extract_control(control_params["connection_id"], control_params["bucket"], control_params["prefix"],
control_params["filename"], kwargs['ti'])
save_procedure_json(JSON_PROCEDURE_PATH, kwargs['ti'])
start_time = time.time()
logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
scripts = get_files_from_prefix(conn_id, bucket, prefix)
......@@ -120,7 +137,7 @@ def set_dag():
# Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
# En desarrollo, cualquiera que apunte a su carpeta dags
conf_path = "/root/airflow/dags/dag_conf.yml"
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
......
import enum
import pandas as pd
from pymysql import Timestamp
class DataTypeEnum(enum.Enum):
NUMBER = int
TEXT = str
DECIMAL = float
DATETIME = Timestamp
DATE = pd.Timestamp
import enum
from sqlalchemy import Integer, String, Date, DateTime, DECIMAL
class DataTypeOrmEnum(enum.Enum):
NUMBER = Integer
TEXT = String
DECIMAL = DECIMAL
DATE = Date
DATETIME = DateTime
from enum import Enum
class OperationTypeEnum(Enum):
SELECT = "SELECT"
TRANSFORM = "TRANSFORM"
PROCEDURE = "PROCEDURE"
[{
"procedure_identifier": "PROCEDURE_1",
"fields": [
{
"identifier": "CD_EMPRESA",
"datatype": "NUMBER",
"decimal_precision": 0,
"maxLength": null
},
{
"identifier": "CD_FOLIO",
"datatype": "TEXT",
"decimal_precision": null,
"maxLength": 100
},
{
"identifier": "CD_CLIENTE",
"datatype": "TEXT",
"decimal_precision": null,
"maxLength": 50
},
{
"identifier": "FH_CONTRATACION",
"datatype": "DATE",
"pattern": "%d-%m-%y",
"decimal_precision": null,
"maxLength": null
}
]
},
{
"procedure_identifier": "procedure_prueba2",
"fields": [
{
"identifier": "col1",
"datatype": "DATE",
"pattern": "%Y-%m-%d",
"decimal_precision": null,
"maxLength": null
},
{
"identifier": "col2",
"datatype": "TIME",
"pattern": "%H:%M:%S",
"decimal_precision": null,
"maxLength": null
},
{
"identifier": "col3",
"datatype": "DATETIME",
"pattern": "%Y-%m-%d %H:%M:%S",
"decimal_precision": null,
"maxLength": null
}
]
}
]
\ No newline at end of file
......@@ -12,40 +12,6 @@ metadata:
---
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-envvars-configmap
namespace: bcom-airflow
data:
# The conf below is necessary because of a typo in the config on docker-airflow image:
# https://github.com/puckel/docker-airflow/blob/bed777970caa3e555ef618d84be07404438c27e3/config/airflow.cfg#L934
AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE: bcom-airflow
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: '30'
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: America/Lima
AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}'
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.1"
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF: airflow-envvars-configmap
AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE: /opt/airflow/templates/pod_template.yaml
AIRFLOW__CORE__EXECUTOR: KubernetesExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: admin
_AIRFLOW_WWW_USER_PASSWORD: admin
S3_DAGS_DIR: 's3://prueba1234568/dags'
SYNCHRONYZE_DAG_DIR: '30'
MINIO_SERVER: 'http://192.168.49.2:9000'
MINIO_DAGS_DIR: '/prueba-ca/dags'
---
apiVersion: v1
kind: ConfigMap
metadata:
......@@ -90,3 +56,40 @@ data:
- name: logs-persistent-storage
persistentVolumeClaim:
claimName: airflow-logs-pvc
---
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-envvars-configmap
namespace: bcom-airflow
data:
# The conf below is necessary because of a typo in the config on docker-airflow image:
# https://github.com/puckel/docker-airflow/blob/bed777970caa3e555ef618d84be07404438c27e3/config/airflow.cfg#L934
AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: 'false'
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: '10'
AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE: bcom-airflow
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: '30'
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: America/Lima
AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}'
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.4"
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF: airflow-envvars-configmap
AIRFLOW__KUBERNETES_EXECUTOR__POD_TEMPLATE_FILE: /opt/airflow/templates/pod_template.yaml
AIRFLOW__CORE__EXECUTOR: KubernetesExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: admin
_AIRFLOW_WWW_USER_PASSWORD: admin
S3_DAGS_DIR: 's3://prueba1234568/dags'
SYNCHRONYZE_DAG_DIR: '30'
MINIO_SERVER: 'http://192.168.49.2:9000'
MINIO_DAGS_DIR: '/prueba-ca/dags'
\ No newline at end of file
......@@ -21,7 +21,7 @@ spec:
spec:
containers:
- name: airflow-scheduler
image: apache/airflow:2.5.3
image: cristianfernando/airflow_custom:0.0.4
args: ["scheduler"]
envFrom:
- configMapRef:
......@@ -32,7 +32,8 @@ spec:
- name: logs-persistent-storage
mountPath: /opt/airflow/logs
- name: pods-templates
mountPath: /opt/airflow/templates
mountPath: /opt/airflow/templates/pod_template.yaml
subPath: pod_template.yaml
volumes:
- name: dags-host-volume
persistentVolumeClaim:
......
......@@ -10,7 +10,7 @@ spec:
- ReadWriteMany
storageClassName: airflow-dags
nfs:
server: 192.168.1.11
server: 192.168.1.9
path: "/mnt/nfs_share"
---
......@@ -27,7 +27,7 @@ spec:
- ReadWriteMany
storageClassName: airflow-postgres
nfs:
server: 192.168.1.11
server: 192.168.1.9
path: "/mnt/nfs_postgres"
---
......@@ -44,7 +44,7 @@ spec:
- ReadWriteMany
storageClassName: airflow-logs
nfs:
server: 192.168.1.11
server: 192.168.1.9
path: "/mnt/nfs_logs"
---
......
apache-airflow[kubernetes]==2.5.3
openpyxl==3.1.2
XlsxWriter==3.1.2
pymysql==1.1.0
oracledb==1.3.2
......@@ -7,4 +7,4 @@ kubectl apply -f airflow-secrets.yaml
kubectl apply -f airflow-webserver-deployment.yaml
kubectl apply -f airflow-webserver-service.yaml
kubectl apply -f airflow-scheduler-deployment.yaml
kubectl apply -f sync-dags-deployment-s3.yaml
kubectl apply -f sync-dags-deployment.yaml
......@@ -5,6 +5,6 @@ kubectl delete -f airflow-secrets.yaml
kubectl delete -f airflow-webserver-service.yaml
kubectl delete -f airflow-webserver-deployment.yaml
kubectl delete -f airflow-scheduler-deployment.yaml
kubectl delete -f sync-dags-deployment-s3.yaml
kubectl delete -f sync-dags-deployment.yaml
kubectl delete -f airflow-volumes.yaml
kubectl delete -f airflow-envvars-configmap.yaml
\ No newline at end of file