Commit c22cefd9 authored by Cristian Aguirre

Update 26-07-23. Add new provider: GCP to get inputs and save results.

parent 8dff47d1
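For reference, the change applied across the files below follows one pattern: the DAG configuration now carries a cloud_provider value, each task group and storage helper receives it as a provider argument, and every upload/download picks either the Amazon/MinIO S3Hook or the Google Cloud Storage hook accordingly. A minimal sketch of that dispatch, assuming the repository's ProviderTypeEnum with AMAZON, MINIO and GOOGLE members; the helper name below is illustrative and not part of the commit:

import logging

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

from enums.ProviderTypeEnum import ProviderTypeEnum

logger = logging.getLogger()


def upload_bytes_by_provider(data: bytes, conn: str, bucket: str, key: str, provider: str) -> bool:
    # Hypothetical helper mirroring load_obj_to_s3: pick the hook that matches
    # the configured provider and push the bytes to the object store.
    try:
        if provider in (ProviderTypeEnum.AMAZON.value, ProviderTypeEnum.MINIO.value):
            S3Hook(conn).load_bytes(data, key, bucket, replace=True)
        elif provider == ProviderTypeEnum.GOOGLE.value:
            GoogleCloudStorageHook(conn).upload(bucket, key, data=data)
        else:
            logger.error(f"Proveedor no soportado: {provider}")
            return False
        return True
    except Exception as e:
        logger.error(f"Error subiendo {key} al bucket {bucket}. {e}")
        return False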
@@ -16,7 +16,7 @@ import logging
logger = logging.getLogger()

-def validate_clean(control_params: Dict[str, Any], **kwargs) -> None:
+def validate_clean(control_params: Dict[str, Any], provider: str, **kwargs) -> None:
    delete_task_instances()
    ti = kwargs["ti"]
    conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG")
@@ -27,7 +27,7 @@ def validate_clean(control_params: Dict[str, Any], **kwargs) -> None:
        prefix += "/"
    key = prefix + control_params["filename"]
    conf = json.dumps(conf, indent=2, default=str)
-    loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
+    loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key, provider)
    if loaded:
        logger.info(f"Cargado correctamente el archivo de control en {key}")
    delete_all_xcom_tasks()
@@ -61,7 +61,7 @@ def get_cleaners_from_xcom(**kwargs):
    return [[item] for item in final_selects]

-def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any]) -> TaskGroup or None:
+def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider: str) -> TaskGroup or None:
    group = None
    try:
        with TaskGroup(group_id="LimpiezaDelProceso", prefix_group_id=False) as group:
@@ -76,7 +76,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any]) -> TaskG
            validate_task = PythonOperator(
                task_id="VALIDATE_CLEANER",
                python_callable=validate_clean,
-                op_kwargs={'control_params': control_s3},
+                op_kwargs={'control_params': control_s3, 'provider': provider},
                trigger_rule='none_skipped'
            )
            cleaners >> tasks >> validate_task
...
@@ -8,7 +8,7 @@ def get_steps(sql_command: str, chunksize: int, connection, is_tablename: bool =
    final_steps = 0
    try:
        if is_tablename:
-            count_command = f"SELECT COUNT(*) FROM {sql_command}"
+            count_command = f'SELECT COUNT(*) FROM "{sql_command}"'
        else:
            count_command = f"SELECT COUNT(*) FROM ({sql_command}) BCOM"
        with connection.connect() as conn:
...
@@ -18,7 +18,7 @@ def execute_transformations(commands: List[str], engine):
def delete_table(tablename: str, engine) -> bool:
    delete = False
    try:
-        command = f"DROP TABLE {tablename}"
+        command = f'DROP TABLE "{tablename}"'
        start_time = time.time()
        with engine.connect() as conn:
            try:
...
from typing import Any, Dict
import json
-import numpy as np
import pandas as pd
+from enums.DatabaseTypeEnum import DatabaseTypeEnum
from enums.ProcessStatusEnum import ProcessStatusEnum
from components.Utils import select_multiple, generateModel
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
@@ -23,7 +23,7 @@ import logging
logger = logging.getLogger()

-def validate_extractor(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
+def validate_extractor(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None:
    delete_task_instances()
    ti = kwargs["ti"]
    success_tasks = ti.xcom_pull(task_ids="EXTRACTORS", key="SUCCESS_TASKS")
@@ -49,7 +49,7 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, **kwargs)
        prefix += "/"
    key = prefix + control_params["filename"]
    conf = json.dumps(conf, indent=2, default=str)
-    loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
+    loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key, provider)
    if loaded:
        logger.info(f"Cargado correctamente el archivo de control en {key}")
    delete_all_xcom_tasks()
@@ -141,14 +141,14 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
        source_conn.close_basic_connection()
    else:
        if command.replace(" ", "").lower().find("|select") != -1:
-            command = command[command.find("select"):]
+            command = command[command.lower().find("select"):]
        steps = get_steps(command, chunksize, source_engine)
        # Traemos el iterator
        iterator = get_iterator(command, chunksize, source_engine)
        logger.info(f"Número de pasos para migrar datos: {steps}")
        for step in range(steps):
            dataframe = next(iterator)
-            dataframe["INTERN_ID_BCOM"] = None
+            # dataframe["INTERN_ID_BCOM"] = np.NaN
            logger.debug(dataframe)
            save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
            if save:
@@ -186,7 +186,7 @@ def get_select_from_xcom(**kwargs):
def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int, timezone: str,
-                           control_s3: Dict[str, Any]) -> TaskGroup or None:
+                           control_s3: Dict[str, Any], provider: str) -> TaskGroup or None:
    group = None
    try:
        with TaskGroup(group_id="ExtraccionDeDatos", prefix_group_id=False) as group:
@@ -203,7 +203,7 @@ def get_extract_task_group(db_source_conn, db_intern_conn, chunksize: int, timez
            validate_task = PythonOperator(
                task_id="VALIDATE_EXTRACTION",
                python_callable=validate_extractor,
-                op_kwargs={'control_params': control_s3, 'timezone': timezone},
+                op_kwargs={'control_params': control_s3, 'timezone': timezone, 'provider': provider},
                trigger_rule='all_done'
            )
            selects >> tasks >> validate_task
...
@@ -20,7 +20,7 @@ import logging
logger = logging.getLogger()

-def validate_generate(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
+def validate_generate(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None:
    delete_task_instances()
    ti = kwargs["ti"]
    success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
@@ -46,7 +46,7 @@ def validate_generate(control_params: Dict[str, Any], timezone: str, **kwargs) -
        prefix += "/"
    key = prefix + control_params["filename"]
    conf = json.dumps(conf, indent=2, default=str)
-    loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
+    loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key, provider)
    if loaded:
        logger.info(f"Cargado correctamente el archivo de control en {key}")
    delete_all_xcom_tasks()
@@ -78,7 +78,8 @@ def on_success_generator(context) -> None:
    ti.xcom_push(key="SUCCESS_TASKS", value=task_name)

-def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timezone: str, chunksize=10000):
+def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timezone: str,
+                        provider: str, chunksize=10000):
    engine = intern_conn.engine
    logger.debug(f"COMANDO: {command}")
    tablename = select_multiple(command)["tablename"]
@@ -113,7 +114,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
    file_key = prefix + tmp_file[tmp_file.rfind("/")+1:]
    # Se sube el archivo al S3
    logger.info(f"Tamaño del archivo a subir: {os.path.getsize(tmp_file)} bytes")
-    save_df_to_s3(tmp_file, conn_id, bucket, file_key, in_memory=False)
+    save_df_to_s3(tmp_file, conn_id, bucket, file_key, provider, in_memory=False)
    # Se borra el archivo al finalizar el upload
    delete_temp_dir(tmp_file)
@@ -144,7 +145,7 @@ def get_generate_from_xcom(**kwargs):
def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], control_s3: Dict[str, Any],
-                            timezone: str) -> TaskGroup or None:
+                            timezone: str, provider: str) -> TaskGroup or None:
    group = None
    try:
        with TaskGroup(group_id="GeneracionyDespliegueDeResultados", prefix_group_id=False) as group:
@@ -155,13 +156,14 @@ def get_generate_task_group(db_intern_conn, parameters: Dict[str, Any], control_
                python_callable=generate_and_deploy,
                on_failure_callback=on_failure_generator,
                on_success_callback=on_success_generator,
-                op_kwargs={'intern_conn': db_intern_conn, 'params': parameters, 'timezone': timezone}
+                op_kwargs={'intern_conn': db_intern_conn, 'params': parameters, 'timezone': timezone,
+                           'provider': provider}
            ).expand(op_args=outputs)
            validate_task = PythonOperator(
                task_id="VALIDATE_GENERATOR",
                python_callable=validate_generate,
-                op_kwargs={'control_params': control_s3, 'timezone': timezone},
+                op_kwargs={'control_params': control_s3, 'timezone': timezone, 'provider': provider},
                trigger_rule='none_skipped'
            )
            outputs >> tasks >> validate_task
...
from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy import Column, BigInteger
+from sqlalchemy import Column, BIGINT

Base = declarative_base()
@@ -8,4 +8,4 @@ class InsumoModel(Base):
    __abstract__ = True

-    INTERN_ID_BCOM = Column(BigInteger, primary_key=True, autoincrement=True)
+    INTERN_ID_BCOM = Column(BIGINT, primary_key=True, autoincrement=True)
import fnmatch
import datetime
from typing import Any, Dict, List, Tuple
-import json
import pytz
from io import BytesIO, StringIO
import pandas as pd
@@ -9,8 +8,10 @@ import pandas as pd
from components.Utils import get_type_file
from enums.FileTypeEnum import FileTypeEnum
from enums.ScriptFileTypeEnum import ScriptFileTypeEnum
+from enums.ProviderTypeEnum import ProviderTypeEnum

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

import logging

logger = logging.getLogger()
@@ -90,18 +91,26 @@ def get_base_date(conn: str, bucket: str, key: str) -> datetime.date:
    return last_date

-def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, delimiter: str = ",",
+def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, provider: str, delimiter: str = ",",
                  in_memory: bool = True):
    try:
        logger.info(f"SUBIENDO A NUBE KEY {key}")
        file_type = get_type_file(key)
-        s3_hook = S3Hook(conn)
+        gcp_cloud = False
+        if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
+            hook = S3Hook(conn)
+        else:
+            hook = GoogleCloudStorageHook(conn)
+            gcp_cloud = True
        if file_type == FileTypeEnum.EXCEL or file_type == FileTypeEnum.OLD_EXCEL:
            if in_memory:
                with BytesIO() as buffer:
                    with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
                        data.to_excel(writer, index=None)
-                    s3_hook.load_bytes(buffer.getvalue(), key, bucket, True)
+                    if gcp_cloud:
+                        hook.upload(bucket, key, data=buffer.getvalue())
+                    else:
+                        hook.load_bytes(buffer.getvalue(), key, bucket, True)
            else:
                pass
        elif file_type == FileTypeEnum.CSV or file_type == FileTypeEnum.TEXT:
@@ -109,9 +118,15 @@ def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, d
            csv_buffer = BytesIO()
            data.to_csv(csv_buffer, header=True, index=False, sep=delimiter, na_rep='None')
            csv_buffer.seek(0)
-            s3_hook.load_bytes(csv_buffer.getvalue(), key, bucket, True)
+            if gcp_cloud:
+                hook.upload(bucket, key, data=csv_buffer.getvalue())
+            else:
+                hook.load_bytes(csv_buffer.getvalue(), key, bucket, True)
        else:
-            s3_hook.load_file(data, key, bucket)
+            if gcp_cloud:
+                hook.upload(bucket, key, data)
+            else:
+                hook.load_file(data, key, bucket)
    except Exception as e:
        logger.error(f"Error guardando archivos a S3. key: {key}. {e}")
@@ -127,17 +142,28 @@ def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str) ->
        logger.error(f"Error moviendo archivo desde {source_key} hacia {output_key} en bucket {bucket}. {e}")

-def get_files_from_prefix(conn: str, bucket: str, prefix: str) -> List[Tuple[str, str]]:
+def get_files_from_prefix(conn: str, bucket: str, prefix: str, provider: str) -> List[Tuple[str, str]]:
    result = []
    allowed_filetypes = [ScriptFileTypeEnum[item].value for item in ScriptFileTypeEnum._member_names_]
    try:
+        files = []
+        s3_hook, gcp_hook, data = None, None, None
+        if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
            s3_hook = S3Hook(conn)
            files = s3_hook.list_keys(bucket, prefix)
+        elif provider == ProviderTypeEnum.GOOGLE.value:
+            gcp_hook = GoogleCloudStorageHook(conn)
+            if not prefix.endswith("/"):
+                prefix += "/"
+            files = gcp_hook.list(bucket, prefix=prefix)
        logger.debug(f"Archivos encontrados en el prefijo {prefix}: {files}")
        for file in files:
            if file.endswith("/") or file[file.rfind(".")+1:].lower() not in allowed_filetypes:
                continue
+            if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
                data = s3_hook.get_key(file, bucket).get()['Body'].read().decode("utf-8")
+            elif provider == ProviderTypeEnum.GOOGLE.value:
+                data = gcp_hook.download(bucket, file).decode("utf-8")
            if file.find("/") == -1:
                filename = file
            else:
@@ -150,12 +176,17 @@ def get_files_from_prefix(conn: str, bucket: str, prefix: str) -> List[Tuple[str
    return result

-def get_file_from_key(conn: str, bucket: str, key: str) -> Any:
+def get_file_from_key(conn: str, bucket: str, key: str, provider: str) -> Any:
    result = BytesIO()
    try:
+        if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
            s3_hook = S3Hook(conn)
            data = s3_hook.get_key(key, bucket)
            data.download_fileobj(result)
+        elif provider == ProviderTypeEnum.GOOGLE.value:
+            gcp_hook = GoogleCloudStorageHook(conn)
+            result = gcp_hook.download(bucket, key)
+            print("RESULT:", result)
    except Exception as e:
        result = None
        logger.error(f"Error extrayendo archivo {key}. {e}")
@@ -163,11 +194,15 @@ def get_file_from_key(conn: str, bucket: str, key: str) -> Any:
    return result

-def load_obj_to_s3(obj, conn: str, bucket: str, key: str, replace=True) -> bool:
+def load_obj_to_s3(obj, conn: str, bucket: str, key: str, provider: str, replace=True) -> bool:
    load = False
    try:
+        if provider == ProviderTypeEnum.AMAZON.value or provider == ProviderTypeEnum.MINIO.value:
            s3_hook = S3Hook(conn)
            s3_hook.load_bytes(obj, key, bucket, replace)
+        elif provider == ProviderTypeEnum.GOOGLE.value:
+            gcp_hook = GoogleCloudStorageHook(conn)
+            gcp_hook.upload(bucket, key, data=obj)
        load = True
    except Exception as e:
        logger.error(f"Error subiendo archivo de control a bucket {bucket} y key {key}. {e}")
...
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
-from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.sensors.base import BaseSensorOperator

from enums.ProviderTypeEnum import ProviderTypeEnum
@@ -17,11 +18,11 @@ def create_sensor(task_id: str, connection: str, bucket: str, key: str, provider
    sensor = None
    try:
        if provider == ProviderTypeEnum.GOOGLE.value:
-            sensor = GCSObjectExistenceSensor(
+            sensor = GCPSensor(
                task_id=task_id,
+                conn=connection,
                bucket=bucket,
-                object=key,
-                google_cloud_conn_id=connection,
+                key=key,
                poke_interval=POKE_INTERVAL,
                timeout=TIMEOUT
            )
@@ -39,3 +40,22 @@ def create_sensor(task_id: str, connection: str, bucket: str, key: str, provider
    except Exception as e:
        logger.error(f"Error creando Sensor S3. {e}")
    return sensor

+class GCPSensor(BaseSensorOperator):
+
+    def __init__(self, conn: str, bucket: str, key: str, **kwargs) -> None:
+        self.conn = conn
+        self.bucket = bucket
+        self.key = key
+        super().__init__(**kwargs)
+
+    def poke(self, context):
+        hook = GCSHook(self.conn)
+        end_prefix_index = self.key.rfind("/")
+        if end_prefix_index != -1 and len(self.key[end_prefix_index:]) > 1:
+            self.key = self.key[:end_prefix_index + 1]
+        files = hook.list(self.bucket, prefix=self.key)
+        files = list(map(lambda x: not x.endswith("/"), files))
+        return any([criteria for criteria in files])
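
The GCPSensor added above treats the configured key as a prefix: poke() lists the bucket under that prefix and returns True as soon as any listed object is not a directory placeholder. A minimal usage sketch, assuming it is wired the same way create_sensor builds the S3 sensor; the connection id, bucket and prefix values here are placeholders, not taken from the commit:

wait_for_inputs = GCPSensor(
    task_id="SENSOR_SCRIPTS",        # illustrative task id
    conn="google_cloud_default",     # Airflow connection id for GCS (placeholder)
    bucket="example-bucket",         # bucket watched for incoming files (placeholder)
    key="inputs/scripts/",           # key is reduced to its prefix inside poke()
    poke_interval=60,
    timeout=60 * 60,
)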
@@ -17,7 +17,7 @@ import logging
logger = logging.getLogger()

-def validate_transform(control_params: Dict[str, Any], timezone: str, **kwargs) -> None:
+def validate_transform(control_params: Dict[str, Any], timezone: str, provider: str, **kwargs) -> None:
    delete_task_instances()
    ti = kwargs["ti"]
    success_tasks = ti.xcom_pull(task_ids="TRANSFORMATIONS", key="SUCCESS_TASKS")
@@ -43,7 +43,7 @@ def validate_transform(control_params: Dict[str, Any], timezone: str, **kwargs)
        prefix += "/"
    key = prefix + control_params["filename"]
    conf = json.dumps(conf, indent=2, default=str)
-    loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key)
+    loaded = load_obj_to_s3(bytes(conf.encode()), conn, bucket, key, provider)
    if loaded:
        logger.info(f"Cargado correctamente el archivo de control en {key}")
    delete_all_xcom_tasks()
@@ -82,7 +82,7 @@ def transformations(xcom_commands: str, intern_conn):
    logger.info(f"Ejecutando transformaciones del script {script_name}")
    with engine.connect() as connection:
        for command in commands:
-            logger.info(f"Ejecutando comando de transformación: {command}")
+            logger.debug(f"Ejecutando comando de transformación: {command}")
            _ = connection.execute(command)
@@ -112,7 +112,8 @@ def get_trans_from_xcom(**kwargs):
    return [[item] for item in transforms_per_file]

-def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str, Any]) -> TaskGroup or None:
+def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str, Any],
+                             provider: str) -> TaskGroup or None:
    group = None
    try:
        with TaskGroup(group_id="TransformacionDeDatos", prefix_group_id=False) as group:
@@ -129,7 +130,7 @@ def get_transform_task_group(db_intern_conn, timezone: str, control_s3: Dict[str
            validate_task = PythonOperator(
                task_id="VALIDATE_TRANSFORMATION",
                python_callable=validate_transform,
-                op_kwargs={'control_params': control_s3, 'timezone': timezone},
+                op_kwargs={'control_params': control_s3, 'timezone': timezone, 'provider': provider},
                trigger_rule='none_skipped'
            )
            transforms >> tasks >> validate_task
...
@@ -103,8 +103,6 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
    final_data = []
    table_name = ""
    for item in data:
-        # if item.strip().startswith("--") and label_tablename.strip()+":" not in item:
-        # continue
        if item.lower().strip() == "end":
            final_data[-1] = final_data[-1] + "; end;"
        final_item = item
...
@@ -39,10 +39,10 @@ DEFAULT_ARGS = {
}

-def cleaning(intern_conn, control_s3: Dict[str, Any]) -> TaskGroup:
+def cleaning(intern_conn, control_s3: Dict[str, Any], provider: str) -> TaskGroup:
    groups = None
    try:
-        groups = get_cleaning_task_group(intern_conn, control_s3)
+        groups = get_cleaning_task_group(intern_conn, control_s3, provider)
    except Exception as e:
        logger.error(f"Error general de transformación de datos. {e}")
    finally:
@@ -50,20 +50,20 @@ def cleaning(intern_conn, control_s3: Dict[str, Any]) -> TaskGroup:
def generate_and_deploy_results(intern_conn, parameters: Dict[str, Any], timezone: str,
-                                control_s3: Dict[str, Any]) -> TaskGroup:
+                                control_s3: Dict[str, Any], provider: str) -> TaskGroup:
    groups = None
    try:
-        groups = get_generate_task_group(intern_conn, parameters, control_s3, timezone)
+        groups = get_generate_task_group(intern_conn, parameters, control_s3, timezone, provider)
    except Exception as e:
        logger.error(f"Error general de creación y despliegue de resultados. {e}")
    finally:
        return groups

-def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any]) -> TaskGroup:
+def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any], provider: str) -> TaskGroup:
    groups = None
    try:
-        groups = get_transform_task_group(intern_conn, timezone, control_s3)
+        groups = get_transform_task_group(intern_conn, timezone, control_s3, provider)
    except Exception as e:
        logger.error(f"Error general de transformación de datos. {e}")
    finally:
@@ -71,10 +71,10 @@ def transformation(intern_conn, timezone: str, control_s3: Dict[str, Any]) -> Ta
def extraction(source_conn, intern_conn, timezone: str, control_s3: Dict[str, Any],
-               chunksize: int = 100000) -> TaskGroup:
+               provider: str, chunksize: int = 100000) -> TaskGroup:
    groups = None
    try:
-        groups = get_extract_task_group(source_conn, intern_conn, chunksize, timezone, control_s3)
+        groups = get_extract_task_group(source_conn, intern_conn, chunksize, timezone, control_s3, provider)
    except Exception as e:
        logger.error(f"Error general de extracción de datos. {e}")
    finally:
@@ -92,13 +92,13 @@ def save_procedure_json(json_path: str, task) -> None:
        logger.error(f"Error leyendo y guardando archivo descriptor de procedure. {e}")

-def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task):
+def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task, provider: str):
    try:
        if not prefix.endswith("/"):
            prefix += "/"
        key = prefix + filename
        logger.info(f"EXTRAYENDO ARCHIVO DE CONTROL DESDE {key}")
-        control = get_file_from_key(conn_id, bucket, key)
+        control = get_file_from_key(conn_id, bucket, key, provider)
        if control:
            str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
            data = StringIO(str_data)
@@ -113,14 +113,14 @@ def extract_control(conn_id: str, bucket: str, prefix: str, filename: str, task)
def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str,
                    order_delimiter: str, procedure_mask: str, label_tablename: str, control_params: Dict[str, Any],
-                    **kwargs):
+                    provider: str, **kwargs):
    try:
        extract_control(control_params["connection_id"], control_params["bucket"], control_params["prefix"],
-                        control_params["filename"], kwargs['ti'])
+                        control_params["filename"], kwargs['ti'], provider)
        save_procedure_json(JSON_PROCEDURE_PATH, kwargs['ti'])
        start_time = time.time()
        logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
-        scripts = get_files_from_prefix(conn_id, bucket, prefix)
+        scripts = get_files_from_prefix(conn_id, bucket, prefix, provider)
        scripts = update_sql_commands(scripts, label_tablename)
        save_commands_to_xcom(scripts, kwargs['ti'], source_mask, transform_mask, procedure_mask, order_delimiter)
        logger.debug(f"Script cargados en Xcom: {scripts}")
@@ -163,7 +163,8 @@ def set_dag():
        op_kwargs={'conn_id': scripts_s3["connection_id"], 'bucket': scripts_s3["bucket"],
                   'prefix': scripts_s3["prefix"], 'source_mask': extract_mask, 'transform_mask': transform_mask,
                   'procedure_mask': procedure_mask, 'order_delimiter': order_delimiter,
-                   'label_tablename': conf["label_multiple_select"], 'control_params': control_s3},
+                   'label_tablename': conf["label_multiple_select"], 'control_params': control_s3,
+                   'provider': conf["cloud_provider"]},
        trigger_rule="all_success"
    )
@@ -184,17 +185,17 @@ def set_dag():
    # Creación de grupo de tasks para las extracciones
    chunksize = conf["chunksize"]
    timezone = conf["timezone"]
-    extractions = extraction(source_db, intern_db, timezone, control_s3, chunksize)
+    extractions = extraction(source_db, intern_db, timezone, control_s3, conf["cloud_provider"], chunksize)
    # Creación de grupo de tasks para las transformaciones
-    transformations = transformation(intern_db, timezone, control_s3)
+    transformations = transformation(intern_db, timezone, control_s3, conf["cloud_provider"])
    # Creación de grupo de tasks para la generación y despliegue de archivos resultados
    outputs_conf = conf["outputs"]
-    result = generate_and_deploy_results(intern_db, outputs_conf, timezone, control_s3)
+    result = generate_and_deploy_results(intern_db, outputs_conf, timezone, control_s3, conf["cloud_provider"])
    # Creación de tasks de limpiadores
-    cleaners = cleaning(intern_db, control_s3)
+    cleaners = cleaning(intern_db, control_s3, conf["cloud_provider"])

    sensor_scripts >> script_extractor >> extractions >> transformations >> result >> cleaners
    return dag
...
@@ -234,7 +234,7 @@ def set_dag_1():
    from yaml.loader import SafeLoader
    # Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
    # En desarrollo, cualquiera que apunte a su carpeta dags
-    conf_path = "/opt/airflow/dags/app_conf.yml"
+    conf_path = "/root/airflow/dags/app_conf.yml"
    with open(conf_path) as f:
        data = yaml.load(f, Loader=SafeLoader)
    general_cnf = data["general"]
...