Commit 12a7e43c authored by Cristian Aguirre

Update 24-08-23. Add new features: Create multiple indexes, update report, fix bugs.

parent a1345ded
from typing import Any, Dict
import pandas as pd
import logging
logger = logging.getLogger()
def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool = False) -> int:
final_steps = 0
def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool = False) -> Dict[str, Any]:
response = {'steps': 0, 'error': ''}
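# get_steps now returns this dict (chunk count plus error text) instead of a bare int, so callers can fail fast when "error" is non-empty.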
try:
if is_tablename:
count_command = f'SELECT COUNT(*) FROM {sql_command}'
......@@ -17,23 +19,27 @@ def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool =
total_rows = int(result[0])
logger.info(f"Total de filas: {total_rows}")
if total_rows == chunksize:
final_steps = 1
response["steps"] = 1
else:
final_steps = int(total_rows/chunksize) + 1
response["steps"] = int(total_rows/chunksize) + 1
except Exception as e:
logger.error(f"Error calculando el total de N° de filas desde el comando: {sql_command}. {e}")
message = f"Error calculando el total de N° de filas desde el comando: {sql_command}. {e}"
logger.error(message)
response["error"] = message
finally:
return final_steps
return response
def get_iterator(command: str, chunksize: int, connection) -> iter:
iterator = None
def get_iterator(command: str, chunksize: int, connection) -> Dict[str, Any]:
response = {'iterator': None, 'error': ''}
try:
connection = connection.execution_options(stream_results=True)
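# stream_results=True streams rows from the database instead of buffering the full result, letting pd.read_sql yield DataFrame chunks lazily.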
iterator = pd.read_sql(command, connection, index_col=None, chunksize=chunksize)
iterator = iter(iterator)
response["iterator"] = iter(iterator)
except Exception as e:
logger.error(f"Error trayendo iterator. {e}")
message = f"Error trayendo iterator. {e}"
logger.error(message)
response["error"] = message
finally:
return iterator
return response
......@@ -14,6 +14,7 @@ from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_control_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from enums.OperationTypeEnum import OperationTypeEnum
from components.Timezone import datetime_by_tzone
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
......@@ -31,6 +32,7 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider:
success_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="CONTROL-CONFIG")
print("CONF", conf)
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
......@@ -68,7 +70,9 @@ def on_failure_extractor(context) -> None:
tablename = select_multiple(command[1])["tablename"]
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
task_result = {"description": tablename, "status": status, "message": exception}
init_process = ti.xcom_pull(task_ids="EXTRACTORS", key="INIT_PROCESS_DATETIME_" + str(ti.map_index))[0]
task_result = {"description": tablename, "status": status, "message": exception, "init_process": init_process,
"end_process": ""}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="FAILED_TASKS", value=task_name)
......@@ -80,15 +84,20 @@ def on_success_extractor(context) -> None:
command = selects[ti.map_index]
tablename = select_multiple(command[1])["tablename"]
status = ProcessStatusEnum.SUCCESS.value
task_result = {"description": tablename, "status": status, "message": ""}
init_process = ti.xcom_pull(task_ids="EXTRACTORS", key="INIT_PROCESS_DATETIME_"+str(ti.map_index))[0]
end_process = ti.xcom_pull(task_ids="EXTRACTORS", key="END_PROCESS_DATETIME_" + str(ti.map_index))[0]
task_result = {"description": tablename, "status": status, "message": "", "init_process": init_process,
"end_process": end_process}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwargs):
def extract_from_source(command, source_conn, intern_conn, chunksize: int, timezone: str, **kwargs):
if isinstance(command, type(None)):
raise AirflowSkipException
task = kwargs['ti']
init_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="INIT_PROCESS_DATETIME_" + str(task.map_index), value=init_process_datetime)
extract_type = command[0].split("|")[0]
command = command[1]
source_engine = source_conn.engine
......@@ -108,7 +117,10 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwa
indexes = []
if "indexes" in procedure.keys():
indexes = procedure["indexes"]
model = generateModel(tablename, procedure["fields"], indexes, intern_conn.db_type)
response_model = generateModel(tablename, procedure["fields"], indexes, intern_conn.db_type)
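# generateModel now returns a {"model", "error"} dict; any reported error aborts the task before extraction starts.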
if response_model["error"] != "":
raise AssertionError(response_model["error"])
model = response_model["model"]
columns_name = [field["name"] for field in procedure["fields"]]
if isinstance(model, type(None)):
raise AssertionError(f"Definición del extracción para {tablename} en el json-descriptor no encontraddo")
......@@ -166,9 +178,15 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwa
else:
if command.replace(" ", "").lower().find("|select") != -1:
command = command[command.lower().find("select"):]
steps = get_steps(command, chunksize, source_engine)
response_steps = get_steps(command, chunksize, source_engine)
if response_steps["error"] != "":
raise AssertionError(response_steps["error"])
steps = response_steps["steps"]
# Traemos el iterator
iterator = get_iterator(command, chunksize, source_engine)
response_iterator = get_iterator(command, chunksize, source_engine)
if response_iterator["error"] != "":
raise AssertionError(response_iterator["error"])
iterator = response_iterator["iterator"]
logger.info(f"Número de pasos para migrar datos: {steps}")
start_time = time.time()
for step in range(steps):
......@@ -178,6 +196,8 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwa
if save:
logger.info(f"Guardado correctamente dataframe en el paso {step+1}")
logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos")
end_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="END_PROCESS_DATETIME_" + str(task.map_index), value=end_process_datetime)
except Exception as e:
delete = delete_table(tablename, intern_conn.engine)
if delete:
......@@ -229,7 +249,8 @@ def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int, timez
tasks = PythonOperator.partial(
task_id="EXTRACTORS",
python_callable=extract_from_source,
op_kwargs={'source_conn': db_source_conn, 'intern_conn': db_intern_conn, 'chunksize': chunksize},
op_kwargs={'source_conn': db_source_conn, 'intern_conn': db_intern_conn, 'chunksize': chunksize,
'timezone': timezone},
on_failure_callback=on_failure_extractor,
on_success_callback=on_success_extractor
).expand(op_args=selects)
......
from typing import Any, Dict
import os
import json
import pandas as pd
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
......@@ -8,6 +9,7 @@ from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from enums.ProcessStatusEnum import ProcessStatusEnum
from enums.DataTypeEnum import DataTypeEnum
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.S3Route import save_df_to_s3, load_control_to_s3
from components.Utils import select_multiple, create_temp_file, delete_temp_dir
......@@ -15,6 +17,7 @@ from components.Control import get_tasks_from_control, update_new_process
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
from enums.OperationTypeEnum import OperationTypeEnum
from components.Timezone import datetime_by_tzone
import logging
......@@ -64,7 +67,9 @@ def on_failure_generator(context) -> None:
table = select_multiple(table)["tablename"]
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
task_result = {"description": table, "status": status, "message": exception}
init_process = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="INIT_PROCESS_DATETIME_" + str(ti.map_index))[0]
task_result = {"description": table, "status": status, "message": exception, "init_process": init_process,
"end_process": ""}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="FAILED_TASKS", value=task_name)
......@@ -76,19 +81,34 @@ def on_success_generator(context) -> None:
table = selects[ti.map_index]
table = select_multiple(table)["tablename"]
status = ProcessStatusEnum.SUCCESS.value
task_result = {"description": table, "status": status, "message": ""}
init_process = ti.xcom_pull(task_ids="GENERATORS", key="INIT_PROCESS_DATETIME_" + str(ti.map_index))[0]
end_process = ti.xcom_pull(task_ids="GENERATORS", key="END_PROCESS_DATETIME_" + str(ti.map_index))[0]
task_result = {"description": table, "status": status, "message": "", "init_process": init_process,
"end_process": end_process}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timezone: str,
provider: str, chunksize=10000):
provider: str, chunksize=10000, **kwargs):
if isinstance(command, type(None)):
raise AirflowSkipException
task = kwargs['ti']
init_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="INIT_PROCESS_DATETIME_" + str(task.map_index), value=init_process_datetime)
engine = intern_conn.engine
logger.debug(f"COMANDO: {command}")
tablename = select_multiple(command)["tablename"]
logger.info(f"Generando resultado de la tabla {tablename}")
definitions = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
campos = {}
for procedure in definitions:
if procedure["identifier"] == tablename:
for field in procedure["fields"]:
name = field["name"]
campos[name] = field["datatype"]
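# "campos" maps each declared field to its datatype so DATE columns can be normalized before writing the CSV output.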
# Creamos el archivo temporal
filename_mask = params["filename_mask"]
file_type = params["file_type"]
......@@ -99,33 +119,50 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
logger.info(f"Ruta creada: {tmp_file}")
logger.debug(f"TABLA: {tablename}")
steps = get_steps(tablename, chunksize, engine, True)
steps = steps["steps"]
if intern_conn.db_type == DatabaseTypeEnum.ORACLE.value:
tablename = f"SELECT * FROM {tablename}"
iterator = get_iterator(tablename, chunksize, engine)
iterator = iterator["iterator"]
logger.info(f"Total de pasos para generar archivo resultado: {steps}")
for step in range(steps):
logger.debug(f"STEP: {step}")
header = True if step == 0 else False
try:
dataframe = next(iterator)
for campo in dataframe.columns:
if campo in campos.keys():
if campos[campo] == DataTypeEnum.DATE.name:
dataframe[campo] = dataframe[campo].dt.date
# elif campos[campo] == DataTypeEnum.DATETIME.name: # datetime:
# dataframe[campo] = pd.to_datetime(dataframe[campo], format='%Y-%m-%d %H:%M:%S')
dataframe = dataframe.drop("INTERN_ID_BCOM", axis=1, errors='ignore')
logger.debug(dataframe)
dataframe.to_csv(tmp_file, sep=delimiter, index=False, mode='a', header=header)
except StopIteration:
break
bucket = params["s3_params"]["bucket"]
prefix = params["s3_params"]["prefix"]
conn_id = params["s3_params"]["connection_id"]
list_outputs = params["s3_params"]
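# s3_params now holds one bucket/prefix entry per output table; the loop below picks the entry whose key matches the table being exported.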
size = os.path.getsize(tmp_file)
for output in list_outputs:
if output not in tablename:
continue
bucket = list_outputs[output]["bucket"]
prefix = list_outputs[output]["prefix"]
if not prefix.endswith("/"):
prefix += "/"
file_key = prefix + tmp_file[tmp_file.rfind("/")+1:]
# Se sube el archivo al S3
logger.info(f"Tamaño del archivo a subir: {os.path.getsize(tmp_file)} bytes")
logger.info(f"Tamaño del archivo a subir: {size} bytes")
save_df_to_s3(tmp_file, conn_id, bucket, file_key, provider, in_memory=False)
# Se borra el archivo al finalizar el upload
delete_temp_dir(tmp_file)
break
else:
raise AssertionError(f"No se encontró la ruta de salida para {tablename}")
end_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="END_PROCESS_DATETIME_" + str(task.map_index), value=end_process_datetime)
@task(task_id="MASTER_GENERATION", trigger_rule='none_skipped')
......
......@@ -7,7 +7,7 @@ import logging
logger = logging.getLogger()
def datetime_by_tzone(tzone: str, pattern: str = "%Y-%m-%d"):
def datetime_by_tzone(tzone: str, pattern: str = "%Y-%m-%d %H:%M:%S"):
offset = None
# Algunos casos donde el timezone es de la forma 4:30 y no se encuentra en timezones de pytz (GMT)
if ":" in tzone:
......
......@@ -11,6 +11,7 @@ from components.S3Route import load_control_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from enums.ProcessStatusEnum import ProcessStatusEnum
from enums.OperationTypeEnum import OperationTypeEnum
from components.Timezone import datetime_by_tzone
import logging
......@@ -59,7 +60,9 @@ def on_failure_transform(context) -> None:
script = transform[ti.map_index][0]
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
task_result = {"description": script, "status": status, "message": exception}
init_process = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="INIT_PROCESS_DATETIME_" + str(ti.map_index))[0]
task_result = {"description": script, "status": status, "message": exception, "init_process": init_process,
"end_process": ""}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="FAILED_TASKS", value=task_name)
......@@ -70,22 +73,30 @@ def on_success_transform(context) -> None:
transform = Variable.get('TRANSFORMS', default_var=[], deserialize_json=True)
script = transform[ti.map_index][0]
status = ProcessStatusEnum.SUCCESS.value
task_result = {"description": script, "status": status, "message": ""}
init_process = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="INIT_PROCESS_DATETIME_" + str(ti.map_index))[0]
end_process = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="END_PROCESS_DATETIME_" + str(ti.map_index))[0]
task_result = {"description": script, "status": status, "message": "", "init_process": init_process,
"end_process": end_process}
ti.xcom_push(key=task_name, value=task_result)
ti.xcom_push(key="SUCCESS_TASKS", value=task_name)
def transformations(xcom_commands: str, intern_conn):
def transformations(xcom_commands: str, intern_conn, timezone: str, **kwargs):
if isinstance(xcom_commands, type(None)):
raise AirflowSkipException
task = kwargs['ti']
init_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="INIT_PROCESS_DATETIME_" + str(task.map_index), value=init_process_datetime)
engine = intern_conn.engine
script_name = xcom_commands[0]
commands = xcom_commands[1]
logger.info(f"Ejecutando transformaciones del script {script_name}")
with engine.connect() as connection:
for command in commands:
logger.debug(f"Ejecutando comando de transformación: {command}")
logger.info(f"Ejecutando comando de transformación: {command}")
_ = connection.execute(command)
end_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="END_PROCESS_DATETIME_" + str(task.map_index), value=end_process_datetime)
@task(task_id="MASTER_TRANSFORMATION", trigger_rule='none_skipped')
......@@ -127,7 +138,7 @@ def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str
tasks = PythonOperator.partial(
task_id="TRANSFORMATIONS",
python_callable=transformations,
op_kwargs={'intern_conn': db_intern_conn},
op_kwargs={'intern_conn': db_intern_conn, 'timezone': timezone},
on_failure_callback=on_failure_transform,
on_success_callback=on_success_transform
).expand(op_args=transforms)
......
......@@ -4,6 +4,7 @@ import os
import shutil
import pandas as pd
from sqlalchemy import Column
from sqlalchemy import Index
from sqlalchemy.exc import InvalidRequestError
from enums.CatalogConfigurationEnum import CatalogConfigurationEnum
from enums.FileTypeEnum import FileTypeEnum
......@@ -92,7 +93,66 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
return data_dict
def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) -> List[Tuple[str, List[str]]]:
def update_sql_commands_2(dataset: List[Tuple[str, str]], label_tablename: str,
extraction_mask: str) -> List[Tuple[str, List[str]]]:
result = []
comments = [CommentsScriptEnum[item].value for item in CommentsScriptEnum._member_names_ if item != CommentsScriptEnum.EXTENDED.name]
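# Single-line comment prefixes (e.g. "--" and "#"); the EXTENDED marker (''') is handled separately below as a block-comment delimiter.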
try:
for row in dataset:
data = row[1].split("\n")
data = [item.replace("\r", "") for item in data if item.strip() != '']
final_data = []
start_sentence = True
add_next = False
final_command = ""
tablename = ""
extend_comment = False
for item in data:
if not extend_comment and item.strip().startswith(CommentsScriptEnum.EXTENDED.value):
extend_comment = True
continue
if extend_comment and item.strip().startswith(CommentsScriptEnum.EXTENDED.value):
extend_comment = False
continue
if extend_comment:
continue
if any([item.strip().startswith(comment) for comment in comments]):
if item.find(label_tablename+":") != -1:
index = item.find(label_tablename+":")
label_lenght = len(label_tablename+":")
tablename = item[index+label_lenght:].strip()
add_next = True
else:
if start_sentence:
final_command = item
else:
final_command += " " + item
if final_command.strip().endswith(";"):
start_sentence = True
if add_next:
final_command = tablename + "|" + final_command.strip()[:-1]
add_next = False
final_item = final_command.replace("%", "%%")
final_data.append(final_item)
else:
start_sentence = False
result.append((row[0], final_data))
logger.info(f"Lista de comandos: {result}")
except Exception as e:
logger.error(f"Error extrayendo comandos sql. {e}")
finally:
return result
def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str,
extraction_mask: str) -> List[Tuple[str, List[str]]]:
"""
DEPRECATED: This method is replaced by update_sql_commands_2
:param dataset:
:param label_tablename:
:param extraction_mask:
:return:
"""
result = []
allowed_commands = ["create", "update", "delete", "select", "alter", "drop", "begin", "commit"]
comments = [CommentsScriptEnum[item].value for item in CommentsScriptEnum._member_names_]
......@@ -106,6 +166,14 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
for item in data:
if item.lower().strip() == "end":
final_data[-1] = final_data[-1] + "; end;"
# parts = item.split(CommentsScriptEnum.DASHES.value)
# parts = [part for part in parts if len(part.strip()) > 0]
# print(parts)
# if len(parts) > 1:
# for part in parts:
# if not part.strip().lower().startswith(label_tablename.lower()):
# continue
final_item = item
if item.lower().strip().find(label_tablename.lower().strip() + ":") != -1:
init_index = item.replace(" ", "").lower().strip().index(label_tablename.lower().strip() + ":")
......@@ -123,6 +191,11 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
if final_item.lower().find(command) != -1]
if len(init_indexes) > 0:
init_index = init_indexes[0]
if row[0].lower().find(extraction_mask.lower()) != -1 and \
final_item.replace(" ", "").startswith(CommentsScriptEnum.DASHES.value) and \
not final_item.replace(" ", "")[len(CommentsScriptEnum.DASHES.value):].startswith(
label_tablename):
continue
final_item = final_item[init_index:]
final_item = final_item.replace("%", "%%")
final_data.append(final_item)
......@@ -199,8 +272,9 @@ def delete_temp_dir(module_name: str) -> bool:
return drop
def generateModel(tablename: str, attributes: List[Dict[str, Any]], indexes: List[str], db_target: str,
modelName: str = "TableModel"):
def generateModel(tablename: str, attributes: List[Dict[str, Any]], indexes: List[Dict[str, Any]], db_target: str,
modelName: str = "TableModel") -> Dict[str, Any]:
response = {'model': None, 'error': ''}
default_precision = 8
model_args = {
'__tablename__': tablename,
......@@ -212,29 +286,46 @@ def generateModel(tablename: str, attributes: List[Dict[str, Any]], indexes: Lis
model = type(modelName, (InsumoModel,), model_args)
try:
for attribute in attributes:
index = False
if attribute["name"] in indexes:
index = True
logger.debug(f"attribute: {attribute}")
if attribute["datatype"] == DataTypeEnum.TEXT.name and "maxLength" in attribute.keys():
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(attribute["maxLength"]), index=index))
Column(DataTypeOrmEnum[attribute["datatype"]].value(attribute["maxLength"])))
elif attribute["datatype"] == DataTypeEnum.DECIMAL.name:
precision = default_precision
if "decimal_precision" in attribute.keys():
precision = attribute["decimal_precision"]
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(38, precision), index=index))
Column(DataTypeOrmEnum[attribute["datatype"]].value(38, precision)))
else:
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value, index=index))
Column(DataTypeOrmEnum[attribute["datatype"]].value))
model = model.__table__
for current_index in indexes:
index_name = tablename + "_" + current_index["name"]
fields = current_index["index_fields"]
final_fields = [model.c[field] for field in fields]
Index(index_name, *final_fields)
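# Index() registers itself on the table through the bound Column objects, so the indexes are emitted together with the table on creation.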
response["model"] = model
except InvalidRequestError as e:
logger.debug(f"InvalidRequestError. {e}")
message = f"InvalidRequestError. {e}"
logger.debug(message)
response["error"] = message
except Exception as e:
logger.error(f"Error creando modelo dinámico. {e}")
message = f"Error creando modelo dinámico. {e}"
logger.error(message)
response["error"] = message
finally:
return model
return response
def generate_indexes(tablename: str, indexes: List[Dict[str, Any]], db_target: str) -> None:
try:
for current_index in indexes:
index_name = current_index["name"]
fields = current_index["index_fields"]
except Exception as e:
logger.error(f"Error creando índices. {e}")
def delete_temp_dirs(tmp_dir: str) -> bool:
......
......@@ -6,20 +6,20 @@ app:
sources:
source1:
type: mysql
host: 192.168.1.13
host: 192.168.21.52
port: 13306
username: root
password: root
database: prueba
password: root1234
database: bcom_tp_res_bk
service:
schema: sources
transformation:
type: mysql
host: 192.168.1.13
host: 192.168.1.4
port: 13306
username: root
password: root
database: prueba_ca
database: prueba_bcom2
service:
schema: intern_db
chunksize: 4000
......@@ -28,7 +28,7 @@ app:
procedure_mask: procedure # S
transformation_mask: transform # S
prefix_order_delimiter: .
cloud_provider: aws
cloud_provider: local
scripts:
s3_params:
bucket: prueba1234568
......@@ -48,8 +48,12 @@ app:
delimiter: '|'
tmp_path: /tmp
s3_params:
TACOMVENTAS:
bucket: prueba1234568
prefix: bcom_results
RANGO_VENTAS_CON_PROMOCION:
bucket: prueba-id
prefix: prueba_bcom/bcom_results
connection_id: conn_script
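# Each output table now has its own bucket/prefix entry under s3_params; connection_id is shared across outputs.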
report:
s3_params:
......@@ -60,3 +64,5 @@ app:
datetime_pattern: '%Y-%m-%d %H:%M:%S'
procedure:
filepath: "/opt/airflow/dags/procedure_definition.json"
......@@ -69,44 +69,70 @@ def create_and_upload_report(tmp_path: str, report_params: Dict[str, Any], provi
worksheet = workbook.add_worksheet("report")
worksheet.set_zoom(90)
title = "Reporte de último proceso ejecutado"
title = "Reporte de Último proceso ejecutado"
title_format = workbook.add_format()
title_format.set_font_size(20)
title_format.set_font_color("#333333")
title_format.set_align('center')
header = f"Proceso ejecutado el día {execution_date}"
status_description = ""
if status == ProcessStatusEnum.SUCCESS.value:
status = "EXITOSO"
status_description = "EXITOSO"
elif status == ProcessStatusEnum.FAIL.value:
status = "FALLIDO"
status_description = "FALLIDO"
elif status == ProcessStatusEnum.RESET.value:
status = "RESETEADO POR EL USUARIO"
status = f"Estado de último proceso ejecutado: {status}"
status_description = "RESETEADO POR EL USUARIO"
header_format = workbook.add_format()
header_format.set_font_size(10)
header_format.set_font_color("#080606")
worksheet.merge_range('A1:N1', title, title_format)
worksheet.merge_range('A2:N2', header, header_format)
worksheet.merge_range('A3:N3', status, header_format)
header_format_2 = workbook.add_format({'bold': True})
header_format_2.set_font_size(10)
header_format_2.set_font_color("#080606")
worksheet.merge_range('A1:Q1', title, title_format)
worksheet.merge_range('A3:D3', "Proceso ejecutado el día: ", header_format)
worksheet.merge_range('E3:N3', execution_date, header_format_2)
worksheet.merge_range('A4:D4', "Estado de último proceso ejecutado: ", header_format)
worksheet.merge_range('E4:N4', status_description, header_format_2)
row_format = workbook.add_format()
row_format.set_font_size(8)
row_format.set_font_color("#000000")
row_format_2 = workbook.add_format()
row_format_2.set_font_size(8)
row_format_2.set_font_color("#000000")
row_format_2.set_align('center')
header_format = workbook.add_format({'bold': True})
header_format.set_font_size(8)
header_format.set_align('center')
header_format.set_align('vcenter')
if status != ProcessStatusEnum.RESET.value:
base_index = 5
base_index = 6
worksheet.merge_range('A' + str(base_index) + ':B' + str(base_index), "ID Proceso", header_format)
worksheet.merge_range('C' + str(base_index) + ':D' + str(base_index), "Fecha Inicio del Proceso", header_format)
worksheet.merge_range('E' + str(base_index) + ':F' + str(base_index), "Fecha Fin del Proceso", header_format)
worksheet.merge_range('G' + str(base_index) + ':L' + str(base_index), "Descripción del Proceso", header_format)
worksheet.merge_range('M' + str(base_index) + ':N' + str(base_index), "Estado del Proceso", header_format)
if status == ProcessStatusEnum.FAIL.value:
worksheet.merge_range('O' + str(base_index) + ':S' + str(base_index), "Descripción del Error", header_format)
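# Data rows start at base_index + 1 and follow the header layout: process id, init/end timestamps, description, status and, on failure, the error message.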
for index, key in enumerate(data.keys()):
index = base_index + index
worksheet.merge_range('A'+str(index)+':B'+str(index), key, row_format)
index = base_index + index + 1
worksheet.merge_range('A'+str(index)+':B'+str(index), key, row_format_2)
worksheet.merge_range('C' + str(index) + ':D' + str(index), data[key]['INIT_PROCESS'], row_format_2)
worksheet.merge_range('E' + str(index) + ':F' + str(index), data[key]['END_PROCESS'], row_format_2)
if data[key]["TYPE"] == "EXTRACTION":
worksheet.merge_range('C'+str(index)+':G'+str(index), f"TABLA DE EXTRACCIÓN: {data[key]['DESCRIPTION']}", row_format)
worksheet.merge_range('G'+str(index)+':L'+str(index), f"TABLA DE EXTRACCIÓN: {data[key]['DESCRIPTION']}", row_format)
elif data[key]["TYPE"] == "TRANSFORMATION":
script = data[key]["DESCRIPTION"].split("|")[1]
worksheet.merge_range('C'+str(index)+':G'+str(index), f"SCRIPT DE TRANSFORMACIÓN: {script}", row_format)
worksheet.merge_range('G'+str(index)+':L'+str(index), f"SCRIPT DE TRANSFORMACIÓN: {script}", row_format)
elif data[key]["TYPE"] == "GENERATION":
worksheet.merge_range('C'+str(index)+':G'+str(index), f"ARCHIVO GENERADO DESDE LA TABLA: {data[key]['DESCRIPTION']}", row_format)
worksheet.merge_range('H'+str(index)+':I'+str(index), f"ESTADO: {data[key]['STATUS']}", row_format)
worksheet.merge_range('J'+str(index)+':N'+str(index), data[key]['MESSAGE'], row_format)
worksheet.merge_range('G'+str(index)+':L'+str(index), f"ARCHIVO GENERADO DESDE LA TABLA: {data[key]['DESCRIPTION']}", row_format)
worksheet.merge_range('M'+str(index)+':N'+str(index), f"{data[key]['STATUS']}", row_format_2)
if data[key]['STATUS'] == ProcessStatusEnum.FAIL.value:
worksheet.merge_range('O'+str(index)+':S'+str(index), data[key]['MESSAGE'], row_format)
# Upload report
upload_report(excel_tmp_path, report_params, provider, timezone)
except Exception as e:
......@@ -134,12 +160,14 @@ def get_data_report(**kwargs) -> None:
if process["status"] == ProcessStatusEnum.SUCCESS.value:
break
total_tasks.append(process["tasks"])
final_key_tasks, final_key_desc, final_key_message = {}, {}, {}
final_key_tasks, final_key_desc, final_key_message, final_inits, final_ends = {}, {}, {}, {}, {}
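# Accumulate one status, description, message and init/end timestamp pair per task key; a successful attempt overrides an earlier failed one.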
for tasks in total_tasks:
for key in tasks.keys():
this_status = tasks[key]["status"]
this_desc = tasks[key]["description"]
this_message = tasks[key]["message"]
this_init = tasks[key]["init_process"]
this_end = tasks[key]["end_process"]
if key in final_key_tasks.keys():
task_status = final_key_tasks[key]
if this_status == ProcessStatusEnum.SUCCESS.value and \
......@@ -147,10 +175,14 @@ def get_data_report(**kwargs) -> None:
final_key_tasks.update({key: this_status})
final_key_desc.update({key: this_desc})
final_key_message.update({key: ''})
final_inits.update({key: this_init})
final_ends.update({key: this_end})
else:
final_key_tasks.update({key: this_status})
final_key_desc.update({key: this_desc})
final_key_message.update({key: this_message})
final_inits.update({key: this_init})
final_ends.update({key: this_end})
for item in final_key_tasks.keys():
if item.lower().startswith("extract"):
type_task = "EXTRACTION"
......@@ -159,7 +191,8 @@ def get_data_report(**kwargs) -> None:
else:
type_task = "GENERATION"
report_data.update({item: {"STATUS": final_key_tasks[item], "TYPE": type_task,
"DESCRIPTION": final_key_desc[item], 'MESSAGE': final_key_message[item]}})
"DESCRIPTION": final_key_desc[item], 'MESSAGE': final_key_message[item],
'INIT_PROCESS': final_inits[item], 'END_PROCESS': final_ends[item]}})
report_data.update({"PROCESS_STATUS": current_status, "PROCESS_EXECUTION": last_process["date"]})
task.xcom_push(key="REPORT-DATA", value=report_data)
logger.info(f"Diccionario de datos para el reporte: {report_data}")
......
......@@ -6,7 +6,7 @@ from io import StringIO
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from components.Utils import update_sql_commands
from components.Utils import update_sql_commands_2
from components.Xcom import save_commands_to_xcom
from components.S3Route import get_files_from_prefix, get_file_from_prefix
from components.Sensor import create_sensor
......@@ -116,12 +116,11 @@ def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, tr
try:
extract_control(control_params["connection_id"], control_params["bucket"], control_params["prefix"],
kwargs['ti'], provider, timezone)
# save_procedure_json(JSON_PROCEDURE_PATH, kwargs['ti'])
save_procedure_json(procedure_filepath, kwargs['ti'])
start_time = time.time()
logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
scripts = get_files_from_prefix(conn_id, bucket, prefix, provider)
scripts = update_sql_commands(scripts, label_tablename)
scripts = update_sql_commands_2(scripts, label_tablename, source_mask)
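# update_sql_commands_2 supersedes the deprecated update_sql_commands and additionally receives the source extraction mask.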
save_commands_to_xcom(scripts, kwargs['ti'], source_mask, transform_mask, procedure_mask, order_delimiter)
logger.debug(f"Script cargados en Xcom: {scripts}")
logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos")
......
......@@ -4,3 +4,4 @@ from enum import Enum
class CommentsScriptEnum(Enum):
DASHES = "--"
NUMERAL = "#"
EXTENDED = "'''"
[
{
"identifier": "RANGO_VENTAS_CON_PROMOCION",
"fields": [
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "NU_ADDON",
"datatype": "TEXT",
"maxLength": 15
},
{
"name": "CD_CLIENTE",
"datatype": "TEXT",
"maxLength": 50
}
],
"save_output": true
},
{
"identifier": "TACOMVENTAS",
"fields": [
......@@ -140,7 +161,30 @@
}
],
"indexes": [
"CD_PAQUETE", "NU_ADDON", "CD_CLIENTE"
{
"name": "indice1",
"index_fields": [
"CD_FOLIO"
]
},
{
"name": "indice2",
"index_fields": [
"CD_PAQUETE"
]
},
{
"name": "indice3",
"index_fields": [
"CD_CLIENTE"
]
},
{
"name": "indice4",
"index_fields": [
"NU_ADDON"
]
}
],
"save_output": true
},
......@@ -238,8 +282,15 @@
"maxLength": 100
}
],
"indexes": ["CD_PAQUETE"]
},
"indexes": [
{
"name": "indice1",
"index_fields" : [
"CD_PAQUETE"
]
}
]
},
{
"identifier": "CATALOGO_PROMOCION",
"fields": [
......@@ -254,7 +305,7 @@
"maxLength": 50
}
]
},
},
{
"identifier": "TEMP_PROMO",
"fields": [
......@@ -269,7 +320,7 @@
"maxLength": 50
}
]
},
},
{
"identifier": "RELACION_POID_PAQ",
"fields": [
......@@ -284,7 +335,7 @@
"maxLength": 50
}
]
},
},
{
"identifier": "RELACION_PAQINI_PAQFIN",
"fields": [
......@@ -299,15 +350,15 @@
"maxLength": 50
}
]
},
},
{
"identifier": "ADDONS_UNICO",
"fields": [
{
"name": "CD_PAQUETE",
"name": "COD_PAQ",
"datatype": "TEXT",
"maxLength": 50
}
]
}
}
]