Commit 05e1b5a3 authored by Erly Villaroel

Definition of procedure and connections

parent 2c9764fa
......@@ -66,8 +66,12 @@ def on_failure_extractor(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('SELECTS', default_var=[], deserialize_json=True)
logger.info(f"TASK_NAME_EXTRACTOR: {task_name}")
logger.info(f"SELECTS_EXTRACTOR: {selects}")
command = selects[ti.map_index]
logger.info(f"COMAND_EXTRACTOR: {command}")
tablename = select_multiple(command[1])["tablename"]
logger.info(f"TABLENAME_EXTRACTOR: {tablename}")
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
init_process = ti.xcom_pull(task_ids="EXTRACTORS", key="INIT_PROCESS_DATETIME_" + str(ti.map_index))[0]
......
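For context, the callback above can read the failing command back through `ti.map_index` only because EXTRACTORS is a dynamically mapped task. A minimal sketch of that wiring, assuming Airflow 2.3+ dynamic task mapping and a hypothetical `extract_one` callable:

```python
from airflow.operators.python import PythonOperator

def on_failure_extractor(context) -> None:
    # Stand-in for the callback in the hunk above; Airflow supplies `context`
    ti = context["ti"]
    print(f"{ti.task_id}_{ti.map_index} failed: {context['exception']}")

def extract_one(command: str) -> None:
    print(command)  # placeholder extraction body

# Inside a DAG definition: one mapped task per command, so the failure callback
# can recover the failing command through context["ti"].map_index.
extractors = PythonOperator.partial(
    task_id="EXTRACTORS",
    python_callable=extract_one,
    on_failure_callback=on_failure_extractor,
).expand(op_args=[["select 1"], ["select 2"]])
```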
......@@ -78,6 +78,9 @@ def on_success_generator(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('GENERATES', default_var=[], deserialize_json=True)
logger.info(f"TASK_NAME: {task_name}")
logger.info(f"SELECTS: {selects}")
logger.info(f"TI_MAP_INDEX: {ti.map_index}")
table = selects[ti.map_index]
table = select_multiple(table)["tablename"]
status = ProcessStatusEnum.SUCCESS.value
......@@ -142,7 +145,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
except StopIteration:
break
conn_id = params["s3_params"]["connection_id"]
list_outputs = params["s3_params"]
size = os.path.getsize(tmp_file)
for output in list_outputs:
......@@ -150,6 +153,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
continue
bucket = list_outputs[output]["bucket"]
prefix = list_outputs[output]["prefix"]
conn_id = list_outputs[output]["connection_id"]
if not prefix.endswith("/"):
prefix += "/"
file_key = prefix + tmp_file[tmp_file.rfind("/")+1:]
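The added per-output `connection_id` lookup means each destination can be written with its own credentials. A hedged sketch of the resulting upload loop using the standard Amazon provider `S3Hook` (the repo's `cloud_provider: local` path may use a different hook); the sample values mirror the YAML below:

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

tmp_file = "/tmp/ESTUDIANTES_1.csv"  # assumed local result file
list_outputs = {
    "ESTUDIANTES_1": {"bucket": "pruebairflow", "prefix": "bcom_results",
                      "connection_id": "prueba_af"},
}

for output, s3 in list_outputs.items():
    prefix = s3["prefix"] if s3["prefix"].endswith("/") else s3["prefix"] + "/"
    file_key = prefix + tmp_file[tmp_file.rfind("/") + 1:]  # keep only the basename
    hook = S3Hook(aws_conn_id=s3["connection_id"])          # per-output credentials
    hook.load_file(filename=tmp_file, key=file_key,
                   bucket_name=s3["bucket"], replace=True)
```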
......@@ -170,7 +174,9 @@ def get_generate_from_xcom(**kwargs):
task = kwargs['ti']
final_outputs = []
conf = task.xcom_pull(task_ids="VALIDATE_TRANSFORMATION", key="CONTROL-CONFIG")
logger.info(f"CONF_GENERATION: {conf}")
tasks = get_tasks_from_control(conf, "generator")
logger.info(f"TASKS_GENERATION: {tasks}")
tasks_with_save = []
definition = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
......@@ -184,12 +190,17 @@ def get_generate_from_xcom(**kwargs):
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
logger.info(f"KEY: {key}")
if not key.startswith(OperationTypeEnum.SELECT.value) and not key.startswith(OperationTypeEnum.PROCEDURE.value):
continue
xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo tablas {xcom_outputs}")
for select in xcom_outputs:
logger.info(f"SELECT: {select}")
tablename = select_multiple(select)["tablename"]
logger.info(f"TABLENAME {tablename}")
logger.info(f"TASKS: {tasks}")
logger.info(f"TASKS_WITH_SAVE: {tasks_with_save}")
if (tasks["reset"] or tasks["status"] == ProcessStatusEnum.SUCCESS.value or select not in success_tasks) and tablename in tasks_with_save:
final_outputs.append(select)
logger.info(f"Final outputs: {final_outputs}")
......
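The filter above now admits PROCEDURE-prefixed xcom keys alongside SELECT ones. A self-contained illustration; the enum values are assumptions standing in for the repo's `OperationTypeEnum`:

```python
from enum import Enum

class OperationTypeEnum(Enum):
    SELECT = "SELECT"        # assumed prefix value
    PROCEDURE = "PROCEDURE"  # assumed prefix value

xcom_keys = ["SELECT|ventas", "PROCEDURE|obtenerEstudiantes", "TRANSFORM|fase2"]
admitted = [key for key in xcom_keys
            if key.startswith((OperationTypeEnum.SELECT.value,
                               OperationTypeEnum.PROCEDURE.value))]
print(admitted)  # ['SELECT|ventas', 'PROCEDURE|obtenerEstudiantes']
```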
......@@ -5,7 +5,9 @@ from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from components.S3Route import get_files_from_prefix, get_file_from_prefix
from components.Xcom import save_commands_to_xcom
from components.Utils import update_sql_commands_2
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_control_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
......@@ -91,8 +93,12 @@ def transformations(xcom_commands: str, intern_conn, timezone: str, **kwargs):
script_name = xcom_commands[0]
commands = xcom_commands[1]
logger.info(f"Ejecutando transformaciones del script {script_name}")
not_procedure = ["UPDATE","SELECT","CREATE","ALTER","DROP","DELETE","INSERT","GRANT","REVOKE","TRUNCATE","COPY",
"COMMIT","ROLLBACK","USE"]
with engine.connect() as connection:
for command in commands:
if any(command.startswith(palabra) or command.startswith(palabra.lower()) for palabra in not_procedure):
logger.info(f"Ejecutando comando de transformación: {command}")
_ = connection.execute(command)
end_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
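The `not_procedure` list separates plain SQL statements from stored-procedure invocations. A case-insensitive prefix test covers both spellings in a single check; a minimal sketch:

```python
NOT_PROCEDURE = ("UPDATE", "SELECT", "CREATE", "ALTER", "DROP", "DELETE",
                 "INSERT", "GRANT", "REVOKE", "TRUNCATE", "COPY",
                 "COMMIT", "ROLLBACK", "USE")

def is_plain_sql(command: str) -> bool:
    # Anything not starting with a regular SQL verb is treated as a procedure call
    return command.lstrip().upper().startswith(NOT_PROCEDURE)

print(is_plain_sql("select * from t"))            # True
print(is_plain_sql("CALL obtenerEstudiantes()"))  # False
```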
......@@ -100,8 +106,23 @@ def transformations(xcom_commands: str, intern_conn, timezone: str, **kwargs):
@task(task_id="MASTER_TRANSFORMATION", trigger_rule='none_skipped')
def get_trans_from_xcom(**kwargs):
def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablename, transform_mask, order_delimiter,
db_intern_conn, **kwargs):
task = kwargs['ti']
# Changes to fetch the stored procedures (SPs)
engine = db_intern_conn.engine
conn_id = store_procedure["s3_params"]["connection_id"]
bucket = store_procedure["s3_params"]["bucket"]
prefix = store_procedure["s3_params"]["prefix"]
procedures = get_files_from_prefix(conn_id, bucket, prefix, provider)
with engine.connect() as connection:
for procedure in procedures:
procedure = procedure[1]
logger.info(f"Ejecutando creacion de procedure: {procedure}")
_ = connection.execute(procedure)
save_commands_to_xcom(procedures, kwargs['ti'], procedure_mask, transform_mask, procedure_mask, order_delimiter)
logger.debug(f"Procedures cargados en Xcom: {procedures}")
transforms_per_file = []
conf = task.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
tasks = get_tasks_from_control(conf, "transformation")
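The new block downloads procedure scripts from S3 and registers them before any transformation runs. A sketch under the assumption, taken from the hunk, that `get_files_from_prefix` yields `(name, body)` tuples; connection values mirror the YAML below, the procedure body is a stand-in, and SQLAlchemy 1.x-style string execution matches the surrounding code:

```python
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:root@192.168.1.13:13306/prueba_ca")
procedures = [("1.procedure_obtenerEstudiantes.sql",
               "CREATE PROCEDURE obtenerEstudiantes() BEGIN SELECT 1; END")]

with engine.connect() as connection:
    for _name, body in procedures:
        connection.execute(body)  # register each procedure before transformations
```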
......@@ -129,11 +150,13 @@ def get_trans_from_xcom(**kwargs):
def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str, Any],
provider: str) -> TaskGroup or None:
provider: str, store_procedure: Dict, procedure_mask: str,
label_tablename: str, transform_mask: str, order_delimiter: str) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="TransformacionDeDatos", prefix_group_id=False) as group:
transforms = get_trans_from_xcom()
transforms = get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablename, transform_mask, order_delimiter,
db_intern_conn)
tasks = PythonOperator.partial(
task_id="TRANSFORMATIONS",
......
......@@ -97,10 +97,14 @@ def update_sql_commands_2(dataset: List[Tuple[str, str]], label_tablename: str,
extraction_mask: str) -> List[Tuple[str, List[str]]]:
result = []
comments = [CommentsScriptEnum[item].value for item in CommentsScriptEnum._member_names_ if item != CommentsScriptEnum.EXTENDED.name]
logger.info(f"COMENTS: {comments}")
try:
for row in dataset:
logger.info(f"DATASET: {dataset}")
data = row[1].split("\n")
logger.info(f"data inicial: {data}")
data = [item.replace("\r", "") for item in data if item.strip() != '']
logger.info(f"data final: {data}")
final_data = []
start_sentence = True
add_next = False
......@@ -108,6 +112,7 @@ def update_sql_commands_2(dataset: List[Tuple[str, str]], label_tablename: str,
tablename = ""
extend_comment = False
for item in data:
logger.info(f"Item in data: {item}")
if not extend_comment and item.strip().startswith(CommentsScriptEnum.EXTENDED.value):
extend_comment = True
continue
......@@ -286,7 +291,6 @@ def generateModel(tablename: str, attributes: List[Dict[str, Any]], indexes: Lis
model = type(modelName, (InsumoModel,), model_args)
try:
for attribute in attributes:
logger.debug(f"attribute: {attribute}")
if attribute["datatype"] == DataTypeEnum.TEXT.name and "maxLength" in attribute.keys():
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(attribute["maxLength"])))
......
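`generateModel` (touched above) builds SQLAlchemy models at runtime with `type()`. A minimal self-contained sketch of the pattern; `InsumoModel` here is a stand-in for the repo's declarative base:

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class InsumoModel(Base):
    __abstract__ = True
    id = Column(Integer, primary_key=True)  # stand-in key so subclasses are valid

attributes = [{"name": "Nombre", "datatype": "TEXT", "maxLength": 50}]
model = type("Estudiantes1Model", (InsumoModel,), {"__tablename__": "ESTUDIANTES_1"})

for attribute in attributes:
    if attribute["datatype"] == "TEXT" and "maxLength" in attribute:
        # Sized TEXT attributes become String(maxLength) columns, as in the hunk
        setattr(model, attribute["name"], Column(String(attribute["maxLength"])))
```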
......@@ -14,11 +14,14 @@ def save_commands_to_xcom(dataset: List[Tuple[str, List[str]]], task, extract_ma
final_names_xcom = []
try:
for data in dataset:
logger.info(f"DATASET: {dataset}")
logger.info(f"Guardando Xcom en llave {data[0]}")
name = data[0]
base_name = name
if order_delimiter == ".":
base_name = base_name[:base_name.rfind(".")]
logger.info(f"BASE_NAME: {base_name}")
logger.info(f"ORDER DELIMITER: {order_delimiter}")
order = base_name.split(order_delimiter)
if len(order) < 2:
raise AssertionError(f"Script {name} has no order prefix. Check the script name")
......
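Script names must carry an order prefix, e.g. `1.procedure_x.sql` with delimiter `.`. A small sketch of the convention enforced above:

```python
def split_order(name: str, order_delimiter: str) -> str:
    base_name = name
    if order_delimiter == ".":
        base_name = base_name[:base_name.rfind(".")]  # drop the file extension first
    order = base_name.split(order_delimiter)
    if len(order) < 2:
        raise AssertionError(f"Script {name} has no order prefix. Check the script name")
    return order[0]

print(split_order("2.transform_ventas.sql", "."))  # "2"
```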
......@@ -6,24 +6,25 @@ app:
sources:
source1:
type: mysql
host: 192.168.21.52
host: 192.168.1.13
port: 13306
username: root
password: root1234
database: bcom_tp_res_bk
password: root
database: prueba
service:
schema: sources
transformation:
type: mysql
host: 192.168.1.4
host: 192.168.1.13
port: 13306
username: root
password: root
database: prueba_bcom2
database: prueba_ca
service:
schema: intern_db
chunksize: 4000
label_multiple_select: TABLENAME
label_transform_procedure: STORE
source_mask: select # Suffix (S)
procedure_mask: procedure # S
transformation_mask: transform # S
......@@ -31,14 +32,20 @@ app:
cloud_provider: local
scripts:
s3_params:
bucket: prueba1234568
prefix: bcom_scripts
bucket: prueba-id
prefix: prueba_bcom/bcom_scripts
connection_id: conn_script
store_procedures:
s3_params:
bucket: prueba-id
prefix: prueba_bcom/bcom_store_procedures
connection_id: conn_script
control:
s3_params:
connection_id: conn_script
bucket: prueba1234568
prefix: bcom_control
bucket: prueba-id
prefix: prueba_bcom/bcom_control
filename: control_<period>.json
timezone: 'GMT-5'
outputs:
......@@ -48,21 +55,26 @@ app:
delimiter: '|'
tmp_path: /tmp
s3_params:
TACOMVENTAS:
bucket: prueba1234568
prefix: bcom_results
RANGO_VENTAS_CON_PROMOCION:
bucket: prueba-id
prefix: prueba_bcom/bcom_results
connection_id: conn_script
ESTUDIANTES_1:
bucket: pruebairflow
prefix: bcom_results
connection_id: prueba_af
ESTUDIANTES_11:
bucket: pruebairflow
prefix: bcom_results
connection_id: prueba_af
CATALOGO_PROMOCIONES:
bucket: pruebairflow
prefix: bcom_results
connection_id: prueba_af
report:
s3_params:
bucket: prueba1234568
prefix: bcom_report
bucket: prueba-id
prefix: prueba_bcom/bcom_report
connection_id: conn_script
filename: report_<datetime>.xlsx
datetime_pattern: '%Y-%m-%d %H:%M:%S'
procedure:
filepath: "/opt/airflow/dags/procedure_definition.json"
filepath: "/opt/airflow/dags/procedure_prueba.json"
......@@ -59,10 +59,11 @@ def generate_and_deploy_results(intern_conn, parameters: Dict[str, Any], timezon
return groups
def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any], provider: str) -> TaskGroup:
def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any], provider: str, store_procedure: Dict, procedure_mask: str,
label_tablename: str, transform_mask: str, order_delimiter: str) -> TaskGroup:
groups = None
try:
groups = get_transform_task_group(intern_conn, timezone, control_s3, provider)
groups = get_transform_task_group(intern_conn, timezone, control_s3, provider, store_procedure, procedure_mask, label_tablename, transform_mask, order_delimiter)
except Exception as e:
logger.error(f"Error general de transformación de datos. {e}")
finally:
......@@ -190,10 +191,13 @@ def set_dag():
extractions = extraction(source_db, intern_db, timezone, control_s3, conf["cloud_provider"], chunksize)
# Create the task group for the transformations
transformations = transformation(intern_db, timezone, control_s3, conf["cloud_provider"])
store_procedure = conf["store_procedures"]
transformations = transformation(intern_db, timezone, control_s3, conf["cloud_provider"], store_procedure, procedure_mask, conf["label_transform_procedure"],
transform_mask, order_delimiter)
# Create the task group for generating and deploying result files
outputs_conf = conf["outputs"]
result = generate_and_deploy_results(intern_db, outputs_conf, timezone, control_s3, conf["cloud_provider"])
# Create the cleaner tasks
......
[
{
"identifier": "obtenerEstudiantes",
"transformation_store_procedure": true
},
{
"identifier": "ESTUDIANTES_11",
"fields": [
......@@ -29,7 +33,12 @@
}
],
"indexes": [
"ID"
{
"name" : "indice1",
"index_fields": [
"ID"
]
}
],
"save_output" : true
},
......@@ -37,33 +46,26 @@
"identifier": "ESTUDIANTES_1",
"fields": [
{
"name": "ID",
"name": "id",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "Nombre",
"datatype": "TEXT",
"maxLength": 50
"name": "fecha",
"datatype": "DATE"
},
{
"name": "Apellido",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "Edad",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "CorreoElectronico",
"datatype": "TEXT",
"maxLength": 100
"name": "fecha_tiempo",
"datatype": "DATETIME"
}
],
"indexes": [
"ID"
{
"name" : "indice1",
"index_fields": [
"id"
]
}
],
"save_output" : true
}
......
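Indexes in the definition file are now objects with a `name` and an `index_fields` list instead of bare column names. A sketch of how such an entry maps onto a SQLAlchemy `Index`; the model is a stand-in with the listed column:

```python
from sqlalchemy import Column, Integer, Index
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Estudiantes1(Base):
    __tablename__ = "ESTUDIANTES_1"
    id = Column(Integer, primary_key=True)

# One Index per definition entry; fields are looked up on the model by name
indexes = [{"name": "indice1", "index_fields": ["id"]}]
for idx in indexes:
    Index(idx["name"], *[getattr(Estudiantes1, f) for f in idx["index_fields"]])
```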