Commit e0d3c20c authored by Cristian Aguirre

Merge branch 'developer_ev' into 'developer'

Merge Developer EV

See merge request !13
parents 2c9764fa 12499eed
......@@ -4,21 +4,33 @@ from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.Utils import select_multiple
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.DatabaseOperation.DatabaseTransformation import delete_table, delete_procedure
from components.S3Route import load_control_to_s3
from enums.OperationTypeEnum import OperationTypeEnum
from components.Control import get_tasks_from_control, update_new_process
import logging
logger = logging.getLogger()
def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str, **kwargs) -> None:
def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str, engine, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
#created_Tables = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CREATED_TABLES")
procedures = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="PROC_CREATED")
if procedures:
for procedure in procedures:
logger.info(f"Borrando procedures {procedure}")
delete = delete_procedure(procedure, engine)
if delete:
logger.info(f"Borrado correctamente el procedure {procedure}")
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="GENERATORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG")
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -26,6 +38,19 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
if not prefix.endswith("/"):
prefix += "/"
key = prefix + control_params["filename"]
#conf = json.dumps(conf, indent=2, default=str)
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if success_tasks is not None and len(success_tasks) > 0:
for success_task in success_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=success_task)[0]
final_dict.update({success_task: task})
if failed_tasks is not None and len(failed_tasks) > 0:
status = ProcessStatusEnum.FAIL.value
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti, [], True)
conf = json.dumps(conf, indent=2, default=str)
loaded = load_control_to_s3(bytes(conf.encode()), conn, bucket, key, provider, timezone)
if loaded:
......@@ -35,7 +60,8 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
def clean(command: str, intern_conn):
engine = intern_conn.engine
tablename = select_multiple(command)["tablename"]
#tablename = select_multiple(command)["tablename"]
tablename = command
logger.info(f"Borrando tabla {tablename}")
delete = delete_table(tablename, engine)
if delete:
......@@ -58,8 +84,12 @@ def get_cleaners_from_xcom(**kwargs):
final_selects.append(select)
logger.info(f"Final selects: {final_selects}")
Variable.set(key='CLEANS', value=final_selects, serialize_json=True)
if len(final_selects) > 0:
return [[item] for item in final_selects]
created_tables = task.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="TABLES_CREATED")
lista = list(created_tables)
if len(lista) > 0:
return [[item] for item in lista]
else:
return [[None]]
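Note on the return shape: the nested lists ([[item], ...]) feed Airflow dynamic task mapping (Airflow 2.3+), where each inner list becomes the op_args of one mapped task instance. A minimal, self-contained sketch of that pattern (the DAG name, task id and callable below are illustrative, not taken from this merge request):

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def clean_one(command):
    # Receives a single element of the expanded list, e.g. "tabla4"
    print(f"cleaning {command}")

with DAG("mapping_sketch", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    # Each inner list becomes the op_args of one mapped task instance
    PythonOperator.partial(
        task_id="CLEANERS",
        python_callable=clean_one,
    ).expand(op_args=[["tabla4"], ["tabla5"]])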
......@@ -80,7 +110,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider
validate_task = PythonOperator(
task_id="VALIDATE_CLEANER",
python_callable=validate_clean,
op_kwargs={'control_params': control_s3, 'provider': provider, 'timezone': timezone},
op_kwargs={'control_params': control_s3, 'provider': provider, 'timezone': timezone, 'engine':db_intern_conn.engine},
trigger_rule='none_skipped'
)
cleaners >> tasks >> validate_task
......
......@@ -13,7 +13,6 @@ def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[s
response = {'status': ProcessStatusEnum.SUCCESS.value, 'tasks': [], 'reset': False}
try:
conf = conf[-1]
logger.info(f"Último proceso ejecutado: {conf}")
status = conf["status"]
if "reset_by_user" in conf.keys():
response["reset"] = True
......@@ -38,7 +37,7 @@ def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[s
def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str, Any],
timezone: str, task, delete_last: bool = False, frequency: str = "montly") -> List[Dict[str, Any]]:
timezone: str, task, created_tables: List, delete_last: bool = False, frequency: str = "montly") -> List[Dict[str, Any]]:
try:
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
current_period = str(datetime_by_tzone(timezone, format_date))[:7]
......@@ -48,8 +47,8 @@ def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str,
tasks.update(last_tasks)
conf.pop(-1)
current_datetime = str(datetime_by_tzone(timezone, '%Y-%m-%d %H:%M:%S'))
new_process = {"date": current_datetime, "status": status, "tasks": tasks}
if current_period == processed_period:
new_process = {"date": current_datetime, "status": status, "tasks": tasks, "objects_created": created_tables}
if current_period == processed_period and isinstance(conf, List):
conf.append(new_process)
else:
conf = [new_process]
......
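In effect, update_new_process now appends an entry carrying an objects_created list while the period (year-month) stays the same, and restarts the control list when the period changes. The resulting control file would look roughly like this (keys taken from the code above, values illustrative):

[
  {"date": "2024-05-02 10:00:00", "status": "...", "tasks": {"task_a": "..."}, "objects_created": ["tabla4"]},
  {"date": "2024-05-20 09:30:00", "status": "...", "tasks": {"task_a": "..."}, "objects_created": ["tabla4", "ModificarTabla4"]}
]

A run in a new period (e.g. 2024-06) replaces the list with a single fresh entry instead of appending.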
......@@ -17,7 +17,6 @@ def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool =
result = conn.execute(count_command).first()
if result:
total_rows = int(result[0])
logger.info(f"Total de filas: {total_rows}")
if total_rows == chunksize:
response["steps"] = 1
else:
......
......@@ -8,7 +8,7 @@ logger = logging.getLogger()
def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
save = True
save = False
try:
chunksize = 2000
# db_type = connection.db_type
......@@ -30,6 +30,7 @@ def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
# df.to_sql(tablename, conn, if_exists='append', dtype=dtyp, index=False, chunksize=chunksize)
# else:
df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=chunksize)
save = True
except Exception as e:
logger.error(f"Error guardando resultados desde dataframe. {e}")
raise AssertionError(f"Error guardando resultados desde dataframe. {e}")
......
......@@ -18,7 +18,7 @@ def execute_transformations(commands: List[str], engine):
def delete_table(tablename: str, engine) -> bool:
delete = False
try:
command = f'DROP TABLE {tablename}'
command = f'DROP TABLE IF EXISTS {tablename}'
start_time = time.time()
with engine.connect() as conn:
try:
......@@ -31,3 +31,21 @@ def delete_table(tablename: str, engine) -> bool:
logger.error(f"Error borrando tabla {tablename}. {e}")
finally:
return delete
def delete_procedure(procedure: str, engine) -> bool:
delete = False
try:
command = f"DROP PROCEDURE IF EXISTS {procedure}"
start_time = time.time()
with engine.connect() as conn:
try:
_ = conn.execute(command)
except Exception as e:
logger.error(f"Procedure no encontrado. {e}")
delete = True
logger.debug(f"Duración de borrado: {time.time() - start_time}")
except Exception as e:
logger.error(f"Error borrando procedure {procedure}. {e}")
finally:
return delete
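A minimal usage sketch for the new helper, assuming a plain SQLAlchemy engine (the connection string and object names below are illustrative):

from sqlalchemy import create_engine
from components.DatabaseOperation.DatabaseTransformation import delete_table, delete_procedure

engine = create_engine("mysql+pymysql://user:password@localhost:13306/prueba")  # illustrative DSN
if delete_procedure("ModificarTabla4", engine):  # issues DROP PROCEDURE IF EXISTS ModificarTabla4
    print("procedure dropped (or did not exist)")
if delete_table("tabla4", engine):  # issues DROP TABLE IF EXISTS tabla4
    print("table dropped (or did not exist)")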
......@@ -94,3 +94,30 @@ class Database:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
def check_procedure(self, procedure_name, connection) -> bool:
result = False
try:
result = self.factory.check_procedure(procedure_name, connection)
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return result
def check_table(self, table_name, connection) -> bool:
result = False
try:
result = self.factory.check_table(table_name, connection)
except Exception as e:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return result
def verify_table(self, table_name, connection) -> bool:
result = False
try:
result = self.factory.verify_table(table_name, connection)
except Exception as e:
logger.error(f"Error obteniendo numero de registros de la tabla. {e}")
finally:
return result
\ No newline at end of file
......@@ -102,3 +102,37 @@ class Mysql:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
def check_procedure(self, procedure_name, connection) -> bool:
exists = False
try:
check_query = f"SHOW CREATE PROCEDURE {procedure_name}"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de procedure. {e}")
finally:
return exists
def check_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SHOW CREATE TABLE {table_name}"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de tabla. {e}")
finally:
return exists
def verify_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT COUNT(*) FROM {table_name}"
result = connection.execute(check_query).scalar()
if result and int(result) > 0:
exists = True
except Exception as e:
logger.error(f"Error obteniendo counts de tabla. {e}")
finally:
return exists
\ No newline at end of file
......@@ -109,3 +109,37 @@ class Oracle:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
def check_procedure(self, procedure_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT text FROM all_source WHERE name = '{procedure_name}'"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de procedure. {e}")
finally:
return exists
def check_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"DESCRIBE {table_name}"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de tabla. {e}")
finally:
return exists
def verify_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT COUNT(*) FROM {table_name}"
result = connection.execute(check_query).scalar()
if result and int(result) > 0:
exists = True
except Exception as e:
logger.error(f"Error obteniendo counts de tabla. {e}")
finally:
return exists
\ No newline at end of file
......@@ -103,3 +103,37 @@ class Postgres:
logger.error(f"Error obteniendo los nombres de tablas. {e}")
finally:
return tablenames
def check_procedure(self, procedure_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT pg_get_functiondef('{procedure_name}')"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de procedure. {e}")
finally:
return exists
def check_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT pg_get_tabledef('{table_name}')"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de tabla. {e}")
finally:
return exists
def verify_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT COUNT(*) FROM {table_name}"
result = connection.execute(check_query).scalar()
if result and int(result) > 0:
exists = True
except Exception as e:
logger.error(f"Error obteniendo counts de tabla. {e}")
finally:
return exists
\ No newline at end of file
......@@ -27,6 +27,11 @@ logger = logging.getLogger()
def validate_generate(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
created_tables = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="TABLES_CREATED")
lista = list(created_tables) if created_tables else []
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="GENERATORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_TRANSFORMATION", key="CONTROL-CONFIG")
......@@ -41,7 +46,8 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, provider: s
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="GENERATORS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti, True)
conf = update_new_process(conf, status, final_dict, timezone, ti, lista, True)
ti.xcom_push(key="CREATED_TABLES", value=lista)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -133,16 +139,15 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
for campo in dataframe.columns:
if campo in campos.keys():
if campos[campo] == DataTypeEnum.DATE.name:
dataframe[campo] = dataframe[campo].dt.date
dataframe[campo] = pd.to_datetime(dataframe[campo]).dt.date
# elif campos[campo] == DataTypeEnum.DATETIME.name: # datetime:
# dataframe[campo] = pd.to_datetime(dataframe[campo], format='%Y-%m-%d %H:%M:%S')
dataframe = dataframe.drop("INTERN_ID_BCOM", axis=1, errors='ignore')
logger.debug(dataframe)
dataframe.to_csv(tmp_file, sep=delimiter, index=False, mode='a', header=header)
except StopIteration:
break
conn_id = params["s3_params"]["connection_id"]
list_outputs = params["s3_params"]
size = os.path.getsize(tmp_file)
for output in list_outputs:
......@@ -150,12 +155,14 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
continue
bucket = list_outputs[output]["bucket"]
prefix = list_outputs[output]["prefix"]
conn_id = list_outputs[output]["connection_id"]
if not prefix.endswith("/"):
prefix += "/"
file_key = prefix + tmp_file[tmp_file.rfind("/")+1:]
# Se sube el archivo al S3
logger.info(f"Tamaño del archivo a subir: {size} bytes")
save_df_to_s3(tmp_file, conn_id, bucket, file_key, provider, in_memory=False)
if not save_df_to_s3(tmp_file, conn_id, bucket, file_key, provider, in_memory=False):
raise AssertionError(f"No se guardó el archivo en la ruta indicada")
# Se borra el archivo al finalizar el upload
delete_temp_dir(tmp_file)
break
......@@ -192,7 +199,6 @@ def get_generate_from_xcom(**kwargs):
tablename = select_multiple(select)["tablename"]
if (tasks["reset"] or tasks["status"] == ProcessStatusEnum.SUCCESS.value or select not in success_tasks) and tablename in tasks_with_save:
final_outputs.append(select)
logger.info(f"Final outputs: {final_outputs}")
Variable.set(key='GENERATES', value=final_outputs, serialize_json=True)
if len(final_outputs) > 0:
return [[item] for item in final_outputs]
......
......@@ -95,6 +95,7 @@ def get_base_date(conn: str, bucket: str, key: str) -> datetime.date:
def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, provider: str, delimiter: str = ",",
in_memory: bool = True):
result = False
try:
logger.info(f"SUBIENDO A NUBE KEY {key}")
file_type = get_type_file(key)
......@@ -129,9 +130,10 @@ def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, p
hook.upload(bucket, key, data)
else:
hook.load_file(data, key, bucket)
result = True
except Exception as e:
logger.error(f"Error guardando archivos a S3. key: {key}. {e}")
return result
def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str) -> None:
try:
......
......@@ -5,7 +5,10 @@ from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from components.DatabaseOperation.DatabaseTransformation import delete_table, delete_procedure
from components.S3Route import get_files_from_prefix, get_file_from_prefix
from components.Xcom import save_commands_to_xcom
from components.Utils import update_sql_commands_2
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_control_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
......@@ -21,6 +24,11 @@ logger = logging.getLogger()
def validate_transform(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None:
delete_task_instances()
ti = kwargs["ti"]
created_tables = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="TABLES_CREATED")
lista = list(created_tables) if created_tables else []
success_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
......@@ -35,7 +43,7 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, provider:
for failed_task in failed_tasks:
task = ti.xcom_pull(task_ids="TRANSFORMATIONS", key=failed_task)[0]
final_dict.update({failed_task: task})
conf = update_new_process(conf, status, final_dict, timezone, ti, True)
conf = update_new_process(conf, status, final_dict, timezone, ti, lista, True)
if status == ProcessStatusEnum.FAIL.value:
conn = control_params["connection_id"]
bucket = control_params["bucket"]
......@@ -91,17 +99,68 @@ def transformations(xcom_commands: str, intern_conn, timezone: str, **kwargs):
script_name = xcom_commands[0]
commands = xcom_commands[1]
logger.info(f"Ejecutando transformaciones del script {script_name}")
not_procedure = ["UPDATE","SELECT","CREATE","ALTER","DROP","DELETE","INSERT","GRANT","REVOKE","TRUNCATE","COPY",
"COMMIT","ROLLBACK","USE","BEGIN"]
with engine.connect() as connection:
for command in commands:
logger.info(f"Ejecutando comando de transformación: {command}")
_ = connection.execute(command)
if any(command.startswith(palabra) or command.startswith(palabra.lower()) for palabra in not_procedure):
logger.info(f"Ejecutando comando de transformación: {command}")
_ = connection.execute(command)
else:
logger.info(f"Generando llamada al procedure según bd para: {command}")
command = intern_conn.generate_sql_procedure(command)
logger.info(f"EJECUTANDO FINAL PROCEDURE: {command}")
_ = connection.execute(command)
end_process_datetime = datetime_by_tzone(timezone).strftime('%d/%m/%Y %H:%M:%S')
task.xcom_push(key="END_PROCESS_DATETIME_" + str(task.map_index), value=end_process_datetime)
@task(task_id="MASTER_TRANSFORMATION", trigger_rule='none_skipped')
def get_trans_from_xcom(**kwargs):
def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablename, transform_mask, order_delimiter,
db_intern_conn, **kwargs):
task = kwargs['ti']
#CAMBIOS PARA TRAER LOS SP
proc_created = []
tables_created = task.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="TABLES_CREATED")
lista = list(tables_created)
engine = db_intern_conn.engine
conn_id = store_procedure["s3_params"]["connection_id"]
bucket = store_procedure["s3_params"]["bucket"]
prefix = store_procedure["s3_params"]["prefix"]
procedures = get_files_from_prefix(conn_id, bucket, prefix, provider)
definitions = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
#VALIDAR NOMBRE DE LOS PROCEDURES Y LABEL STORE
for element in definitions:
if "temp_table" in element.keys() and element["temp_table"] == True:
lista.append(element["identifier"])
with engine.connect() as connection:
for procedure in procedures:
procedure = procedure[1]
if procedure.find(label_tablename + ":") != -1:
index = procedure.find(label_tablename + ":")
label_lenght = len(label_tablename + ":")
tablename = procedure[index + label_lenght:].strip().split("\n")[0]
delete_procedure(tablename, engine)
for element in definitions:
if element["identifier"] == tablename and "transformation_store_procedure" in element.keys() and element["transformation_store_procedure"] == True:
logger.info(f"Ejecutando creacion de procedure: {procedure}")
# result = db_intern_conn.check_procedure(tablename, connection)
# if not result:
try:
_ = connection.execute(procedure)
except Exception as e:
logger.error(f" Error: {e}")
raise AirflowSkipException
proc_created.append(tablename)
lista.append(tablename)
else:
logger.debug(f"No se encontró el label en {procedure} por ende no se creará")
save_commands_to_xcom(procedures, kwargs['ti'], procedure_mask, transform_mask, procedure_mask, order_delimiter)
logger.debug(f"Procedures cargados en Xcom: {procedures}")
transforms_per_file = []
conf = task.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
tasks = get_tasks_from_control(conf, "transformation")
......@@ -121,6 +180,8 @@ def get_trans_from_xcom(**kwargs):
final_transforms.append(transform)
transforms_per_file.append((key, final_transforms))
logger.info(f"Scripts para la transformación: {transforms_per_file}")
task.xcom_push(key="PROC_CREATED", value=proc_created)
task.xcom_push(key="TABLES_CREATED", value=lista)
Variable.set(key='TRANSFORMS', value=transforms_per_file, serialize_json=True)
if len(transforms_per_file) > 0:
return [[item] for item in transforms_per_file]
......@@ -129,11 +190,13 @@ def get_trans_from_xcom(**kwargs):
def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str, Any],
provider: str) -> TaskGroup or None:
provider: str, store_procedure: Dict, procedure_mask: str,
label_tablename: str, transform_mask: str, order_delimiter: str) -> TaskGroup or None:
group = None
try:
with TaskGroup(group_id="TransformacionDeDatos", prefix_group_id=False) as group:
transforms = get_trans_from_xcom()
transforms = get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablename, transform_mask, order_delimiter,
db_intern_conn)
tasks = PythonOperator.partial(
task_id="TRANSFORMATIONS",
......
......@@ -286,7 +286,6 @@ def generateModel(tablename: str, attributes: List[Dict[str, Any]], indexes: Lis
model = type(modelName, (InsumoModel,), model_args)
try:
for attribute in attributes:
logger.debug(f"attribute: {attribute}")
if attribute["datatype"] == DataTypeEnum.TEXT.name and "maxLength" in attribute.keys():
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(attribute["maxLength"])))
......
......@@ -6,24 +6,25 @@ app:
sources:
source1:
type: mysql
host: 192.168.21.52
host: 192.168.1.13
port: 13306
username: root
password: root1234
database: bcom_tp_res_bk
password: root
database: prueba
service:
schema: sources
transformation:
type: mysql
host: 192.168.1.4
host: 192.168.1.13
port: 13306
username: root
password: root
database: prueba_bcom2
database: prueba_ca
service:
schema: intern_db
chunksize: 4000
label_multiple_select: TABLENAME
label_transform_procedure: STORE
source_mask: select # Sufijo (S)
procedure_mask: procedure # S
transformation_mask: transform # S
......@@ -31,14 +32,20 @@ app:
cloud_provider: local
scripts:
s3_params:
bucket: prueba1234568
prefix: bcom_scripts
bucket: prueba-id
prefix: prueba_bcom/bcom_scripts
connection_id: conn_script
store_procedures:
s3_params:
bucket: prueba-id
prefix: prueba_bcom/bcom_store_procedures
connection_id: conn_script
control:
s3_params:
connection_id: conn_script
bucket: prueba1234568
prefix: bcom_control
bucket: prueba-id
prefix: prueba_bcom/bcom_control
filename: control_<period>.json
timezone: 'GMT-5'
outputs:
......@@ -48,21 +55,22 @@ app:
delimiter: '|'
tmp_path: /tmp
s3_params:
TACOMVENTAS:
bucket: prueba1234568
prefix: bcom_results
RANGO_VENTAS_CON_PROMOCION:
bucket: prueba-id
prefix: prueba_bcom/bcom_results
connection_id: conn_script
tabla4:
bucket: pruebairflow
prefix: bcom_results
connection_id: prueba_af
tabla5:
bucket: pruebairflow
prefix: bcom_results
connection_id: prueba_af
report:
s3_params:
bucket: prueba1234568
prefix: bcom_report
bucket: prueba-id
prefix: prueba_bcom/bcom_report
connection_id: conn_script
filename: report_<datetime>.xlsx
datetime_pattern: '%Y-%m-%d %H:%M:%S'
procedure:
filepath: "/opt/airflow/dags/procedure_definition.json"
definitions:
filepath: "/opt/airflow/dags/procedure_prueba.json"
......@@ -211,7 +211,6 @@ def set_dag():
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que informa del último proceso ejecutado",
schedule_interval=conf["inform_dag_schedule"], tags=["DAG BCOM - INFORM PROCESS"], catchup=False) as dag:
......
......@@ -6,7 +6,7 @@ import json
from airflow.operators.python import PythonOperator
from components.Databases.Database import Database
from components.DatabaseOperation.DatabaseTransformation import delete_table
from components.DatabaseOperation.DatabaseTransformation import delete_table, delete_procedure
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from components.S3Route import upload_file
from components.Control import extract_last_control
......@@ -58,22 +58,39 @@ def update_control(control_params: Dict[str, Any], provider: str, **kwargs) -> N
logger.error(f"Error actualizando archivo de control. {e}")
def reset_process(intern_db, output_tmp_dir: str) -> None:
def reset_process(intern_db, output_tmp_dir: str, procedure_filepath, **kwargs) -> None:
try:
# Borrando tablas
tablenames = intern_db.get_all_tablenames()
if len(tablenames) == 0:
logger.info("No se encontraron tablas para su limpieza")
else:
for tablename in tablenames:
tablename = tablename[0]
delete = delete_table(tablename, intern_db.engine)
if delete:
logger.info(f"Borrado correctamente la tabla {tablename}")
task = kwargs['ti']
with open(procedure_filepath) as f:
data = json.load(f)
if data:
control_config = task.xcom_pull(task_ids="CONTROL-EXTRACTOR", key="CONTROL-CONFIG")
logger.info(f"control config {control_config}")
final_ex = control_config[-1][-1]
# for index in range(len(control_config), 0, -1):
# logger.info(f"index: {index}")
# logger.info(f"index: {control_config[index]}")
# if "create_tables" in control_config[index].keys() and len(control_config[index]["create_tables"]) !=0 and "reset_by_user" not in control_config[index].keys():
# final_ex = control_config[index]
# break
tablenames = final_ex["objects_created"]
if len(tablenames) == 0:
logger.info("No se encontraron tablas para su limpieza")
else:
for tablename in tablenames:
for element in data:
if tablename == element["identifier"]:
if "transformation_store_procedure" in element.keys():
delete = delete_procedure(tablename, intern_db.engine)
else:
delete = delete_table(tablename, intern_db.engine)
if delete:
logger.info(f"Borrado correctamente la tabla {tablename}")
# Borrando archivos temporales que hayan quedado sueltos
delete = delete_temp_dirs(output_tmp_dir)
if delete:
logger.info("Se borraron todos los archivos temporales")
# delete = delete_temp_dirs(output_tmp_dir)
# if delete:
# logger.info("Se borraron todos los archivos temporales")
except Exception as e:
logger.error(f"Error procesando archivo de control. {e}")
......@@ -89,12 +106,13 @@ def set_dag():
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que resetea el último proceso ejecutado",
schedule_interval=conf["reset_dag_schedule"], tags=["DAG BCOM - RESET PROCESS"], catchup=False) as dag:
control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"]
# NUEVA VARIABLE
procedure = conf["definitions"]
control_extractor = PythonOperator(
task_id="CONTROL-EXTRACTOR",
python_callable=extract_last_control,
......@@ -113,7 +131,7 @@ def set_dag():
control_process = PythonOperator(
task_id="RESET-PROCESS",
python_callable=reset_process,
op_kwargs={'intern_db': intern_db, 'output_tmp_dir': tmp_dir},
op_kwargs={'intern_db': intern_db, 'output_tmp_dir': tmp_dir, 'procedure_filepath': procedure["filepath"]},
trigger_rule="all_success"
)
......
......@@ -22,7 +22,7 @@ import logging
logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM"
DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM2"
# Change this path if is deployed in prod or dev
MAIN_PATH = "/opt/airflow/dags/"
......@@ -59,10 +59,11 @@ def generate_and_deploy_results(intern_conn, parameters: Dict[str, Any], timezon
return groups
def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any], provider: str) -> TaskGroup:
def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any], provider: str, store_procedure: Dict, procedure_mask: str,
label_tablename: str, transform_mask: str, order_delimiter: str) -> TaskGroup:
groups = None
try:
groups = get_transform_task_group(intern_conn, timezone, control_s3, provider)
groups = get_transform_task_group(intern_conn, timezone, control_s3, provider, store_procedure, procedure_mask, label_tablename, transform_mask, order_delimiter)
except Exception as e:
logger.error(f"Error general de transformación de datos. {e}")
finally:
......@@ -84,7 +85,6 @@ def save_procedure_json(json_path: str, task) -> None:
try:
with open(json_path) as f:
data = json.load(f)
logger.info(f"JSON-PROCEDURE {data}")
if data:
task.xcom_push(key="EXTRACTION-DEFINITION-JSON", value=data)
except Exception as e:
......@@ -106,6 +106,7 @@ def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str,
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}]
logger.info(f"Json de control creado: {data}")
task.xcom_push(key="CONTROL-CONFIG", value=data)
except Exception as e:
logger.error(f"Error general de descarga de archivo de control. {e}")
......@@ -130,7 +131,6 @@ def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, tr
def set_dag():
""" DAG that execute a series of SQL commands from scripts to make a result file and save on a bucket (for now)"""
import yaml
from yaml.loader import SafeLoader
......@@ -139,7 +139,6 @@ def set_dag():
conf_path = MAIN_PATH + "dag_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que extrae y transforma",
schedule_interval=conf["schedule"], tags=["DAG BCOM - SQL TRANSFORMATIONS"], catchup=False) as dag:
......@@ -153,7 +152,7 @@ def set_dag():
control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"]
# NUEVA VARIABLE
procedure = conf["procedure"]
procedure = conf["definitions"]
# Scripts extraction
extract_mask = conf["source_mask"]
transform_mask = conf["transformation_mask"]
......@@ -190,10 +189,13 @@ def set_dag():
extractions = extraction(source_db, intern_db, timezone, control_s3, conf["cloud_provider"], chunksize)
# Creación de grupo de tasks para las transformaciones
transformations = transformation(intern_db, timezone, control_s3, conf["cloud_provider"])
store_procedure = conf["store_procedures"]
transformations = transformation(intern_db, timezone, control_s3, conf["cloud_provider"], store_procedure, procedure_mask, conf["label_transform_procedure"],
transform_mask, order_delimiter)
# Creación de grupo de tasks para la generación y despliegue de archivos resultados
outputs_conf = conf["outputs"]
result = generate_and_deploy_results(intern_db, outputs_conf, timezone, control_s3, conf["cloud_provider"])
# Creación de tasks de limpiadores
......
[
{
"identifier": "ESTUDIANTES_11",
"identifier": "ModificarTabla4",
"transformation_store_procedure": true
},
{
"identifier" : "UnionYInsert",
"transformation_store_procedure": true
},
{
"identifier": "TempTabla",
"temp_table" : true
},
{
"identifier": "tabla11",
"fields": [
{
"name": "ID",
"name": "id",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "Nombre",
"name": "nombre",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "Apellido",
"name": "fecha_nacimiento",
"datatype": "DATE"
}
],
"indexes": [
{
"name" : "indice1",
"index_fields": [
"id"
]
}
],
"save_output" : false
},
{
"identifier": "tabla2",
"fields": [
{
"name": "id",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "apellido",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "Edad",
"name": "fecha_registro",
"datatype": "DATETIME"
}
],
"indexes": [
{
"name" : "indice1",
"index_fields": [
"id"
]
}
],
"save_output" : false
},
{
"identifier": "tabla3",
"fields": [
{
"name": "id",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "CorreoElectronico",
"datatype": "TEXT",
"maxLength": 100
"name": "edad",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "fecha_actualizacion",
"datatype": "DATETIME"
}
],
"indexes": [
"ID"
{
"name" : "indice1",
"index_fields": [
"id"
]
}
],
"save_output" : true
"save_output" : false
},
{
"identifier": "ESTUDIANTES_1",
"identifier": "tabla4",
"fields": [
{
"name": "ID",
"name": "id",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "Nombre",
"name": "nombre",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "Apellido",
"datatype": "TEXT",
"maxLength": 50
"name": "fecha",
"datatype": "DATE"
},
{
"name": "fecha_2",
"datatype": "DATE"
},
{
"name": "Edad",
"name": "fecha_3",
"datatype": "DATETIME"
}
],
"indexes": [
{
"name" : "indice1",
"index_fields": [
"id"
]
}
],
"save_output" : true
},
{
"identifier": "tabla5",
"fields": [
{
"name": "id",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "CorreoElectronico",
"name": "nombre",
"datatype": "TEXT",
"maxLength": 100
"maxLength": 50
},
{
"name": "fecha",
"datatype": "DATE"
},
{
"name": "fecha_2",
"datatype": "DATETIME"
},
{
"name": "fecha_3",
"datatype": "DATETIME"
}
],
"indexes": [
"ID"
{
"name" : "indice1",
"index_fields": [
"id"
]
}
],
"save_output" : true
}
......