Commit e8e7bfd7 authored by Erly Villaroel

Changes: verify tables before creation

parent dc07bb88
......@@ -4,21 +4,24 @@ from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.Utils import select_multiple
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.DatabaseOperation.DatabaseTransformation import delete_table, delete_procedure
from components.S3Route import load_control_to_s3
from enums.OperationTypeEnum import OperationTypeEnum
from components.Control import get_tasks_from_control, update_new_process
import logging
logger = logging.getLogger()
def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str, engine, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
#created_Tables = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CREATED_TABLES")
procedures = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="PROC_CREATED")
if procedures:
for procedure in procedures:
......@@ -26,6 +29,8 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
delete = delete_procedure(procedure, engine)
if delete:
logger.info(f"Borrado correctamente el procedure {procedure}")
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="GENERATORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG")
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -33,6 +38,19 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
if not prefix.endswith("/"):
prefix += "/"
key = prefix + control_params["filename"]
#conf = json.dumps(conf, indent=2, default=str)
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
for success_task in success_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=success_task)[0]
final_dict.update({success_task: task})
if not isinstance(failed_tasks, type(None)) and len(failed_tasks) > 0:
status = ProcessStatusEnum.FAIL.value
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti, [], True)
conf = json.dumps(conf, indent=2, default=str)
loaded = load_control_to_s3(bytes(conf.encode()), conn, bucket, key, provider, timezone)
if loaded:
......
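
The success/failure aggregation above also appears in validate_extractor and validate_generate below. A minimal sketch of the shared pattern, assuming (as the code above does) that each task name is additionally pushed as its own XCom key holding a one-element list; collect_task_results is a hypothetical helper, not part of this commit:

from typing import Any, Dict, List, Optional

def collect_task_results(ti, task_ids: str, names: Optional[List[str]]) -> Dict[str, Any]:
    # Pull each per-task payload; every key stores a one-element list.
    results: Dict[str, Any] = {}
    for name in names or []:
        payload = ti.xcom_pull(task_ids=task_ids, key=name)
        if payload:
            results[name] = payload[0]
    return results

# Usage, mirroring validate_clean:
# final_dict = {**collect_task_results(ti, "GENERATORS", success_tasks),
#               **collect_task_results(ti, "GENERATORS", failed_tasks)}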
......@@ -8,7 +8,7 @@ logger = logging.getLogger()
def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
save = True
save = False
try:
chunksize = 2000
# db_type = connection.db_type
......@@ -30,6 +30,7 @@ def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
# df.to_sql(tablename, conn, if_exists='append', dtype=dtyp, index=False, chunksize=chunksize)
# else:
df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=chunksize)
save = True
except Exception as e:
logger.error(f"Error guardando resultados desde dataframe. {e}")
raise AssertionError(f"Error guardando resultados desde dataframe. {e}")
......
......@@ -94,3 +94,30 @@ class Database:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
def check_procedure(self, procedure_name, connection) -> bool:
result = False
try:
result = self.factory.check_procedure(procedure_name, connection)
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return result
def check_table(self, table_name, connection) -> bool:
result = False
try:
result = self.factory.check_table(table_name, connection)
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return result
def verify_table(self, table_name, connection) -> bool:
result = False
try:
result = self.factory.verify_table(table_name, connection)
except Exception as e:
logger.error(f"Error obteniendo numero de registros de la tabla. {e}")
finally:
return result
\ No newline at end of file
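
A minimal sketch of how the new facade methods back the check-before-create flow used in extract_from_source further down; ensure_table is hypothetical, while check_table, verify_table and create_table are the methods this diff touches:

def ensure_table(db, model, tablename: str) -> None:
    # Create the table only when it is missing or still has no rows.
    with db.engine.connect() as connection:
        exists = db.check_table(tablename, connection)
        has_rows = db.verify_table(tablename, connection)
    if not (exists and has_rows):
        if not db.create_table(model):
            raise AssertionError(f"Error creando tabla {tablename}")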
......@@ -102,3 +102,37 @@ class Mysql:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
def check_procedure(self, procedure_name, connection) -> bool:
exists = False
try:
check_query = f"SHOW CREATE PROCEDURE {procedure_name}"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de procedure. {e}")
finally:
return exists
def check_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SHOW CREATE TABLE {table_name}"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de tabla. {e}")
finally:
return exists
def verify_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT COUNT(*) FROM {table_name}"
count = connection.execute(check_query).scalar()
if count and count > 0:
exists = True
except Exception as e:
logger.error(f"Error obteniendo el conteo de la tabla. {e}")
finally:
return exists
\ No newline at end of file
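
SHOW CREATE works on MySQL but interpolates the object name straight into the SQL string. A safer variant (a sketch, not part of this commit) binds the name and queries information_schema, which also avoids the exception SHOW CREATE TABLE raises for missing tables:

from sqlalchemy import text

def check_table_safe(table_name: str, connection) -> bool:
    query = text(
        "SELECT COUNT(*) FROM information_schema.tables "
        "WHERE table_schema = DATABASE() AND table_name = :name"
    )
    return bool(connection.execute(query, {"name": table_name}).scalar())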
......@@ -109,3 +109,37 @@ class Oracle:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
def check_procedure(self, procedure_name, connection) -> bool:
exists = False
try:
check_query = f"SHOW CREATE PROCEDURE {procedure_name}"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de procedure. {e}")
finally:
return exists
def check_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SHOW CREATE TABLE {table_name}"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de tabla. {e}")
finally:
return exists
def verify_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT COUNT(*) FROM {table_name}"
count = connection.execute(check_query).scalar()
if count and count > 0:
exists = True
except Exception as e:
logger.error(f"Error obteniendo el conteo de la tabla. {e}")
finally:
return exists
\ No newline at end of file
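
The same checks with Oracle bind variables (a sketch; it assumes objects were created with unquoted, hence upper-cased, names):

from sqlalchemy import text

def oracle_object_exists(name: str, connection, catalog: str = "user_tables",
                         column: str = "table_name") -> bool:
    # Use catalog="user_procedures", column="object_name" for procedures.
    query = text(f"SELECT COUNT(*) FROM {catalog} WHERE {column} = :name")
    return bool(connection.execute(query, {"name": name.upper()}).scalar())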
......@@ -103,3 +103,37 @@ class Postgres:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
def check_procedure(self, procedure_name, connection) -> bool:
exists = False
try:
check_query = f"SHOW CREATE PROCEDURE {procedure_name}"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de procedure. {e}")
finally:
return exists
def check_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SHOW CREATE TABLE {table_name}"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de tabla. {e}")
finally:
return exists
def verify_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT COUNT(*) FROM {table_name}"
count = connection.execute(check_query).scalar()
if count and count > 0:
exists = True
except Exception as e:
logger.error(f"Error obteniendo el conteo de la tabla. {e}")
finally:
return exists
\ No newline at end of file
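
Postgres additionally offers to_regclass, which returns NULL for a missing relation; a sketch with a bound parameter (assumes the table is reachable through the current search_path):

from sqlalchemy import text

def postgres_table_exists(table_name: str, connection) -> bool:
    result = connection.execute(text("SELECT to_regclass(:name)"),
                                {"name": table_name}).scalar()
    return result is not None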
......@@ -33,9 +33,14 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider:
failed_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="CONTROL-CONFIG")
created_tables = ti.xcom_pull(task_ids="EXTRACTORS", key="TABLES_CREATED")
created_tables_master = ti.xcom_pull(task_ids="MASTER_EXTRACTOR", key="TABLES_CREATED")
lista = []
if created_tables:
for i in created_tables:
lista.append(i)
if created_tables_master:
for i in created_tables_master:
lista.append(i)
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
......@@ -48,6 +53,7 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider:
task = ti.xcom_pull(task_ids="EXTRACTORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti, lista)
ti.xcom_push(key="TABLES_CREATED", value=lista)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -63,6 +69,7 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider:
raise AirflowSkipException(f"Ocurrieron errores en la etapa de extracción")
elif status == ProcessStatusEnum.SUCCESS.value:
ti.xcom_push(key="CONTROL-CONFIG", value=conf)
logger.info(f"tablas creadas : {lista}")
def on_failure_extractor(context) -> None:
......@@ -101,13 +108,15 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
raise AirflowSkipException
extract_tables = []
task = kwargs['ti']
created_tables = task.xcom_pull(task_ids="MASTER_EXTRACTOR", key="TABLES_CREATED")
for i in created_tables:
extract_tables.append(i)
init_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="INIT_PROCESS_DATETIME_" + str(task.map_index), value=init_process_datetime)
extract_type = command[0].split("|")[0]
command = command[1]
source_engine = source_conn.engine
definitions = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
logger.info(f"Definiciones desde json: {definitions}")
multiple = select_multiple(command)
tablename = multiple["tablename"]
is_procedure = False
......@@ -130,18 +139,27 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
if isinstance(model, type(None)):
raise AssertionError(f"Definición del extracción para {tablename} en el json-descriptor no encontraddo")
try:
# connection = intern_conn.engine.connect()
# result = bool(intern_conn.check_table(tablename, connection))
# logger.info(f"resultado {result}")
# logger.info(f"resultado {type(result)}")
connection = intern_conn.engine.connect()
result = bool(intern_conn.check_table(tablename, connection))
resultado = intern_conn.verify_table(tablename, connection)
logger.info(f"resultado {result}")
logger.info(f"resultado2 {resultado}")
if not result or not resultado:
create = intern_conn.create_table(model)
if create:
logger.info(f"Creado correctamente la tabla {tablename}. Creado?: {create}")
extract_tables.append(tablename)
task.xcom_push(key="TABLES_CREATED", value=tablename)
#extract_tables.append(tablename)
#task.xcom_push(key="TABLES_CREATED", value=tablename)
else:
raise AssertionError(f"Error creando tabla {tablename}")
if is_procedure:
command = command[len(tablename+"|"):]
temp_connection = source_conn.get_basic_connection()
command = source_conn.generate_sql_procedure(command)
logger.debug(f"FINAL COMMAND: {command}")
if source_conn.db_type == DatabaseTypeEnum.ORACLE.value:
cursor = temp_connection.cursor()
cursor.execute(command)
......@@ -160,6 +178,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
if save:
logger.debug(f"Migrado correctamente todos los datos")
extract_tables.append(tablename)
data.clear()
elif source_conn.db_type == DatabaseTypeEnum.MYSQL.value or \
source_conn.db_type == DatabaseTypeEnum.POSTGRES.value:
......@@ -179,6 +198,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
if save:
logger.debug(f"Migrado correctamente todos los datos")
data.clear()
logger.info("Guardado correctamente todos los datos")
source_conn.close_basic_connection()
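
The save path above moves rows in batches and clears the buffer between them. A minimal sketch of an equivalent pandas-based copy, assuming a SQLAlchemy source engine and this repo's save_from_dataframe helper:

import pandas as pd

def copy_in_chunks(select_sql: str, source_engine, intern_engine,
                   tablename: str, chunksize: int = 2000) -> None:
    # read_sql with chunksize yields DataFrames lazily, keeping memory flat.
    for chunk in pd.read_sql(select_sql, source_engine, chunksize=chunksize):
        if not save_from_dataframe(chunk, tablename, intern_engine):
            raise AssertionError(f"Error guardando lote de {tablename}")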
......@@ -201,11 +221,15 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
dataframe = dataframe.fillna(value=np.nan)
save = save_from_dataframe(dataframe, tablename, intern_conn)
if save:
if dataframe.empty:
raise ValueError(f"La tabla {tablename} está vacia")
logger.info(f"Guardado correctamente dataframe en el paso {step+1}")
else:
raise AssertionError("Tabla no creada correctamente")
logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos")
end_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="TABLES_CREATED", value=tablename)
task.xcom_push(key="END_PROCESS_DATETIME_" + str(task.map_index), value=end_process_datetime)
except Exception as e:
delete = delete_table(tablename, intern_conn.engine)
if delete:
......@@ -214,14 +238,28 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
@task(task_id="MASTER_EXTRACTOR", trigger_rule='all_success')
def get_select_from_xcom(**kwargs):
def get_select_from_xcom(db_intern_conn, **kwargs):
task = kwargs['ti']
final_selects = []
lista = []
conf = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="CONTROL-CONFIG")
temp_conf = conf[-1]["objects_created"]
engine = db_intern_conn.engine
logger.info(f"OBJETOS CREADOS: {temp_conf}")
if temp_conf:
tablas_temp = [i for i in temp_conf if i.startswith("Temp")]
logger.info(f"tablas temporales: {tablas_temp}")
if tablas_temp:
for i in tablas_temp:
delete = delete_table(i, engine)
if delete:
logger.info(f"Borrado correctamente la tabla temporal {i}")
tasks = get_tasks_from_control(conf, "extractor")
success_tasks = tasks["tasks"]
success_tasks = [item[1] for item in success_tasks]
logger.info(f"COMANDOS QUE FUERON EXITOSOS: {success_tasks}")
for i in success_tasks:
lista.append(i)
xcom_keys = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="XCOM-EXTRACTION-NAMES")
logger.debug(xcom_keys)
for key in xcom_keys:
......@@ -235,6 +273,8 @@ def get_select_from_xcom(**kwargs):
final_selects.append((key, select, ))
logger.info(f"Comandos para la extracción: {final_selects}")
Variable.set(key='SELECTS', value=final_selects, serialize_json=True)
logger.info(f"TABLAS CREADAS MASTER EXT: {lista}")
task.xcom_push(key="TABLES_CREATED", value=lista)
if len(final_selects) > 0:
return [[item] for item in final_selects]
else:
......@@ -246,7 +286,7 @@ def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int, timez
group = None
try:
with TaskGroup(group_id="ExtraccionDeDatos", prefix_group_id=False) as group:
selects = get_select_from_xcom()
selects = get_select_from_xcom(db_intern_conn)
validate_task = PythonOperator(
task_id="VALIDATE_EXTRACTION",
......
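
The temp-table cleanup added to get_select_from_xcom matches on a literal "Temp" prefix in objects_created. A slightly more defensive variant (a sketch, not in this commit) normalizes case before matching:

temp_tables = [name for name in temp_conf or [] if name.lower().startswith("temp")]
for name in temp_tables:
    if delete_table(name, engine):
        logger.info(f"Borrada correctamente la tabla temporal {name}")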
......@@ -29,6 +29,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, provider: s
ti = kwargs["ti"]
created_tables = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="TABLES_CREATED")
lista = []
if created_tables:
for i in created_tables:
lista.append(i)
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
......@@ -46,6 +47,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, provider: s
task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti, lista, True)
ti.xcom_push(key="CREATED_TABLES", value=lista)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -159,7 +161,8 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
file_key = prefix + tmp_file[tmp_file.rfind("/")+1:]
# Se sube el archivo al S3
logger.info(f"Tamaño del archivo a subir: {size} bytes")
save_df_to_s3(tmp_file, conn_id, bucket, file_key, provider, in_memory=False)
if not save_df_to_s3(tmp_file, conn_id, bucket, file_key, provider, in_memory=False):
raise AssertionError(f"No se guardó el archivo en la ruta indicada")
# Se borra el archivo al finalizar el upload
delete_temp_dir(tmp_file)
break
......
......@@ -95,6 +95,7 @@ def get_base_date(conn: str, bucket: str, key: str) -> datetime.date:
def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, provider: str, delimiter: str = ",",
in_memory: bool = True):
result = False
try:
logger.info(f"SUBIENDO A NUBE KEY {key}")
file_type = get_type_file(key)
......@@ -129,9 +130,10 @@ def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, p
hook.upload(bucket, key, data)
else:
hook.load_file(data, key, bucket)
result = True
except Exception as e:
logger.error(f"Error guardando archivos a S3. key: {key}. {e}")
return result
def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str) -> None:
try:
......
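
Since save_df_to_s3 now reports success as a boolean instead of failing silently, callers can retry transient S3 errors. A hypothetical wrapper (name, signature and backoff policy are assumptions):

import time

def upload_with_retries(data, conn, bucket, key, provider, attempts: int = 3) -> bool:
    for i in range(attempts):
        if save_df_to_s3(data, conn, bucket, key, provider):
            return True
        time.sleep(2 ** i)  # simple exponential backoff between attempts
    return False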
......@@ -25,6 +25,7 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, provider:
ti = kwargs["ti"]
created_tables = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="TABLES_CREATED")
lista = []
if created_tables:
for i in created_tables:
lista.append(i)
success_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="SUCCESS_TASKS")
......@@ -107,7 +108,7 @@ def transformations(xcom_commands: str, intern_conn, timezone: str, **kwargs):
else:
logger.info(f"Generando llamada al procedure según bd para: {command}")
command = intern_conn.generate_sql_procedure(command)
logger.debug(f"EJECUTANDO FINAL PROCEDURE: {command}")
logger.info(f"EJECUTANDO FINAL PROCEDURE: {command}")
_ = connection.execute(command)
end_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="END_PROCESS_DATETIME_" + str(task.map_index), value=end_process_datetime)
......@@ -119,7 +120,7 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
task = kwargs['ti']
#CAMBIOS PARA TRAER LOS SP
proc_created = []
tables_created = task.xcom_pull(task_ids="EXTRACTORS", key="TABLES_CREATED")
tables_created = task.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="TABLES_CREATED")
lista = []
for i in tables_created:
lista.append(i)
......@@ -130,6 +131,9 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
procedures = get_files_from_prefix(conn_id, bucket, prefix, provider)
definitions = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
#VALIDAR NOMBRE DE LOS PROCEDURES Y LABEL STORE
for element in definitions:
if "temp_table" in element.keys() and element["temp_table"] == True:
lista.append(element["identifier"])
with engine.connect() as connection:
for procedure in procedures:
procedure = procedure[1]
......@@ -140,11 +144,11 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
for element in definitions:
if element["identifier"] == tablename and "transformation_store_procedure" in element.keys() and element["transformation_store_procedure"] == True:
logger.info(f"Ejecutando creacion de procedure: {procedure}")
result = db_intern_conn.check_procedure(tablename, connection)
if not result:
_ = connection.execute(procedure)
proc_created.append(tablename)
lista.append(tablename)
if "temp_table" in element.keys() and element["temp_table"] == True:
lista.append(element["identifier"])
else:
logger.debug(f"No se encontró el label en {procedure} por ende no se creará")
save_commands_to_xcom(procedures, kwargs['ti'], procedure_mask, transform_mask, procedure_mask, order_delimiter)
......
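
A condensed sketch of the idempotent-creation guard this hunk introduces: the CREATE PROCEDURE text runs only when check_procedure reports it absent:

with engine.connect() as connection:
    if not db_intern_conn.check_procedure(tablename, connection):
        connection.execute(procedure)  # `procedure` holds the CREATE text
        proc_created.append(tablename)
        lista.append(tablename)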
......@@ -59,11 +59,7 @@ app:
bucket: pruebairflow
prefix: bcom_results
connection_id: prueba_af
tabla1:
bucket: pruebairflow
prefix: bcom_results
connection_id: prueba_af
tabla2:
tabla5:
bucket: pruebairflow
prefix: bcom_results
connection_id: prueba_af
......
......@@ -74,7 +74,7 @@ def reset_process(intern_db, output_tmp_dir: str, procedure_filepath, **kwargs)
# if "create_tables" in control_config[index].keys() and len(control_config[index]["create_tables"]) !=0 and "reset_by_user" not in control_config[index].keys():
# final_ex = control_config[index]
# break
tablenames = final_ex["created_tables"]
tablenames = final_ex["objects_created"]
if len(tablenames) == 0:
logger.info("No se encontraron tablas para su limpieza")
else:
......
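
Because control files written before this commit may still carry the old key, a defensive lookup (a sketch, not part of this commit) avoids a KeyError:

# Fall back to the legacy key when "objects_created" is absent.
tablenames = final_ex.get("objects_created") or final_ex.get("created_tables") or []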
......@@ -106,6 +106,7 @@ def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str,
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}]
logger.info(f"Json de control creado: {data}")
task.xcom_push(key="CONTROL-CONFIG", value=data)
except Exception as e:
logger.error(f"Error general de descarga de archivo de control. {e}")
......
......@@ -97,6 +97,42 @@
},
{
"identifier": "tabla4",
"fields": [
{
"name": "id",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "nombre",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "fecha",
"datatype": "DATE"
},
{
"name": "fecha_2",
"datatype": "DATE"
},
{
"name": "fecha_3",
"datatype": "DATETIME"
}
],
"indexes": [
{
"name" : "indice1",
"index_fields": [
"id"
]
}
],
"save_output" : true
},
{
"identifier": "tabla5",
"fields": [
{
"name": "id",
......
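
For reference, the temp_table flag consumed by get_trans_from_xcom above lives in these same definitions; a hypothetical entry (identifier and fields are illustrative only):

{
  "identifier": "TempTabla6",
  "temp_table": true,
  "transformation_store_procedure": true,
  "fields": [
    {"name": "id", "datatype": "NUMBER", "decimal_precision": 0}
  ]
}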