Commit 2a3813cb authored by Erly Villaroel's avatar Erly Villaroel

Control sobre las tablas y objetos creados

parent 94bc9443
...@@ -7,7 +7,7 @@ from airflow.decorators import task ...@@ -7,7 +7,7 @@ from airflow.decorators import task
from components.Utils import select_multiple from components.Utils import select_multiple
from components.Xcom import delete_all_xcom_tasks, delete_task_instances from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.DatabaseOperation.DatabaseTransformation import delete_table from components.DatabaseOperation.DatabaseTransformation import delete_table, delete_procedure
from components.S3Route import load_control_to_s3 from components.S3Route import load_control_to_s3
from enums.OperationTypeEnum import OperationTypeEnum from enums.OperationTypeEnum import OperationTypeEnum
...@@ -16,9 +16,16 @@ import logging ...@@ -16,9 +16,16 @@ import logging
logger = logging.getLogger() logger = logging.getLogger()
def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str, **kwargs) -> None: def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str, engine, **kwargs) -> None:
delete_task_instances() delete_task_instances()
ti = kwargs["ti"] ti = kwargs["ti"]
procedures = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="PROC_CREATED")
if procedures:
for procedure in procedures:
logger.info(f"Borrando procedures {procedure}")
delete = delete_procedure(procedure, engine)
if delete:
logger.info(f"Borrado correctamente el procedure {procedure}")
conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG") conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG")
conn = control_params["connection_id"] conn = control_params["connection_id"]
bucket = control_params["bucket"] bucket = control_params["bucket"]
...@@ -80,7 +87,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider ...@@ -80,7 +87,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider
validate_task = PythonOperator( validate_task = PythonOperator(
task_id="VALIDATE_CLEANER", task_id="VALIDATE_CLEANER",
python_callable=validate_clean, python_callable=validate_clean,
op_kwargs={'control_params': control_s3, 'provider': provider, 'timezone': timezone}, op_kwargs={'control_params': control_s3, 'provider': provider, 'timezone': timezone, 'engine':db_intern_conn.engine},
trigger_rule='none_skipped' trigger_rule='none_skipped'
) )
cleaners >> tasks >> validate_task cleaners >> tasks >> validate_task
......
...@@ -38,21 +38,31 @@ def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[s ...@@ -38,21 +38,31 @@ def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[s
def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str, Any], def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str, Any],
timezone: str, task, delete_last: bool = False, frequency: str = "montly") -> List[Dict[str, Any]]: timezone: str, task, created_tables: List, delete_last: bool = False, frequency: str = "montly") -> List[Dict[str, Any]]:
try: try:
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W" format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
current_period = str(datetime_by_tzone(timezone, format_date))[:7] current_period = str(datetime_by_tzone(timezone, format_date))[:7]
processed_period = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="PROCESSED_PERIOD") processed_period = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="PROCESSED_PERIOD")
logger.info(f"ARCHIVO DE CONFIGURACION : {conf}")
if delete_last: if delete_last:
last_tasks = conf[-1]["tasks"] last_tasks = conf[-1]["tasks"]
tasks.update(last_tasks) tasks.update(last_tasks)
conf.pop(-1) conf.pop(-1)
current_datetime = str(datetime_by_tzone(timezone, '%Y-%m-%d %H:%M:%S')) current_datetime = str(datetime_by_tzone(timezone, '%Y-%m-%d %H:%M:%S'))
new_process = {"date": current_datetime, "status": status, "tasks": tasks} new_process = {"date": current_datetime, "status": status, "tasks": tasks, "created_tables": created_tables}
if current_period == processed_period: if conf is None:
conf.append(new_process) conf = []
if current_period == processed_period:
conf.append(new_process)
else:
conf = [new_process]
else: else:
conf = [new_process] conf = [conf]
if current_period == processed_period:
conf.append(new_process)
else:
conf = [new_process]
logger.info(f"NUEVO PROCESO {conf}")
except Exception as e: except Exception as e:
logger.error(f"Error actualizando archivo de control. {e}") logger.error(f"Error actualizando archivo de control. {e}")
finally: finally:
......
...@@ -31,3 +31,21 @@ def delete_table(tablename: str, engine) -> bool: ...@@ -31,3 +31,21 @@ def delete_table(tablename: str, engine) -> bool:
logger.error(f"Error borrando tabla {tablename}. {e}") logger.error(f"Error borrando tabla {tablename}. {e}")
finally: finally:
return delete return delete
def delete_procedure(procedure: str, engine) -> bool:
    """Drop a stored procedure from the intern database (best effort).

    Parameters
    ----------
    procedure : str
        Name of the stored procedure to drop. NOTE(review): the name is
        interpolated directly into the SQL text (identifiers cannot be bound
        as parameters) — assumes it comes from a trusted, internal source.
    engine :
        SQLAlchemy engine (presumably `db_intern_conn.engine`) used to open
        the connection.

    Returns
    -------
    bool
        True only if the DROP statement executed without error; False on any
        failure. Mirrors the contract of `delete_table` above.
    """
    delete = False
    try:
        command = f"DROP PROCEDURE IF EXISTS {procedure}"
        start_time = time.time()
        with engine.connect() as conn:
            try:
                _ = conn.execute(command)
                # BUGFIX: flag success only when the DROP actually ran.
                # Previously this was set after the except block, so the
                # function returned True even when the statement failed.
                delete = True
            except Exception as e:
                logger.error(f"Procedure no encontrado. {e}")
        logger.debug(f"Duración de borrado: {time.time() - start_time}")
    except Exception as e:
        logger.error(f"Error borrando procedure {procedure}. {e}")
    finally:
        # Deliberate best-effort: returning from `finally` suppresses any
        # pending exception, same as `delete_table`'s error-handling style.
        return delete
