Commit 0e214512 authored by Cristian Aguirre

Merge branch 'developer-ca' into 'developer'

Developer ca

See merge request !1
parents e6dac30d d5c367db
env
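# app_conf.yml — pipeline configuration; the DAG loads it from /opt/airflow/dags/app_conf.yml (filename inferred from conf_path in the DAG module below).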
general:
s3_parameters:
s3_conn_id: "bcom_tp_connection"
bucket: "prueba1234568"
dags:
dag1:
schedule: "@once"
period_pattern: '[a-zA-Z0-9]+([0-9]{4})(\-[0-9]{2})?\.[a-zA-Z]*'
csv_delimiter: ","
filters:
fields_omited: ["BCOM_ROW_IDENTIFIER", "BCOM_PROCESS_ID", "BCOM_FIELD_KEY", "BCOM_LNUMBER", "BCOM_ERROR_CODE",
"BCOM_ERROR_MESSAGE"]
tacom_drop_nulls_subset: [ "CD_FOLIO", "CD_CUENTA", "CD_PAQUETE", "TP_SERVICIO"]
promo_drop_nulls_subset: ["CUENTA", "NOMBRE_PRODUCTO", "POID_PRODUCT", "CD_PAQUETE"]
catalog_drop_nulls_subset: ["NOMBRE_PRODUCTO", "CD_PAQUETE"]
rela3pa2p_drop_nulls_subset: ["TRESP", "DOSP"]
relapoid_drop_nulls_subset: ["POID_PRODUCT", "CD_PAQUETE"]
relapaqs_drop_nulls_subset: ["COD_PAQ_INI", "COD_PAQ_FIN"]
not_promo_drop_nulls_subset: ["CD_PAQUETE"]
s3_parameters:
inputs:
prefix: "pruebas_qa"
tacom_pattern: "tacomventas_original*.txt"
promociones_pattern: "promociones_original*.txt"
outputs:
prefix: "prueba3/tacom_outputs"
tacom_output: "tacom_modified.csv"
tacom_delimiter: ","
promo_output: "promociones_modified.csv"
promo_delimiter: ","
procesed_prefix: "prueba3/procesed"
catalogo_promociones:
type: "INSUMO"
pattern: "catalogopromocion*.txt"
prefix: "pruebas_qa"
key_field: "NOMBRE_PRODUCTO"
value_field: "CD_PAQUETE"
delimiter: ","
relacion3pa2p:
type: "INSUMO"
pattern: "temporal_relacion3pa2p*.txt"
prefix: "pruebas_qa"
key_field: "TRESP"
value_field: "DOSP"
delimiter: ","
relacionpoidpaquete:
type: "INSUMO"
pattern: "temporal_relacion_Paquete*.txt"
prefix: "pruebas_qa"
key_field: "POID_PRODUCT"
value_field: "CD_PAQUETE"
delimiter: ","
relacion_paquetes:
type: "INSUMO"
pattern: "PAQUINIFIN*.txt"
prefix: ""
key_field: "COD_PAQ_INI"
value_field: "COD_PAQ_FIN"
delimiter: ","
no_promocion:
type: "INSUMO"
pattern: "PAQUETE*.txt"
prefix: ""
key_field: "CD_PAQUETE"
value_field: ""
delimiter: ","
import fnmatch
import datetime
from typing import Any, Dict, Set
import pytz
import re
from io import BytesIO, StringIO
import pandas as pd
from components.Utils import get_type_file
from enums.FileTypeEnum import FileTypeEnum
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import logging
logger = logging.getLogger()
def get_df_from_s3(conn: str, bucket: str, key: str, period: str, delimiter: str) -> Dict[str, Any]:
response = {'filename': "", 'df': None}
dataframe = None
try:
s3_data = get_data_from_s3(conn, bucket, key, period)
logger.info(f"ARCHIVO EXTRAIDO: {s3_data}")
if s3_data["filename"] == "":
raise Exception(f"No se encontró archivo para el key: {key} y periodo {period}")
response.update({'filename': s3_data["filename"]})
file_type = get_type_file(s3_data["filename"])
if file_type == FileTypeEnum.EXCEL:
dataframe = pd.read_excel(s3_data["data"], engine="openpyxl")
elif file_type == FileTypeEnum.OLD_EXCEL:
dataframe = pd.read_excel(s3_data["data"], engine="xlrd")
elif file_type == FileTypeEnum.TEXT or file_type == FileTypeEnum.CSV:
str_data = str(s3_data["data"].getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data)
dataframe = pd.read_csv(data, sep=delimiter)
response.update({'df': dataframe})
except Exception as e:
logger.error(f"Error trayendo y transformando a DataFrame desde S3 con periodo {period}. {e}")
return response
def get_data_from_s3(conn: str, bucket: str, key: str, period: str) -> Dict[str, Any]:
result = {'filename': '', 'data': BytesIO()}
utc = pytz.UTC
try:
if key.rfind("/") != -1:
prefix = key[:key.rfind("/")+1]
else:
prefix = ""
s3_hook = S3Hook(conn)
files = s3_hook.list_keys(bucket, prefix)
# Use a date far in the past as the baseline for finding the most recently modified matching file
last_key = ("", datetime.datetime(2000, 1, 1, 0, 0, 0).replace(tzinfo=utc))
for file_key in files:
if fnmatch.fnmatch(file_key, key) and (file_key.find(period) != -1 or file_key.find(period.replace("-", "")) != -1):
file_date = s3_hook.get_key(file_key, bucket).meta.data
file_date = file_date["LastModified"]
if last_key[1] >= file_date:
continue
last_key = (file_key, file_date)
data = s3_hook.get_key(last_key[0], bucket)
data.download_fileobj(result["data"])
result["filename"] = last_key[0]
except Exception as e:
logger.error(f"Error trayendo datos desde S3 para el key {key} y periodo {period}. {e}")
return result
def search_periods_from_key_s3(conn: str, bucket: str, key: str, pattern: str) -> Set[str]:
periods = set()
try:
if key.rfind("/") != -1:
prefix = key[:key.rfind("/") + 1]
else:
prefix = ""
s3_hook = S3Hook(conn)
files = s3_hook.list_keys(bucket, prefix)
for file in files:
if not re.search(pattern, file):
continue
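# Take the 7 characters right before the file extension as the period, e.g. "2023-05";
# when the dash is missing, the slice picks up one extra leading character, which is
# dropped below while inserting the dash so every period is normalized to "YYYY-MM".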
period = file[file.rfind(".")-7:file.rfind(".")]
if period.find("-") == -1:
period = period[1:5] + "-" + period[5:]
periods.add(period)
except Exception as e:
logger.error(f"Error buscando periodos disponibles en los archivos. key: {key}. {e}")
return set(periods)
def save_df_to_s3(df: pd.DataFrame, conn: str, bucket: str, key: str, delimiter: str = ","):
try:
logger.info(f"SUBIENDO A NUBE KEY {key}")
file_type = get_type_file(key)
s3_hook = S3Hook(conn)
if file_type == FileTypeEnum.EXCEL or file_type == FileTypeEnum.OLD_EXCEL:
with BytesIO() as buffer:
with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
df.to_excel(writer, index=None)
s3_hook.load_bytes(buffer.getvalue(), key, bucket, True)
elif file_type == FileTypeEnum.CSV or file_type == FileTypeEnum.TEXT:
csv_buffer = BytesIO()
df.to_csv(csv_buffer, header=True, index=False, sep=delimiter, na_rep='None')
csv_buffer.seek(0)
s3_hook.load_bytes(csv_buffer.getvalue(), key, bucket, True)
except Exception as e:
logger.error(f"Error guardando archivos a S3. key: {key}. {e}")
def move_object_s3(conn: str, bucket: str, source_key: str, output_key: str):
try:
filename = source_key[source_key.rfind("/")+1:]
output_key += filename
s3_hook = S3Hook(conn)
s3_hook.copy_object(source_key, output_key, bucket, bucket)
s3_hook.delete_objects(bucket, source_key)
except Exception as e:
logger.error(f"Error moviendo archivo desde {source_key} hacia {output_key} en bucket {bucket}. {e}")
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
import logging
logger = logging.getLogger()
POKE_INTERVAL = 5
TIMEOUT = 60*1
def create_s3_sensor(task_id: str, connection: str, bucket: str, key: str) -> S3KeySensor:
s3_sensor = None
try:
s3_sensor = S3KeySensor(
task_id=task_id,
bucket_key=key,
bucket_name=bucket,
wildcard_match=True,
aws_conn_id=connection,
verify=True,
poke_interval=POKE_INTERVAL,
timeout=TIMEOUT
)
except Exception as e:
logger.error(f"Error creando Sensor S3. {e}")
return s3_sensor
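# components/Utils.py (per the imports in the DAG module): small helpers for file types, prefixes, filename suffixes, row/column filtering and catalog configuration.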
from typing import List, Any, Dict
import pandas as pd
from enums.CatalogConfigurationEnum import CatalogConfigurationEnum
from enums.FileTypeEnum import FileTypeEnum
import logging
logger = logging.getLogger()
def get_type_file(key: str) -> FileTypeEnum:
result = FileTypeEnum.EXCEL
try:
file_type_sufix = key.rfind(".")
file_type = key[file_type_sufix+1:]
result = FileTypeEnum(file_type)
except Exception as e:
logger.error(f"Error obteniedo el tipo de archivo de {key}. {e}")
return result
def get_modified_prefix(prefix: str) -> str:
try:
if prefix == "/":
prefix = ""
elif not prefix.endswith("/") and prefix != "":
prefix += "/"
except Exception as e:
logger.error(f"Error modificando prefijo de {prefix}. {e}")
finally:
return prefix
def add_period_to_sufix(name: str, period: str) -> str:
result = name
try:
position = name.rfind(".")
result = name[:position] + period + name[position:]
except Exception as e:
logger.error(f"Error añadiendo periodo al nombre del archivo {name}. {e}")
return result
def remove_invalid_rows(df: pd.DataFrame, valid_cols: List[str]) -> pd.DataFrame:
try:
df = df.dropna(how='all', subset=valid_cols)
except Exception as e:
logger.error(f"Error removiendo filas inválidas. {e}")
finally:
return df
def remove_fields(df: pd.DataFrame, fields_omitted: List[str]) -> pd.DataFrame:
try:
if len(fields_omitted) > 0:
df = df.loc[:, ~df.columns.isin(fields_omitted)]
except Exception as e:
logger.error(f"Error removiendo columnas. {e}")
finally:
return df
def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], catalog_name: str,
default_prefix: str) -> Dict[str, Any]:
try:
catalog = data[catalog_name]
catalog_type = catalog["type"]
catalog_prefix = catalog["prefix"]
if catalog_type == CatalogConfigurationEnum.CATALOGO.value:
catalog_prefix = get_modified_prefix(catalog_prefix)
else:
catalog_prefix = default_prefix
s3_catalog = catalog_prefix + catalog["pattern"]
data_dict.update({'s3_'+catalog_name: s3_catalog, catalog_name+'_key': catalog["key_field"],
catalog_name+'_value': catalog["value_field"]})
if "delimiter" in catalog.keys():
data_dict.update({catalog_name+'_delimiter': catalog["delimiter"]})
except Exception as e:
logger.error(f"Error actualizando dict de catalogos. {e}")
finally:
return data_dict
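# DAG module (filename not shown in this view) defining BCOM_DAG_TRANSFORMACION_TACOMVENTAS_PROMOCIONESRESIDENCIAL:
# per period it reads the TACOM and promotions inputs plus the catalogs from S3, applies dag1_id1..dag1_id7 and uploads the results.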
from datetime import datetime, timedelta
from typing import Any, Dict
import time
import pandas as pd
import numpy as np
from components.S3Route import get_df_from_s3, search_periods_from_key_s3, save_df_to_s3, move_object_s3
from components.Sensor import create_s3_sensor
from components.Utils import get_modified_prefix, add_period_to_sufix, remove_invalid_rows, remove_fields, \
update_dict_with_catalogs
from enums.CatalogConfigurationEnum import CatalogConfigurationEnum
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_TRANSFORMACION_TACOMVENTAS_PROMOCIONESRESIDENCIAL"
DEFAULT_ARGS = {
'owner': 'airflow',
"start_date": datetime(2023, 5, 25, 22, 9),
'depends_on_past': False,
'email': 'caguirre@bytesw.com',
'retries': 1,
'retry_delay': timedelta(minutes=3),
'email_on_retry': False,
'email_on_failure': False
}
def dag1_id1(promo: pd.DataFrame, catalog_promo: pd.DataFrame, key_field: str, value_field: str) -> pd.DataFrame:
try:
catalog = catalog_promo.values
for item in catalog:
promo[value_field] = np.where(promo[key_field].str.contains(item[0]), item[1], promo[value_field])
except Exception as e:
logger.error(f"Error DAG1_ID1. {e}")
return promo
def dag1_id2(promo: pd.DataFrame, relation_poid: pd.DataFrame, key_field: str, value_field: str) -> pd.DataFrame:
try:
relation_poid = relation_poid.rename(columns={value_field: 'NUEVO_'+value_field})
relation_poid = relation_poid.drop_duplicates(subset=[key_field])
promo = promo.merge(relation_poid, 'left', key_field)
promo[value_field] = np.where(promo['NUEVO_'+value_field].notna(), promo['NUEVO_'+value_field], promo[value_field])
promo = promo.drop('NUEVO_'+value_field, axis=1)
except Exception as e:
logger.error(f"Error DAG1_ID2. {e}")
return promo
def dag1_id3(tacom: pd.DataFrame, promo: pd.DataFrame) -> pd.DataFrame:
result = pd.DataFrame()
try:
promo["CUENTA"] = promo["CUENTA"].astype(int, errors='ignore')
promo["CD_PAQUETE_PROMO"] = promo["CD_PAQUETE"].astype(int, errors='ignore')
promo.drop("CD_PAQUETE", axis=1, inplace=True)
promo = promo.drop_duplicates(["CUENTA", "CD_PAQUETE_PROMO"])
result = tacom.merge(promo, how='left', left_on=["CD_CUENTA", "CD_PAQUETE"], right_on=["CUENTA", "CD_PAQUETE_PROMO"])
result["CD_PAQUETE"] = result["CD_PAQUETE"].astype(str)
no_consider = "Adicional|Soundbox|SOUNDBOX"
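# Classify PROMOCION: null/"None"/"nan" package -> None; package code of 5 characters or fewer -> "3P a 2P";
# products containing TV or CANALES (excluding Adicional/Soundbox) -> "Parrilla";
# HBO MAX / PAQUETE HBO -> "Promocion HBO"; STAR PREMIUM -> "Promocion STAR PREMIUM"; anything else -> "PROMOCION".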
result["PROMOCION"] = np.where((result["CD_PAQUETE"].isna()) | (result["CD_PAQUETE"] == "None") |
(result["CD_PAQUETE"] == "nan"), None,
np.where((result["CD_PAQUETE"].notna()) & (result["CD_PAQUETE"].str.len() <= 5), "3P a 2P",
np.where((result["NOMBRE_PRODUCTO"].str.contains("TV", na=False)) &
(~result["NOMBRE_PRODUCTO"].str.contains(no_consider, na=False)), "Parrilla",
np.where((result["NOMBRE_PRODUCTO"].str.contains("CANALES", na=False)) &
(~result["NOMBRE_PRODUCTO"].str.contains(no_consider, na=False)), "Parrilla",
np.where(result["NOMBRE_PRODUCTO"].str.contains("HBO MAX", na=False), "Promocion HBO",
np.where(result["NOMBRE_PRODUCTO"].str.contains("PAQUETE HBO", na=False), "Promocion HBO",
np.where(result["NOMBRE_PRODUCTO"].str.contains("STAR PREMIUM", na=False),
"Promocion STAR PREMIUM", "PROMOCION")))))))
result["CD_PAQUETE_PROMO"] = np.where((result["CD_PAQUETE_PROMO"] == 'nan') |
(result["CD_PAQUETE_PROMO"] == 'None'), None, result["CD_PAQUETE"])
except Exception as e:
logger.error(f"Error DAG1_ID3. {e}")
finally:
return result
def dag1_id4(df: pd.DataFrame, df_promo: pd.DataFrame, key_field: str, value_field: str) -> pd.DataFrame:
try:
df["CD_PAQUETE"] = df["CD_PAQUETE"].astype(str, errors='ignore')
df_promo[key_field] = df_promo[key_field].astype(str, errors='ignore')
df_promo[value_field] = df_promo[value_field].astype(str, errors='ignore')
df = df.merge(df_promo, how='outer', left_on="CD_PAQUETE", right_on=key_field)
df = df.dropna(how='all', subset=["CD_EMPRESA", "CD_FOLIO", "CD_CUENTA"])
df["CD_PAQUETE"] = np.where((df["PROMOCION"] == "3P a 2P") & (df[key_field].notna()), df[value_field], df["CD_PAQUETE"])
df = df.drop([key_field, value_field], axis=1)
except Exception as e:
logger.error(f"Error DAG1_ID4. {e}")
finally:
return df
def dag1_id5(df: pd.DataFrame, relation: pd.DataFrame, key_field: str, value_field: str) -> pd.DataFrame:
try:
relation = relation.astype(str)
df = df.merge(relation, how='left', left_on="CD_PAQUETE", right_on=key_field)
df["CD_PAQUETE"] = np.where(df[value_field].notna(), df[value_field], df["CD_PAQUETE"])
except Exception as e:
logger.error(f"Error DAG1_ID5. {e}")
finally:
return df
def dag1_id6(df: pd.DataFrame, notpromo: pd.DataFrame, key_field: str) -> pd.DataFrame:
try:
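# Number repeated "no promotion" packages: rows whose CD_PAQUETE appears in the not-promo catalog
# get NRO_PAQUETE = 1, 2, ... per (CD_FOLIO, CD_PAQUETE) group; every other row keeps NRO_PAQUETE = 0.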
df["NRO_PAQUETE"] = 0
notpromo = notpromo.astype(str)
not_promo_values = notpromo[key_field].tolist()
df_without_paq = df[~df["CD_PAQUETE"].isin(not_promo_values)]
df_with_paq = df[df["CD_PAQUETE"].isin(not_promo_values)].copy()  # explicit copy avoids pandas SettingWithCopyWarning on the assignment below
df_with_paq["NRO_PAQUETE"] = df_with_paq.groupby(["CD_FOLIO", "CD_PAQUETE"]).cumcount() + 1
df = pd.concat([df_with_paq, df_without_paq], ignore_index=True)
except Exception as e:
logger.error(f"Error DAG1_ID6. {e}")
finally:
return df
def dag1_id7(df: pd.DataFrame) -> pd.DataFrame:
try:
df["PROMOCION"] = np.where((df["TP_SERVICIO"] == 2) & (df["NRO_PAQUETE"] > 1), None, df["PROMOCION"])
except Exception as e:
logger.error(f"Error DAG1_ID7. {e}")
finally:
return df
def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: Dict[str, Any], period_pattern: str,
delimiter: str, outputs: Dict[str, Any], procesed_prefix: str, filters: Dict[str, Any]):
start_time = time.time()
logger.debug(f"DICCIONARIO DE CATALOGOS: {catalogs}")
periods = search_periods_from_key_s3(conn, bucket, tacom_key, period_pattern)
omited_cols = filters["fields_omited"]
for period in periods:
logger.debug(period)
tacom = get_df_from_s3(conn, bucket, tacom_key, period, delimiter)
tacom_df = remove_invalid_rows(tacom['df'], filters["tacom_drop_nulls_subset"])
tacom_df = remove_fields(tacom_df, omited_cols)
if isinstance(tacom_df, type(None)):
logger.error(f"INSUMO TACOM NO ENCONTRADO PARA EL PERIODO {period}")
continue
promo = get_df_from_s3(conn, bucket, promo_key, period, delimiter)
promo_df = remove_invalid_rows(promo['df'], filters["promo_drop_nulls_subset"])
promo_df = remove_fields(promo_df, omited_cols)
if isinstance(promo_df, type(None)):
logger.error(f"INSUMO PROMOCIONES NO ENCONTRADO PARA EL PERIODO {period}")
continue
if "catalogo_promociones_delimiter" in catalogs.keys():
delimiter = catalogs["catalogo_promociones_delimiter"]
promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_catalogo_promociones"], period, delimiter)
promo_catalog_df = remove_invalid_rows(promo_catalog['df'], filters["catalog_drop_nulls_subset"])
promo_catalog_df = remove_fields(promo_catalog_df, omited_cols)
if isinstance(promo_catalog_df, type(None)):
logger.error(f"INSUMO PROMOCIONES CATALOGO NO ENCONTRADO PARA EL PERIODO {period}")
continue
if "relacionpoidpaquete_delimiter" in catalogs.keys():
delimiter = catalogs["relacionpoidpaquete_delimiter"]
relationpoid = get_df_from_s3(conn, bucket, catalogs["s3_relacionpoidpaquete"], period, delimiter)
relationpoid_df = remove_invalid_rows(relationpoid['df'], filters["relapoid_drop_nulls_subset"])
relationpoid_df = remove_fields(relationpoid_df, omited_cols)
if isinstance(relationpoid_df, type(None)):
logger.error(f"INSUMO RELACION POID NO ENCONTRADO PARA EL PERIODO {period}")
continue
if "relacion3pa2p_delimiter" in catalogs.keys():
delimiter = catalogs["relacion3pa2p_delimiter"]
relation3a2p = get_df_from_s3(conn, bucket, catalogs["s3_relacion3pa2p"], period, delimiter)
relation3a2p_df = remove_invalid_rows(relation3a2p['df'], filters["rela3pa2p_drop_nulls_subset"])
relation3a2p_df = remove_fields(relation3a2p_df, omited_cols)
if isinstance(relation3a2p_df, type(None)):
logger.error(f"INSUMO RELACION 3A2P NO ENCONTRADO PARA EL PERIODO {period}")
continue
if "relacion_paquetes_delimiter" in catalogs.keys():
delimiter = catalogs["relacion_paquetes_delimiter"]
relapaq_catalog = get_df_from_s3(conn, bucket, catalogs["s3_relacion_paquetes"], period, delimiter)
relapaq_df = remove_invalid_rows(relapaq_catalog['df'], filters["relapaqs_drop_nulls_subset"])
relapaq_df = remove_fields(relapaq_df, omited_cols)
if isinstance(relapaq_df, type(None)):
logger.error(f"INSUMO RELACION PAQUETE INICIAL Y FINAL NO ENCONTRADO PARA EL PERIODO {period}")
continue
if "no_promocion_delimiter" in catalogs.keys():
delimiter = catalogs["no_promocion_delimiter"]
not_promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_no_promocion"], period, delimiter)
not_promo_df = remove_invalid_rows(not_promo_catalog['df'], filters["not_promo_drop_nulls_subset"])
not_promo_df = remove_fields(not_promo_df, omited_cols)
if isinstance(not_promo_df, type(None)):
logger.error(f"INSUMO PAQUETES SIN PROMOCION NO ENCONTRADO PARA EL PERIODO {period}")
continue
logger.info(f"Ejecutando proceso para el periodo: {period}")
promo_df = dag1_id1(promo_df, promo_catalog_df, catalogs["catalogo_promociones_key"],
catalogs["catalogo_promociones_value"])
promo_df = dag1_id2(promo_df, relationpoid_df, catalogs["relacionpoidpaquete_key"],
catalogs["relacionpoidpaquete_value"])
result = dag1_id3(tacom_df, promo_df)
result = dag1_id4(result, relation3a2p_df, catalogs["relacion3pa2p_key"], catalogs["relacion3pa2p_value"])
result = dag1_id5(result, relapaq_df, catalogs["relacion_paquetes_key"], catalogs["relacion_paquetes_value"])
result = dag1_id6(result, not_promo_df, catalogs["no_promocion_key"])
result = dag1_id7(result)
final_columns = list(tacom_df.columns)
result = result[final_columns]
# Upload the results to S3
tacom_output = add_period_to_sufix(outputs["tacom_output_path"], period)
save_df_to_s3(result, conn, bucket, tacom_output, outputs["tacom_delimiter"])
promo_output = add_period_to_sufix(outputs["promo_output_path"], period)
promo_df.rename(columns={'CD_PAQUETE_PROMO': 'CD_PAQUETE'}, inplace=True)
save_df_to_s3(promo_df, conn, bucket, promo_output, outputs["promo_delimiter"])
# Move ALL of the inputs that were used to the processed prefix
move_object_s3(conn, bucket, tacom['filename'], procesed_prefix)
move_object_s3(conn, bucket, promo['filename'], procesed_prefix)
move_object_s3(conn, bucket, promo_catalog['filename'], procesed_prefix)
move_object_s3(conn, bucket, relationpoid['filename'], procesed_prefix)
move_object_s3(conn, bucket, relation3a2p['filename'], procesed_prefix)
move_object_s3(conn, bucket, relapaq_catalog['filename'], procesed_prefix)
move_object_s3(conn, bucket, not_promo_catalog['filename'], procesed_prefix)
logger.info("FINALIZADO PROCESO 1")
logger.info(f"--- {time.time() - start_time} seconds --- TIEMPO DE EJECUCIÓN DEL PROCESO 1")
def set_dag_1():
""" DAG that execute the process with TACOMVENTAS AND PROMOCIONES_RESIDENCIAL"""
import yaml
from yaml.loader import SafeLoader
# Change conf_path depending on the environment; in production (Kubernetes and containers) use /opt/airflow/dags/app_conf.yml
# In development, any path that points to your dags folder
conf_path = "/opt/airflow/dags/app_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
general_cnf = data["general"]
conf = data["dags"]["dag1"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que extrae y transforma",
schedule_interval=conf["schedule"], tags=["DAG 1"], catchup=True) as dag:
dag1_inputs_s3_params = conf["s3_parameters"]["inputs"]
s3_conf = general_cnf["s3_parameters"]
s3_prefix = get_modified_prefix(dag1_inputs_s3_params["prefix"])
s3_tacom = s3_prefix + dag1_inputs_s3_params["tacom_pattern"]
s3_promo = s3_prefix + dag1_inputs_s3_params["promociones_pattern"]
catalogs_dict = {}
catalogs_dict = update_dict_with_catalogs(catalogs_dict, conf, "catalogo_promociones", s3_prefix)
catalogs_dict = update_dict_with_catalogs(catalogs_dict, conf, "relacion3pa2p", s3_prefix)
catalogs_dict = update_dict_with_catalogs(catalogs_dict, conf, "relacionpoidpaquete", s3_prefix)
catalogs_dict = update_dict_with_catalogs(catalogs_dict, conf, "relacion_paquetes", s3_prefix)
catalogs_dict = update_dict_with_catalogs(catalogs_dict, conf, "no_promocion", s3_prefix)
# Define the sensors that verify whether the input data exists or has been updated
s3_sensor_tacom = create_s3_sensor("S3_sensor_tacom_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
s3_tacom)
s3_sensor_promo = create_s3_sensor("S3_sensor_promo_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
s3_promo)
s3_sensor_promo_catalog = create_s3_sensor("S3_sensor_promo_catalog_task", s3_conf["s3_conn_id"],
s3_conf["bucket"], catalogs_dict["s3_catalogo_promociones"])
s3_sensor_3a2p = create_s3_sensor("S3_sensor_3a2p_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacion3pa2p"])
s3_sensor_poid = create_s3_sensor("S3_sensor_poid_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacionpoidpaquete"])
s3_sensor_paq = create_s3_sensor("S3_sensor_paq_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacion_paquetes"])
s3_sensor_notpromo = create_s3_sensor("S3_sensor_notpromo_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_no_promocion"])
outputs = conf["s3_parameters"]["outputs"]
output_prefix = outputs["prefix"]
output_prefix = get_modified_prefix(output_prefix)
output_tacom_path = output_prefix + outputs["tacom_output"]
output_tacom_sep = ","
if "tacom_delimiter" in outputs.keys():
output_tacom_sep = outputs["tacom_delimiter"]
output_promo_path = output_prefix + outputs["promo_output"]
output_promo_sep = ","
if "promo_delimiter" in outputs.keys():
output_promo_sep = outputs["promo_delimiter"]
outputs = {'tacom_output_path': output_tacom_path, 'tacom_delimiter': output_tacom_sep,
'promo_output_path': output_promo_path, 'promo_delimiter': output_promo_sep}
procesed_prefix = conf["s3_parameters"]["procesed_prefix"]
if not procesed_prefix.endswith("/") and procesed_prefix != "":
procesed_prefix += "/"
filters = conf["filters"]
transformation = PythonOperator(
task_id="ETL-DAG1",
python_callable=etl_dag1,
op_kwargs={'conn': s3_conf["s3_conn_id"], 'bucket': s3_conf["bucket"],
'tacom_key': s3_tacom, 'promo_key': s3_promo, 'catalogs': catalogs_dict,
'period_pattern': conf["period_pattern"], 'delimiter': conf["csv_delimiter"],
'outputs': outputs, 'procesed_prefix': procesed_prefix, 'filters': filters},
trigger_rule="all_success"
)
(s3_sensor_tacom, s3_sensor_promo, s3_sensor_promo_catalog, s3_sensor_3a2p,
s3_sensor_poid, s3_sensor_paq, s3_sensor_notpromo) >> transformation
return dag
globals()["0"] = set_dag_1()
from enum import Enum
class CatalogConfigurationEnum(Enum):
CATALOGO = "CATALOGO"
INSUMO = "INSUMO"
from enum import Enum
class FileTypeEnum(Enum):
TEXT = "txt"
CSV = "csv"
EXCEL = "xlsx"
OLD_EXCEL = "xls"
FROM apache/airflow:2.5.3
COPY requirements.txt /
RUN pip install --no-cache-dir "apache-airflow==2.5.3" "apache-airflow[kubernetes]==2.5.3" -r /requirements.txt
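# airflow-envvars-configmap.yaml (applied by the kubectl commands below): shared environment for the Airflow webserver, scheduler and worker pods.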
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-envvars-configmap
data:
# The setting below is necessary because of a typo in the config of the docker-airflow image:
# https://github.com/puckel/docker-airflow/blob/bed777970caa3e555ef618d84be07404438c27e3/config/airflow.cfg#L934
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: '30'
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: America/Lima
AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}'
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.1"
AIRFLOW__KUBERNETES__DAGS_VOLUME_HOST: /mnt/airflow/dags
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF: airflow-envvars-configmap
AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE: /opt/airflow/templates/pod_template.yaml
AIRFLOW__CORE__EXECUTOR: KubernetesExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: admin
_AIRFLOW_WWW_USER_PASSWORD: admin
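# airflow-rbac.yaml (likely filename, per the kubectl commands below): lets the default ServiceAccount manage the worker pods created by the KubernetesExecutor.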
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: pods-permissions
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "create", "delete"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: pods-permissions
subjects:
- kind: ServiceAccount
name: default
namespace: default
roleRef:
kind: ClusterRole
name: pods-permissions
apiGroup: rbac.authorization.k8s.io
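# airflow-scheduler-deployment.yaml (per the kubectl commands below): Airflow scheduler running the custom image.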
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-scheduler
labels:
app: airflow-k8s
spec:
selector:
matchLabels:
app: airflow-scheduler
replicas: 1
template:
metadata:
labels:
app: airflow-scheduler
spec:
containers:
- name: airflow-scheduler
image: cristianfernando/airflow_custom:0.0.1
args: ["scheduler"]
envFrom:
- configMapRef:
name: airflow-envvars-configmap
resources:
limits:
memory: "512Mi"
# cpu: "100"
volumeMounts:
- name: dags-host-volume
mountPath: /opt/airflow/dags
- name: logs-persistent-storage
mountPath: /opt/airflow/logs
- name: pods-templates
mountPath: /opt/airflow/templates
volumes:
- name: dags-host-volume
hostPath:
path: /opt/airflow/dags/dags/
type: Directory
- name: pods-templates
hostPath:
path: /opt/airflow/templates/
type: Directory
- name: logs-persistent-storage
persistentVolumeClaim:
claimName: airflow-logs-pvc
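# airflow-webserver-deployment.yaml (per the kubectl commands below): Airflow webserver running the custom image.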
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-webserver
labels:
app: airflow-k8s
spec:
selector:
matchLabels:
app: airflow-webserver
replicas: 1
template:
metadata:
labels:
app: airflow-webserver
spec:
containers:
- name: airflow-webserver
image: cristianfernando/airflow_custom:0.0.1
args: ["webserver"]
envFrom:
- configMapRef:
name: airflow-envvars-configmap
resources:
limits:
memory: "512Mi"
# cpu: "100"
ports:
- containerPort: 8080
volumeMounts:
- name: dags-host-volume
mountPath: /opt/airflow/dags/
- name: logs-persistent-storage
mountPath: /opt/airflow/logs
volumes:
- name: dags-host-volume
hostPath:
path: /opt/airflow/dags/dags/
type: Directory
- name: logs-persistent-storage
persistentVolumeClaim:
claimName: airflow-logs-pvc
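# airflow-webserver-service.yaml (per the kubectl commands below): exposes the webserver as a NodePort service, port 8081 -> container port 8080.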
apiVersion: v1
kind: Service
metadata:
name: airflow-webserver
labels:
app: airflow-k8s
spec:
type: NodePort
selector:
app: airflow-webserver
ports:
- name: web
protocol: TCP
port: 8081
targetPort: 8080
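# logs-persistenvolumeclaim.yaml (per the kubectl commands below): shared ReadWriteMany volume for Airflow logs.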
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs-pvc
labels:
app: airflow-k8s
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 2Gi
storageClassName: standard
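# pod_template.yaml (likely the file referenced by AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE): base pod spec for KubernetesExecutor worker pods; name and image are placeholders replaced at runtime.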
apiVersion: v1
kind: Pod
metadata:
name: dummy-name
spec:
containers:
- args: [ ]
command: [ ]
env:
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
- name: DB_HOST
value: postgres
- name: DB_DATABASE
value: airflow
- name: DB_USER
value: airflow
- name: DB_PASSWORD
value: airflow
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
value: postgresql+psycopg2://airflow:airflow@postgres/airflow
- name: AIRFLOW__LOGGING__LOGGING_LEVEL
value: INFO
image: dumy-image
imagePullPolicy: IfNotPresent
name: base
volumeMounts:
- name: dags-host-volume
mountPath: /opt/airflow/dags
- name: logs-persistent-storage
mountPath: /opt/airflow/logs
hostNetwork: false
restartPolicy: Never
securityContext:
runAsUser: 50000
nodeSelector: { }
affinity: { }
tolerations: [ ]
volumes:
- name: dags-host-volume
hostPath:
path: /opt/airflow/dags/dags/
type: Directory
- name: logs-persistent-storage
persistentVolumeClaim:
claimName: airflow-logs-pvc
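# postgres-deployment.yaml (per the kubectl commands below): PostgreSQL 12 backing the Airflow metadata database.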
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres
spec:
selector:
matchLabels:
app: postgres
replicas: 1
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:12
resources:
limits:
memory: 128Mi
cpu: 500m
ports:
- containerPort: 5432
env:
- name: POSTGRES_PASSWORD
value: airflow
- name: POSTGRES_USER
value: airflow
- name: POSTGRES_DB
value: airflow
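# postgres-service.yaml (per the kubectl commands below): exposes PostgreSQL to the Airflow pods on port 5432.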
apiVersion: v1
kind: Service
metadata:
name: postgres
spec:
selector:
app: postgres
ports:
- port: 5432
targetPort: 5432
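# requirements.txt — extra Python dependencies copied into the custom image by the Dockerfile above.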
apache-airflow[kubernetes]==2.5.3
openpyxl==3.1.2
XlsxWriter==3.1.2
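# Deployment and teardown commands (likely from a README or helper script): apply the manifests in order, then the matching delete commands to tear the stack down.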
kubectl apply -f logs-persistenvolumeclaim.yaml
kubectl apply -f airflow-rbac.yaml
kubectl apply -f postgres-deployment.yaml
kubectl apply -f postgres-service.yaml
kubectl apply -f airflow-envvars-configmap.yaml
kubectl apply -f airflow-webserver-deployment.yaml
kubectl apply -f airflow-webserver-service.yaml
kubectl apply -f airflow-scheduler-deployment.yaml
kubectl apply -f sync-dags-deployment.yaml
kubectl delete -f airflow-rbac.yaml
kubectl delete -f postgres-service.yaml
kubectl delete -f postgres-deployment.yaml
kubectl delete -f airflow-envvars-configmap.yaml
kubectl delete -f airflow-webserver-service.yaml
kubectl delete -f airflow-webserver-deployment.yaml
kubectl delete -f airflow-scheduler-deployment.yaml
kubectl delete -f logs-persistenvolumeclaim.yaml
kubectl delete -f sync-dags-deployment.yaml
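# sync-dags-deployment.yaml (per the kubectl commands above): side Deployment that keeps the DAGs host folder in sync with the S3 bucket every 30 seconds.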
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-sync-dags
spec:
selector:
matchLabels:
app: airflow-sync-dags
template:
metadata:
labels:
app: airflow-sync-dags
spec:
containers:
- args:
- while true; aws s3 sync --exact-timestamps --delete 's3://prueba1234568/dags' '/dags'; do sleep 30; done;
command:
- /bin/bash
- -c
- --
name: sync-dags
image: amazon/aws-cli:2.1.34
env:
- name: AWS_ACCESS_KEY_ID
value: AKIAQAAMXO3Z4BHNKEIE
- name: AWS_SECRET_ACCESS_KEY
value: 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
volumeMounts:
- name: dags-host-volume
mountPath: /dags
volumes:
- name: dags-host-volume
hostPath:
path: /opt/airflow/dags/dags/
type: Directory