Commit b9272727 authored by Erly Villaroel

DAG Reset process

parent 2a3813cb
......@@ -42,7 +42,8 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
def clean(command: str, intern_conn):
engine = intern_conn.engine
tablename = select_multiple(command)["tablename"]
#tablename = select_multiple(command)["tablename"]
tablename = command
logger.info(f"Borrando tabla {tablename}")
delete = delete_table(tablename, engine)
if delete:
......@@ -65,8 +66,12 @@ def get_cleaners_from_xcom(**kwargs):
final_selects.append(select)
logger.info(f"Final selects: {final_selects}")
Variable.set(key='CLEANS', value=final_selects, serialize_json=True)
if len(final_selects) > 0:
return [[item] for item in final_selects]
created_tables = task.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="TABLES_CREATED")
lista = []
for i in created_tables:
lista.append(i)
if len(lista) > 0:
return [[item] for item in lista]
else:
return [[None]]
......
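The cleaner list now comes from the `TABLES_CREATED` XCom instead of the parsed selects, and each table name is wrapped in its own single-element list so Airflow's dynamic task mapping can hand it to `clean()` as the lone positional argument. A minimal sketch of that pairing, assuming Airflow 2.3+ dynamic task mapping; the DAG id and table names are illustrative:

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum

def clean(command: str):
    # After this commit the mapped value is already the bare table name.
    print(f"Borrando tabla {command}")

with DAG("cleaners_example", schedule_interval=None, catchup=False,
         start_date=pendulum.datetime(2023, 1, 1)) as dag:
    created_tables = ["TABLE_A", "TABLE_B"]
    # Wrapping each name in a single-element list makes expand(op_args=...)
    # pass it as clean()'s only positional argument, one task per table.
    PythonOperator.partial(
        task_id="CLEANERS",
        python_callable=clean,
    ).expand(op_args=[[t] for t in created_tables])
```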
......@@ -43,26 +43,16 @@ def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str,
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
current_period = str(datetime_by_tzone(timezone, format_date))[:7]
processed_period = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="PROCESSED_PERIOD")
logger.info(f"ARCHIVO DE CONFIGURACION : {conf}")
if delete_last:
last_tasks = conf[-1]["tasks"]
tasks.update(last_tasks)
conf.pop(-1)
current_datetime = str(datetime_by_tzone(timezone, '%Y-%m-%d %H:%M:%S'))
new_process = {"date": current_datetime, "status": status, "tasks": tasks, "created_tables": created_tables}
if conf is None:
conf = []
if current_period == processed_period:
conf.append(new_process)
else:
conf = [new_process]
if current_period == processed_period and isinstance(conf, List):
conf.append(new_process)
else:
conf = [conf]
if current_period == processed_period:
conf.append(new_process)
else:
conf = [new_process]
logger.info(f"NUEVO PROCESO {conf}")
conf = [new_process]
except Exception as e:
logger.error(f"Error actualizando archivo de control. {e}")
finally:
......
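With the duplicated conditionals collapsed, the intended control-file update is: append the new process entry while still inside the processed period, otherwise start a fresh list. A hedged reconstruction of that logic, with names taken from the diff:

```python
from typing import Any, Dict, List, Optional

def append_process(conf: Optional[List[Dict[str, Any]]],
                   new_process: Dict[str, Any],
                   current_period: str,
                   processed_period: str) -> List[Dict[str, Any]]:
    if conf is None:
        conf = []
    if current_period == processed_period:
        # Still in the processed period: accumulate this run.
        conf.append(new_process)
    else:
        # Period rolled over: restart the control file with this run only.
        conf = [new_process]
    return conf
```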
......@@ -17,7 +17,6 @@ def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool =
result = conn.execute(count_command).first()
if result:
total_rows = int(result[0])
logger.info(f"Total de filas: {total_rows}")
if total_rows == chunksize:
response["steps"] = 1
else:
......
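The hunk truncates the `else` branch, but the visible `total_rows == chunksize` special case implies the step count is a ceiling division of the row count by the chunk size. A minimal sketch under that assumption:

```python
import math

def get_steps(total_rows: int, chunksize: int) -> int:
    # One step when the rows fit exactly in a single chunk; otherwise
    # round up so the final partial chunk still gets its own step.
    if total_rows == 0:
        return 0
    return math.ceil(total_rows / chunksize)

assert get_steps(1000, 1000) == 1
assert get_steps(1001, 1000) == 2
```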
......@@ -18,7 +18,7 @@ def execute_transformations(commands: List[str], engine):
def delete_table(tablename: str, engine) -> bool:
delete = False
try:
command = f'DROP TABLE {tablename}'
command = f'DROP TABLE IF EXISTS {tablename}'
start_time = time.time()
with engine.connect() as conn:
try:
......
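Switching to `DROP TABLE IF EXISTS` makes the drop idempotent, which matters for a reset DAG that may run after a previous, partially successful cleanup. A self-contained sketch of the revised helper, assuming a SQLAlchemy engine; the `engine.begin()` context is an assumption so the DDL commits on both SQLAlchemy 1.4 and 2.0:

```python
import logging
import time
from sqlalchemy import text

logger = logging.getLogger(__name__)

def delete_table(tablename: str, engine) -> bool:
    try:
        start_time = time.time()
        # engine.begin() commits on exit, so the DROP takes effect.
        with engine.begin() as conn:
            conn.execute(text(f"DROP TABLE IF EXISTS {tablename}"))
        logger.info(f"Tabla {tablename} borrada en {time.time() - start_time:.2f}s")
        return True
    except Exception as e:
        logger.error(f"Error borrando tabla {tablename}. {e}")
        return False
```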
......@@ -69,12 +69,8 @@ def on_failure_extractor(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('SELECTS', default_var=[], deserialize_json=True)
logger.info(f"TASK_NAME_EXTRACTOR: {task_name}")
logger.info(f"SELECTS_EXTRACTOR: {selects}")
command = selects[ti.map_index]
logger.info(f"COMAND_EXTRACTOR: {command}")
tablename = select_multiple(command[1])["tablename"]
logger.info(f"TABLENAME_EXTRACTOR: {tablename}")
exception = str(context["exception"])
status = ProcessStatusEnum.FAIL.value
init_process = ti.xcom_pull(task_ids="EXTRACTORS", key="INIT_PROCESS_DATETIME_" + str(ti.map_index))[0]
......@@ -138,7 +134,6 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
if create:
logger.info(f"Creado correctamente la tabla {tablename}. Creado?: {create}")
extract_tables.append(tablename)
logger.info(f"TABLASCREADAS: {extract_tables}")
task.xcom_push(key="TABLES_CREATED", value=tablename)
else:
raise AssertionError(f"Error creando tabla {tablename}")
......
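Each mapped extractor now pushes the table it created under the shared `TABLES_CREATED` key. On the pull side, `xcom_pull(task_ids=...)` yields one value per map index as a lazy iterable rather than a plain list, which explains the explicit materialization loops added elsewhere in this commit. A hedged sketch of that pull:

```python
def gather_created_tables(ti) -> list:
    # One pushed value per map index comes back as an iterable, not a list.
    created_tables = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION",
                                  key="TABLES_CREATED")
    lista = []
    for i in created_tables or []:
        lista.append(i)
    return lista
```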
......@@ -28,6 +28,9 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, provider: s
delete_task_instances()
ti = kwargs["ti"]
created_tables = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="TABLES_CREATED")
lista = []
for i in created_tables:
lista.append(i)
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="GENERATORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_TRANSFORMATION", key="CONTROL-CONFIG")
......@@ -42,7 +45,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, provider: s
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti, created_tables, True)
conf = update_new_process(conf, status, final_dict, timezone, ti, lista, True)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -79,9 +82,6 @@ def on_success_generator(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('GENERATES', default_var=[], deserialize_json=True)
logger.info(f"TASK_NAME: {task_name}")
logger.info(f"SELECTS: {selects}")
logger.info(f"TI_MAP_INDEX: {ti.map_index}")
table = selects[ti.map_index]
table = select_multiple(table)["tablename"]
status = ProcessStatusEnum.SUCCESS.value
......@@ -173,9 +173,7 @@ def get_generate_from_xcom(**kwargs):
task = kwargs['ti']
final_outputs = []
conf = task.xcom_pull(task_ids="VALIDATE_TRANSFORMATION", key="CONTROL-CONFIG")
logger.info(f"CONF_GENERATION: {conf}")
tasks = get_tasks_from_control(conf, "generator")
logger.info(f"TASKS_GENERATION: {tasks}")
tasks_with_save = []
definition = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
......@@ -189,20 +187,14 @@ def get_generate_from_xcom(**kwargs):
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
logger.info(f"KEY: {key}")
if not key.startswith(OperationTypeEnum.SELECT.value) and not key.startswith(OperationTypeEnum.PROCEDURE.value):
continue
xcom_outputs = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key=key)
logger.info(f"Trayendo tablas {xcom_outputs}")
for select in xcom_outputs:
logger.info(f"SELECT: {select}")
tablename = select_multiple(select)["tablename"]
logger.info(f"TABLENAME {tablename}")
logger.info(f"TASKS: {tasks}")
logger.info(f"TASKS_WITH_SAVE: {tasks_with_save}")
if (tasks["reset"] or tasks["status"] == ProcessStatusEnum.SUCCESS.value or select not in success_tasks) and tablename in tasks_with_save:
final_outputs.append(select)
logger.info(f"Final outputs: {final_outputs}")
Variable.set(key='GENERATES', value=final_outputs, serialize_json=True)
if len(final_outputs) > 0:
return [[item] for item in final_outputs]
......
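The filter at the end of `get_generate_from_xcom` keeps a select when the process was reset, the last run succeeded, or the select is not among the prior successes, and only if its table is flagged for saving. A minimal sketch of that predicate, with the status literal standing in for `ProcessStatusEnum.SUCCESS.value`:

```python
SUCCESS = "success"  # stand-in for ProcessStatusEnum.SUCCESS.value

def should_generate(select: str, tablename: str, tasks: dict,
                    success_tasks: list, tasks_with_save: list) -> bool:
    rerun = (tasks["reset"]
             or tasks["status"] == SUCCESS
             or select not in success_tasks)
    # Only generate files for tables whose definition requests a save.
    return rerun and tablename in tasks_with_save
```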
......@@ -30,7 +30,6 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, provider:
success_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
logger.info(f"configuracionn {conf}")
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
......@@ -43,7 +42,6 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, provider:
task = ti.xcom_pull(task_ids="TRANSFORMATIONS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti, lista, True)
logger.info(f"configuracionn {conf}")
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -134,7 +132,6 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
#VALIDAR NOMBRE DE LOS PROCEDURES Y LABEL STORE
with engine.connect() as connection:
for procedure in procedures:
logger.info(f"PROCEDURE INICIAL: {procedure}")
procedure = procedure[1]
if procedure.find(label_tablename + ":") != -1:
index = procedure.find(label_tablename + ":")
......@@ -145,9 +142,9 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
logger.info(f"Ejecutando creacion de procedure: {procedure}")
_ = connection.execute(procedure)
proc_created.append(tablename)
#tables_created.append(tablename)
lista.append(tablename)
if "temp_table" in element.keys() and element["temp_table"] == True:
lista.append(element["identifier"])
else:
logger.debug(f"No se encontró el label en {procedure} por ende no se creará")
save_commands_to_xcom(procedures, kwargs['ti'], procedure_mask, transform_mask, procedure_mask, order_delimiter)
......@@ -172,7 +169,6 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
final_transforms.append(transform)
transforms_per_file.append((key, final_transforms))
logger.info(f"Scripts para la transformación: {transforms_per_file}")
logger.info(f"LISTA DE ABLAS CREADAS {lista}")
task.xcom_push(key="PROC_CREATED", value=proc_created)
task.xcom_push(key="TABLES_CREATED", value=lista)
Variable.set(key='TRANSFORMS', value=transforms_per_file, serialize_json=True)
......
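Besides the procedure bookkeeping, the transformation step now records every created table in `lista`, including temp tables declared in the extraction definition, so the reset DAG can drop them later. A hedged sketch of that bookkeeping, with `element` standing for one definition entry:

```python
def track_created(tablename: str, element: dict,
                  proc_created: list, created: list) -> None:
    # Record the procedure's target table...
    proc_created.append(tablename)
    created.append(tablename)
    # ...and any temp table the definition declares, so the reset DAG
    # can drop it as well.
    if element.get("temp_table") is True:
        created.append(element["identifier"])
```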
......@@ -97,14 +97,10 @@ def update_sql_commands_2(dataset: List[Tuple[str, str]], label_tablename: str,
extraction_mask: str) -> List[Tuple[str, List[str]]]:
result = []
comments = [CommentsScriptEnum[item].value for item in CommentsScriptEnum._member_names_ if item != CommentsScriptEnum.EXTENDED.name]
logger.info(f"COMENTS: {comments}")
try:
for row in dataset:
logger.info(f"DATASET: {dataset}")
data = row[1].split("\n")
logger.info(f"data inicial: {data}")
data = [item.replace("\r", "") for item in data if item.strip() != '']
logger.info(f"data final: {data}")
final_data = []
start_sentence = True
add_next = False
......@@ -112,7 +108,6 @@ def update_sql_commands_2(dataset: List[Tuple[str, str]], label_tablename: str,
tablename = ""
extend_comment = False
for item in data:
logger.info(f"Item in data: {item}")
if not extend_comment and item.strip().startswith(CommentsScriptEnum.EXTENDED.value):
extend_comment = True
continue
......
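The script normalization strips carriage returns, drops blank lines, and skips blocks opened by the `EXTENDED` comment marker. A minimal sketch, assuming the closing condition (not visible in the hunk) is a matching end marker; both marker values are illustrative:

```python
EXTENDED_MARKER = "/*"  # illustrative stand-in for CommentsScriptEnum.EXTENDED.value

def normalize_script(raw_script: str) -> list:
    # Mirror the diff: split lines, strip \r, and drop blank lines.
    data = [item.replace("\r", "") for item in raw_script.split("\n")
            if item.strip() != ""]
    final_data = []
    extend_comment = False
    for item in data:
        if not extend_comment and item.strip().startswith(EXTENDED_MARKER):
            extend_comment = True
            continue
        if extend_comment:
            # Assumed closing condition for the extended comment block.
            if item.strip().endswith("*/"):
                extend_comment = False
            continue
        final_data.append(item)
    return final_data
```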
......@@ -14,14 +14,11 @@ def save_commands_to_xcom(dataset: List[Tuple[str, List[str]]], task, extract_ma
final_names_xcom = []
try:
for data in dataset:
logger.info(f"DATASET: {dataset}")
logger.info(f"Guardando Xcom en llave {data[0]}")
name = data[0]
base_name = name
if order_delimiter == ".":
base_name = base_name[:base_name.rfind(".")]
logger.info(f"BASE_NAME: {base_name}")
logger.info(f"ORDER DELIMITER: {order_delimiter}")
order = base_name.split(order_delimiter)
if len(order) < 2:
raise AssertionError(f"Script {name} no tiene prefijo de orden. Validar nombre de script")
......
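Scripts are expected to carry an order prefix in their file name (for example `1.extract_clients.sql` with a `.` delimiter); the extension is trimmed first so the split sees only the base name. A hedged sketch of that validation:

```python
def get_order(name: str, order_delimiter: str) -> int:
    base_name = name
    if order_delimiter == ".":
        # Drop the extension first so the split sees "1.extract_clients".
        base_name = base_name[:base_name.rfind(".")]
    order = base_name.split(order_delimiter)
    if len(order) < 2:
        raise AssertionError(f"Script {name} no tiene prefijo de orden. Validar nombre de script")
    return int(order[0])

assert get_order("1.extract_clients.sql", ".") == 1
```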
......@@ -211,7 +211,6 @@ def set_dag():
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que informa del último proceso ejecutado",
schedule_interval=conf["inform_dag_schedule"], tags=["DAG BCOM - INFORM PROCESS"], catchup=False) as dag:
......
......@@ -6,7 +6,7 @@ import json
from airflow.operators.python import PythonOperator
from components.Databases.Database import Database
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.DatabaseOperation.DatabaseTransformation import delete_table, delete_procedure
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.S3Route import upload_file
from components.Control import extract_last_control
......@@ -14,8 +14,6 @@ from components.Utils import delete_temp_dirs
import logging
from dags.dag_transformacion_bcom import save_procedure_json
logger = logging.getLogger()
......@@ -64,24 +62,35 @@ def reset_process(intern_db, output_tmp_dir: str, procedure_filepath, **kwargs)
try:
# Borrando tablas
task = kwargs['ti']
control_config = task.xcom_pull(task_ids="CONTROL-EXTRACTOR", key="CONTROL-CONFIG")
for index in range(len(control_config), 0, -1):
if "create_tables" in control_config[index].keys() and len(control_config[index]["create_tables"]) !=0 and "reset_by_user" not in control_config[index].keys():
final_ex = control_config[index]
break
tablenames = final_ex["create_tables"]
if len(tablenames) == 0:
logger.info("No se encontraron tablas para su limpieza")
else:
for tablename in tablenames:
tablename = tablename[0]
delete = delete_table(tablename, intern_db.engine)
if delete:
logger.info(f"Borrado correctamente la tabla {tablename}")
with open(procedure_filepath) as f:
data = json.load(f)
if data:
control_config = task.xcom_pull(task_ids="CONTROL-EXTRACTOR", key="CONTROL-CONFIG")
logger.info(f"control config {control_config}")
final_ex = control_config[-1][-1]
# for index in range(len(control_config), 0, -1):
# logger.info(f"index: {index}")
# logger.info(f"index: {control_config[index]}")
# if "create_tables" in control_config[index].keys() and len(control_config[index]["create_tables"]) !=0 and "reset_by_user" not in control_config[index].keys():
# final_ex = control_config[index]
# break
tablenames = final_ex["created_tables"]
if len(tablenames) == 0:
logger.info("No se encontraron tablas para su limpieza")
else:
for tablename in tablenames:
for element in data:
if tablename == element["identifier"]:
if "transformation_store_procedure" in element.keys():
delete = delete_procedure(tablename, intern_db.engine)
else:
delete = delete_table(tablename, intern_db.engine)
if delete:
logger.info(f"Borrado correctamente la tabla {tablename}")
# Borrando archivos temporales que hayan quedado sueltos
delete = delete_temp_dirs(output_tmp_dir)
if delete:
logger.info("Se borraron todos los archivos temporales")
# delete = delete_temp_dirs(output_tmp_dir)
# if delete:
# logger.info("Se borraron todos los archivos temporales")
except Exception as e:
logger.error(f"Error procesando archivo de control. {e}")
......@@ -97,12 +106,13 @@ def set_dag():
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que resetea el último proceso ejecutado",
schedule_interval=conf["reset_dag_schedule"], tags=["DAG BCOM - RESET PROCESS"], catchup=False) as dag:
control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"]
# NUEVA VARIABLE
procedure = conf["procedure"]
control_extractor = PythonOperator(
task_id="CONTROL-EXTRACTOR",
python_callable=extract_last_control,
......@@ -121,7 +131,7 @@ def set_dag():
control_process = PythonOperator(
task_id="RESET-PROCESS",
python_callable=reset_process,
op_kwargs={'intern_db': intern_db, 'output_tmp_dir': tmp_dir},
op_kwargs={'intern_db': intern_db, 'output_tmp_dir': tmp_dir, 'procedure_filepath': procedure["filepath"]},
trigger_rule="all_success"
)
......
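The reset now reads the extraction-definition JSON passed via the new `procedure_filepath` kwarg and picks the right cleanup per object: entries flagged `transformation_store_procedure` go through `delete_procedure`, everything else through `delete_table`. A hedged sketch of that dispatch (the helpers are injected, and `delete_procedure` is assumed to mirror `delete_table`'s signature):

```python
import json

def drop_created_objects(control_config, definition_path: str, engine,
                         delete_table, delete_procedure) -> None:
    with open(definition_path) as f:
        data = json.load(f)
    # Last process entry of the last control block, as in the diff.
    final_ex = control_config[-1][-1]
    for tablename in final_ex["created_tables"]:
        for element in data:
            if tablename == element["identifier"]:
                # Stored procedures and plain tables need different drops.
                if "transformation_store_procedure" in element:
                    delete_procedure(tablename, engine)
                else:
                    delete_table(tablename, engine)
```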
......@@ -85,7 +85,6 @@ def save_procedure_json(json_path: str, task) -> None:
try:
with open(json_path) as f:
data = json.load(f)
logger.info(f"JSON-PROCEDURE {data}")
if data:
task.xcom_push(key="EXTRACTION-DEFINITION-JSON", value=data)
except Exception as e:
......@@ -103,8 +102,6 @@ def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str,
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data)
data = json.load(data)
logger.info(f"Data {data}")
logger.info(f"TIPO DATA {type(data)}")
except Exception:
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}]
logger.info(f"Json de control creado: {data}")
......@@ -133,7 +130,6 @@ def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, tr
def set_dag():
""" DAG that execute a series of SQL commands from scripts to make a result file and save on a bucket (for now)"""
import yaml
from yaml.loader import SafeLoader
......
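If the control object cannot be fetched or parsed, a fresh one-entry control list is synthesized so downstream tasks always receive a valid structure. A minimal sketch of that fallback, with the status literal standing in for `ProcessStatusEnum.SUCCESS.value`:

```python
import json
from io import StringIO

SUCCESS = "success"  # stand-in for ProcessStatusEnum.SUCCESS.value

def parse_control(raw: bytes) -> list:
    try:
        str_data = str(raw, encoding="UTF-8", errors="ignore")
        data = json.load(StringIO(str_data))
    except Exception:
        # No readable control file: synthesize a fresh one-entry list.
        data = [{"status": SUCCESS, "tasks": {}}]
    return data
```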
......@@ -3,6 +3,10 @@
"identifier": "obtenerEstudiantes",
"transformation_store_procedure": true
},
{
"identifier": "TempEstudiantes",
"temp_table" : true
},
{
"identifier": "ESTUDIANTES_11",
"fields": [
......
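The new `TempEstudiantes` entry marks a temporary table in the definition file; downstream code can pick these up by the `temp_table` flag. A small illustrative check against the entries shown above:

```python
import json

definition = json.loads("""
[
  {"identifier": "obtenerEstudiantes", "transformation_store_procedure": true},
  {"identifier": "TempEstudiantes", "temp_table": true}
]
""")

# Collect the identifiers flagged as temp tables.
temp_tables = [e["identifier"] for e in definition if e.get("temp_table")]
assert temp_tables == ["TempEstudiantes"]
```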