...@@ -32,7 +32,10 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider: ...@@ -32,7 +32,10 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider:
success_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="SUCCESS_TASKS") success_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="FAILED_TASKS") failed_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="CONTROL-CONFIG") conf = ti.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="CONTROL-CONFIG")
print("CONF", conf) created_tables = ti.xcom_pull(task_ids="EXTRACTORS", key="TABLES_CREATED")
lista = []
for i in created_tables:
lista.append(i)
final_dict = {} final_dict = {}
status = ProcessStatusEnum.SUCCESS.value status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0: if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
...@@ -44,7 +47,7 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider: ...@@ -44,7 +47,7 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider:
for failed_task in failed_tasks: for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="EXTRACTORS", key=failed_task)[0] task = ti.xcom_pull(task_ids="EXTRACTORS", key=failed_task)[0]
final_dict.update({failed_task: task}) final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti) conf = update_new_process(conf, status, final_dict, timezone, ti, lista)
if status == ProcessStatusEnum.FAIL.value: if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"] conn = control_params["connection_id"]
bucket = control_params["bucket"] bucket = control_params["bucket"]
...@@ -97,8 +100,10 @@ def on_success_extractor(context) -> None: ...@@ -97,8 +100,10 @@ def on_success_extractor(context) -> None:
def extract_from_source(command, source_conn, intern_conn, chunksize: int, timezone: str, **kwargs): def extract_from_source(command, source_conn, intern_conn, chunksize: int, timezone: str, **kwargs):
if isinstance(command, type(None)): if isinstance(command, type(None)):
raise AirflowSkipException raise AirflowSkipException
extract_tables = []
task = kwargs['ti'] task = kwargs['ti']
init_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S') init_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="INIT_PROCESS_DATETIME_" + str(task.map_index), value=init_process_datetime) task.xcom_push(key="INIT_PROCESS_DATETIME_" + str(task.map_index), value=init_process_datetime)
...@@ -132,6 +137,9 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez ...@@ -132,6 +137,9 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
create = intern_conn.create_table(model) create = intern_conn.create_table(model)
if create: if create:
logger.info(f"Creado correctamente la tabla {tablename}. Creado?: {create}") logger.info(f"Creado correctamente la tabla {tablename}. Creado?: {create}")
extract_tables.append(tablename)
logger.info(f"TABLASCREADAS: {extract_tables}")
task.xcom_push(key="TABLES_CREATED", value=tablename)
else: else:
raise AssertionError(f"Error creando tabla {tablename}") raise AssertionError(f"Error creando tabla {tablename}")
if is_procedure: if is_procedure:
...@@ -202,6 +210,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez ...@@ -202,6 +210,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos") logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos")
end_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S') end_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="END_PROCESS_DATETIME_" + str(task.map_index), value=end_process_datetime) task.xcom_push(key="END_PROCESS_DATETIME_" + str(task.map_index), value=end_process_datetime)
except Exception as e: except Exception as e:
delete = delete_table(tablename, intern_conn.engine) delete = delete_table(tablename, intern_conn.engine)
if delete: if delete:
......
...@@ -27,6 +27,7 @@ logger = logging.getLogger() ...@@ -27,6 +27,7 @@ logger = logging.getLogger()
def validate_generate(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None: def validate_generate(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None:
delete_task_instances() delete_task_instances()
ti = kwargs["ti"] ti = kwargs["ti"]
created_tables = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="TABLES_CREATED")
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS") success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="GENERATORS", key="FAILED_TASKS") failed_tasks = ti.xcom_pull(task_ids="GENERATORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_TRANSFORMATION", key="CONTROL-CONFIG") conf = ti.xcom_pull(task_ids="VALIDATE_TRANSFORMATION", key="CONTROL-CONFIG")
...@@ -41,7 +42,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, provider: s ...@@ -41,7 +42,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, provider: s
for failed_task in failed_tasks: for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0] task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0]
final_dict.update({failed_task: task}) final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti, True) conf = update_new_process(conf, status, final_dict, timezone, ti, created_tables, True)
if status == ProcessStatusEnum.FAIL.value: if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"] conn = control_params["connection_id"]
bucket = control_params["bucket"] bucket = control_params["bucket"]
...@@ -136,7 +137,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez ...@@ -136,7 +137,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
for campo in dataframe.columns: for campo in dataframe.columns:
if campo in campos.keys(): if campo in campos.keys():
if campos[campo] == DataTypeEnum.DATE.name: if campos[campo] == DataTypeEnum.DATE.name:
dataframe[campo] = dataframe[campo].dt.date dataframe[campo] = pd.to_datetime(dataframe[campo]).dt.date
# elif campos[campo] == DataTypeEnum.DATETIME.name: # datetime: # elif campos[campo] == DataTypeEnum.DATETIME.name: # datetime:
# dataframe[campo] = pd.to_datetime(dataframe[campo], format='%Y-%m-%d %H:%M:%S') # dataframe[campo] = pd.to_datetime(dataframe[campo], format='%Y-%m-%d %H:%M:%S')
dataframe = dataframe.drop("INTERN_ID_BCOM", axis=1, errors='ignore') dataframe = dataframe.drop("INTERN_ID_BCOM", axis=1, errors='ignore')
......
...@@ -23,9 +23,14 @@ logger = logging.getLogger() ...@@ -23,9 +23,14 @@ logger = logging.getLogger()
def validate_transform(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None: def validate_transform(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None:
delete_task_instances() delete_task_instances()
ti = kwargs["ti"] ti = kwargs["ti"]
created_tables = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="TABLES_CREATED")
lista = []
for i in created_tables:
lista.append(i)
success_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="SUCCESS_TASKS") success_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="FAILED_TASKS") failed_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG") conf = ti.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
logger.info(f"configuracionn {conf}")
final_dict = {} final_dict = {}
status = ProcessStatusEnum.SUCCESS.value status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0: if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
...@@ -37,7 +42,8 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, provider: ...@@ -37,7 +42,8 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, provider:
for failed_task in failed_tasks: for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="TRANSFORMATIONS", key=failed_task)[0] task = ti.xcom_pull(task_ids="TRANSFORMATIONS", key=failed_task)[0]
final_dict.update({failed_task: task}) final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti, True) conf = update_new_process(conf, status, final_dict, timezone, ti, lista, True)
logger.info(f"configuracionn {conf}")
if status == ProcessStatusEnum.FAIL.value: if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"] conn = control_params["connection_id"]
bucket = control_params["bucket"] bucket = control_params["bucket"]
...@@ -94,7 +100,7 @@ def transformations(xcom_commands: str, intern_conn, timezone: str, **kwargs): ...@@ -94,7 +100,7 @@ def transformations(xcom_commands: str, intern_conn, timezone: str, **kwargs):
commands = xcom_commands[1] commands = xcom_commands[1]
logger.info(f"Ejecutando transformaciones del script {script_name}") logger.info(f"Ejecutando transformaciones del script {script_name}")
not_procedure = ["UPDATE","SELECT","CREATE","ALTER","DROP","DELETE","INSERT","GRANT","REVOKE","TRUNCATE","COPY", not_procedure = ["UPDATE","SELECT","CREATE","ALTER","DROP","DELETE","INSERT","GRANT","REVOKE","TRUNCATE","COPY",
"COMMIT","ROLLBACK","USE"] "COMMIT","ROLLBACK","USE","BEGIN"]
with engine.connect() as connection: with engine.connect() as connection:
for command in commands: for command in commands:
if any(command.startswith(palabra) or command.startswith(palabra.lower()) for palabra in not_procedure): if any(command.startswith(palabra) or command.startswith(palabra.lower()) for palabra in not_procedure):
...@@ -114,16 +120,36 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena ...@@ -114,16 +120,36 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
db_intern_conn, **kwargs): db_intern_conn, **kwargs):
task = kwargs['ti'] task = kwargs['ti']
#CAMBIOS PARA TRAER LOS SP #CAMBIOS PARA TRAER LOS SP
proc_created = []
tables_created = task.xcom_pull(task_ids="EXTRACTORS", key="TABLES_CREATED")
lista = []
for i in tables_created:
lista.append(i)
engine = db_intern_conn.engine engine = db_intern_conn.engine
conn_id = store_procedure["s3_params"]["connection_id"] conn_id = store_procedure["s3_params"]["connection_id"]
bucket = store_procedure["s3_params"]["bucket"] bucket = store_procedure["s3_params"]["bucket"]
prefix = store_procedure["s3_params"]["prefix"] prefix = store_procedure["s3_params"]["prefix"]
procedures = get_files_from_prefix(conn_id, bucket, prefix, provider) procedures = get_files_from_prefix(conn_id, bucket, prefix, provider)
definitions = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
#VALIDAR NOMBRE DE LOS PROCEDURES Y LABEL STORE
with engine.connect() as connection: with engine.connect() as connection:
for procedure in procedures: for procedure in procedures:
logger.info(f"PROCEDURE INICIAL: {procedure}")
procedure = procedure[1] procedure = procedure[1]
logger.info(f"Ejecutando creacion de procedure: {procedure}") if procedure.find(label_tablename + ":") != -1:
_ = connection.execute(procedure) index = procedure.find(label_tablename + ":")
label_lenght = len(label_tablename + ":")
tablename = procedure[index + label_lenght:].strip().split("\n")[0]
for element in definitions:
if element["identifier"] == tablename and "transformation_store_procedure" in element.keys() and element["transformation_store_procedure"] == True:
logger.info(f"Ejecutando creacion de procedure: {procedure}")
_ = connection.execute(procedure)
proc_created.append(tablename)
#tables_created.append(tablename)
lista.append(tablename)
else:
logger.debug(f"No se encontró el label en {procedure} por ende no se creará")
save_commands_to_xcom(procedures, kwargs['ti'], procedure_mask, transform_mask, procedure_mask, order_delimiter) save_commands_to_xcom(procedures, kwargs['ti'], procedure_mask, transform_mask, procedure_mask, order_delimiter)
logger.debug(f"Procedures cargados en Xcom: {procedures}") logger.debug(f"Procedures cargados en Xcom: {procedures}")
...@@ -146,6 +172,9 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena ...@@ -146,6 +172,9 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
final_transforms.append(transform) final_transforms.append(transform)
transforms_per_file.append((key, final_transforms)) transforms_per_file.append((key, final_transforms))
logger.info(f"Scripts para la transformación: {transforms_per_file}") logger.info(f"Scripts para la transformación: {transforms_per_file}")
logger.info(f"LISTA DE ABLAS CREADAS {lista}")
task.xcom_push(key="PROC_CREATED", value=proc_created)
task.xcom_push(key="TABLES_CREATED", value=lista)
Variable.set(key='TRANSFORMS', value=transforms_per_file, serialize_json=True) Variable.set(key='TRANSFORMS', value=transforms_per_file, serialize_json=True)
if len(transforms_per_file) > 0: if len(transforms_per_file) > 0:
return [[item] for item in transforms_per_file] return [[item] for item in transforms_per_file]
......
...@@ -14,6 +14,8 @@ from components.Utils import delete_temp_dirs ...@@ -14,6 +14,8 @@ from components.Utils import delete_temp_dirs
import logging import logging
from dags.dag_transformacion_bcom import save_procedure_json
logger = logging.getLogger() logger = logging.getLogger()
...@@ -58,10 +60,16 @@ def update_control(control_params: Dict[str, Any], provider: str, **kwargs) -> N ...@@ -58,10 +60,16 @@ def update_control(control_params: Dict[str, Any], provider: str, **kwargs) -> N
logger.error(f"Error actualizando archivo de control. {e}") logger.error(f"Error actualizando archivo de control. {e}")
def reset_process(intern_db, output_tmp_dir: str) -> None: def reset_process(intern_db, output_tmp_dir: str, procedure_filepath, **kwargs) -> None:
try: try:
# Borrrando tablas # Borrrando tablas
tablenames = intern_db.get_all_tablenames() task = kwargs['ti']
control_config = task.xcom_pull(task_ids="CONTROL-EXTRACTOR", key="CONTROL-CONFIG")
for index in range(len(control_config), 0, -1):
if "create_tables" in control_config[index].keys() and len(control_config[index]["create_tables"]) !=0 and "reset_by_user" not in control_config[index].keys():
final_ex = control_config[index]
break
tablenames = final_ex["create_tables"]
if len(tablenames) == 0: if len(tablenames) == 0:
logger.info("No se encontraron tablas para su limpieza") logger.info("No se encontraron tablas para su limpieza")
else: else:
......
...@@ -103,6 +103,8 @@ def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str, ...@@ -103,6 +103,8 @@ def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str,
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore') str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data) data = StringIO(str_data)
data = json.load(data) data = json.load(data)
logger.info(f"Data {data}")
logger.info(f"TIPO DATA {type(data)}")
except Exception: except Exception:
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}] data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}]
logger.info(f"Json de control creado: {data}") logger.info(f"Json de control creado: {data}")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment