Commit b9c35909 authored by Cristian Aguirre

Update 15-06-23. Remove the period functionality. Add group_input_interval_days functionality, which defines the allowed interval (in days) between inputs.
parent 0e6ab3de
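The grouping rule this commit introduces comes down to one comparison: an input file is accepted if its LastModified date lies within `group_input_interval_days` of the base date taken from the newest tacom file. A minimal, self-contained sketch of that check (the helper name and dates below are illustrative, not part of the repository):

```python
# Hypothetical sketch of the day-interval check added in this commit.
# `interval` mirrors conf["group_input_interval_days"], which is stored as a string (e.g. '7').
import datetime

def within_interval(file_date: datetime.datetime, base_date: datetime.datetime, interval: str) -> bool:
    # Same expression used by the new get_data_from_s3: accept the file when it is
    # at most `interval` days away from the base date.
    return int(interval) - abs((file_date - base_date).days) >= 0

base = datetime.datetime(2023, 6, 15, tzinfo=datetime.timezone.utc)
print(within_interval(base - datetime.timedelta(days=3), base, "7"))   # True: inside the 7-day window
print(within_interval(base - datetime.timedelta(days=10), base, "7"))  # False: outside the window
```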
...
@@ -6,8 +6,8 @@ general:
   dags:
     dag1:
-      schedule: "0 */2 * * *"
-      period_pattern: '[a-zA-Z0-9]+([0-9]{4})(\-[0-9]{2})?\.[a-zA-Z]*'
+      schedule: "@once"
+      group_input_interval_days: '7'
       csv_delimiter: ";"
       filters:
         fields_omited: ["BCOM_ROW_IDENTIFIER", "BCOM_PROCESS_ID", "BCOM_FIELD_KEY", "BCOM_LNUMBER", "BCOM_ERROR_CODE",
...
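For context, a hedged sketch of how the new key could be read from this YAML (the file name and loader shown here are assumptions; only the `general.dags.dag1` nesting comes from the hunk above). The value is quoted, so it arrives as a string and is cast with `int()` where it is compared:

```python
# Illustrative only: the real config loader is not part of this diff.
import yaml  # assumes PyYAML is available

with open("config.yaml") as fh:               # hypothetical file name
    conf = yaml.safe_load(fh)["general"]["dags"]["dag1"]

interval = conf["group_input_interval_days"]  # '7' (a string, as quoted in the YAML)
print(int(interval))                          # 7 (days), matching the int(interval) cast in S3Route
```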
 import fnmatch
 import datetime
-from typing import Any, Dict, Set
+from typing import Any, Dict
 import pytz
-import re
 from io import BytesIO, StringIO
 import pandas as pd
...
@@ -16,14 +15,15 @@ import logging
 logger = logging.getLogger()

-def get_df_from_s3(conn: str, bucket: str, key: str, period: str, delimiter: str) -> Dict[str, Any]:
+def get_df_from_s3(conn: str, bucket: str, key: str, delimiter: str, base_date: datetime.date,
+                   interval: str) -> Dict[str, Any]:
     response = {'filename': "", 'df': None}
     dataframe = None
     try:
-        s3_data = get_data_from_s3(conn, bucket, key, period)
+        s3_data = get_data_from_s3(conn, bucket, key, base_date, interval)
         logger.info(f"ARCHIVO EXTRAIDO: {s3_data}")
         if s3_data["filename"] == "":
-            raise Exception(f"No se encontró archivo para el key: {key} y periodo {period}")
+            raise Exception(f"No se encontró archivo para el key: {key} y fecha base {base_date} en intervalo {interval}")
         response.update({'filename': s3_data["filename"]})
         file_type = get_type_file(s3_data["filename"])
         if file_type == FileTypeEnum.EXCEL:
...
@@ -36,11 +36,11 @@ def get_df_from_s3(conn: str, bucket: str, key: str, period: str, delimiter: str
             dataframe = pd.read_csv(data, sep=delimiter, dtype='object')
         response.update({'df': dataframe})
     except Exception as e:
-        logger.error(f"Error trayendo y transformando a DataFrame desde S3 con periodo {period}. {e}")
+        logger.error(f"Error trayendo y transformando a DataFrame desde S3. {e}")
     return response

-def get_data_from_s3(conn: str, bucket: str, key: str, period: str) -> Dict[str, Any]:
+def get_data_from_s3(conn: str, bucket: str, key: str, base_date: datetime.date, interval: str) -> Dict[str, Any]:
     result = {'filename': '', 'data': BytesIO()}
     utc = pytz.UTC
     try:
...
@@ -50,25 +50,26 @@ def get_data_from_s3(conn: str, bucket: str, key: str, period: str) -> Dict[str,
             prefix = ""
         s3_hook = S3Hook(conn)
         files = s3_hook.list_keys(bucket, prefix)
+        last_key = ("", base_date)
         # Colocar una fecha muy atrás como base
-        last_key = ("", datetime.datetime(2000, 1, 1, 0, 0, 0).replace(tzinfo=utc))
         for file_key in files:
-            if fnmatch.fnmatch(file_key, key) and (file_key.find(period) != -1 or file_key.find(period.replace("-", "")) != -1):
+            if fnmatch.fnmatch(file_key, key):
                 file_date = s3_hook.get_key(file_key, bucket).meta.data
                 file_date = file_date["LastModified"]
-                if last_key[1] >= file_date:
-                    continue
-                last_key = (file_key, file_date)
+                if int(interval) - abs((file_date - last_key[1]).days) >= 0:
+                    last_key = (file_key, file_date)
         data = s3_hook.get_key(last_key[0], bucket)
         data.download_fileobj(result["data"])
         result["filename"] = last_key[0]
     except Exception as e:
-        logger.error(f"Error trayendo datos desde S3 para el key {key} y periodo {period}. {e}")
+        logger.error(f"Error trayendo datos desde S3 para el key {key}. {e}")
     return result

-def search_periods_from_key_s3(conn: str, bucket: str, key: str, pattern: str) -> Set[str]:
-    periods = set()
+def get_base_date(conn: str, bucket: str, key: str) -> datetime.date:
+    utc = pytz.UTC
+    # Colocar una fecha muy atrás como base
+    last_date = datetime.datetime(2000, 1, 1, 0, 0, 0).replace(tzinfo=utc)
     try:
         if key.rfind("/") != -1:
             prefix = key[:key.rfind("/") + 1]
...
@@ -76,19 +77,17 @@ def search_periods_from_key_s3(conn: str, bucket: str, key: str, pattern: str) -
             prefix = ""
         s3_hook = S3Hook(conn)
         files = s3_hook.list_keys(bucket, prefix)
-        for file in files:
-            if not re.search(pattern, file):
-                continue
-            if file[file.rfind(".")-6:file.rfind(".")].isdigit():
-                period = file[file.rfind(".")-6:file.rfind(".")]
-            else:
-                period = file[file.rfind(".")-7:file.rfind(".")]
-            if period.find("-") == -1:
-                period = period[1:5] + "-" + period[5:]
-            periods.add(period)
+        for file_key in files:
+            if fnmatch.fnmatch(file_key, key):
+                file_date = s3_hook.get_key(file_key, bucket).meta.data
+                file_date = file_date["LastModified"]
+                if last_date >= file_date:
+                    continue
+                last_date = file_date
+        logger.debug(f"Fecha base desde {key} : {last_date}")
     except Exception as e:
-        logger.error(f"Error buscando periodos disponibles en los archivos. key: {key}. {e}")
-    return set(periods)
+        logger.error(f"Error buscando archivo base para tener la fecha base. key: {key}. {e}")
+    return last_date

 def save_df_to_s3(df: pd.DataFrame, conn: str, bucket: str, key: str, delimiter: str = ","):
...
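Taken together, the two reworked functions are meant to be called in this order: first derive the base date from the newest file under the main key, then fetch each input only if it falls inside the allowed interval. A hedged usage sketch (connection id, bucket, keys and delimiter are placeholders; the fnmatch-style keys follow the pattern the module already uses):

```python
# Hypothetical wiring that mirrors what etl_dag1 does in the DAG below.
from components.S3Route import get_base_date, get_df_from_s3

conn = "my_s3_conn"           # placeholder Airflow connection id
bucket = "my-bucket"          # placeholder bucket name
tacom_key = "inputs/tacom_*"  # placeholder fnmatch-style key
promo_key = "inputs/promo_*"  # placeholder fnmatch-style key
interval = "7"                # allowed days between grouped inputs

# 1) The newest file matching the main key fixes the base date of the group.
base_date = get_base_date(conn, bucket, tacom_key)

# 2) Every input is then fetched only if its LastModified lies within
#    `interval` days of that base date.
tacom = get_df_from_s3(conn, bucket, tacom_key, ";", base_date, interval)
promo = get_df_from_s3(conn, bucket, promo_key, ";", base_date, interval)
print(tacom["filename"], promo["filename"])
```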
...
@@ -5,7 +5,7 @@ import time
 import pandas as pd
 import numpy as np
-from components.S3Route import get_df_from_s3, search_periods_from_key_s3, save_df_to_s3, move_object_s3
+from components.S3Route import get_df_from_s3, get_base_date, save_df_to_s3, move_object_s3
 from components.Sensor import create_s3_sensor
 from components.Utils import get_modified_prefix, add_period_to_sufix, remove_invalid_rows, remove_fields, \
     update_dict_with_catalogs
...
@@ -137,75 +137,66 @@ def dag1_id7(df: pd.DataFrame, notpromo: pd.DataFrame, key_field: str) -> pd.Dat
     return df

-def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: Dict[str, Any], period_pattern: str,
-             delimiter: str, outputs: Dict[str, Any], procesed_prefix: str, filters: Dict[str, Any]):
+def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: Dict[str, Any],
+             delimiter: str, interval: str, outputs: Dict[str, Any], procesed_prefix: str, filters: Dict[str, Any]):
     start_time = time.time()
     logger.debug(f"DICCIONARIO DE CATALOGOS: {catalogs}")
-    periods = search_periods_from_key_s3(conn, bucket, tacom_key, period_pattern)
+    base_date = get_base_date(conn, bucket, tacom_key)
     omited_cols = filters["fields_omited"]
-    logger.debug(f"PERIODOS: {periods}")
-    for period in periods:
-        current_delimiter = delimiter
-        logger.debug(period)
-        tacom = get_df_from_s3(conn, bucket, tacom_key, period, current_delimiter)
+    current_delimiter = delimiter
+    tacom = get_df_from_s3(conn, bucket, tacom_key, current_delimiter, base_date, interval)
     tacom_df = remove_invalid_rows(tacom['df'], filters["tacom_drop_nulls_subset"])
     tacom_df = remove_fields(tacom_df, omited_cols)
     if isinstance(tacom_df, type(None)):
-            logger.error(f"INSUMO TACOM NO ENCONTRADO PARA EL PERIODO {period}")
-            continue
-        promo = get_df_from_s3(conn, bucket, promo_key, period, current_delimiter)
+        raise AssertionError(f"INSUMO TACOM NO ENCONTRADO")
+    promo = get_df_from_s3(conn, bucket, promo_key, current_delimiter, base_date, interval)
     promo_df = remove_invalid_rows(promo['df'], filters["promo_drop_nulls_subset"])
     promo_df = remove_fields(promo_df, omited_cols)
     if isinstance(promo_df, type(None)):
-            logger.error(f"INSUMO PROMOCIONES NO ENCONTRADO PARA EL PERIODO {period}")
-            continue
+        raise AssertionError(f"INSUMO PROMOCIONES NO ENCONTRADO")
     if "catalogo_promociones_delimiter" in catalogs.keys():
         current_delimiter = catalogs["catalogo_promociones_delimiter"]
-        promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_catalogo_promociones"], period, current_delimiter)
+    promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_catalogo_promociones"], current_delimiter, base_date, interval)
     promo_catalog_df = remove_invalid_rows(promo_catalog['df'], filters["catalog_drop_nulls_subset"])
     promo_catalog_df = remove_fields(promo_catalog_df, omited_cols)
     if isinstance(promo_catalog_df, type(None)):
-            logger.error(f"INSUMO PROMOCIONES CATALOGO NO ENCONTRADO PARA EL PERIODO {period}")
-            continue
+        raise AssertionError(f"INSUMO PROMOCIONES CATALOGO NO ENCONTRADO")
     if "relacionpoidpaquete_delimiter" in catalogs.keys():
         current_delimiter = catalogs["relacionpoidpaquete_delimiter"]
-        relationpoid = get_df_from_s3(conn, bucket, catalogs["s3_relacionpoidpaquete"], period, current_delimiter)
+    relationpoid = get_df_from_s3(conn, bucket, catalogs["s3_relacionpoidpaquete"], current_delimiter, base_date, interval)
     relationpoid_df = remove_invalid_rows(relationpoid['df'], filters["relapoid_drop_nulls_subset"])
     relationpoid_df = remove_fields(relationpoid_df, omited_cols)
     if isinstance(relationpoid_df, type(None)):
-            logger.error(f"INSUMO RELACION POID NO ENCONTRADO PARA EL PERIODO {period}")
-            continue
+        raise AssertionError(f"INSUMO RELACION POID NO ENCONTRADO")
     if "relacion3pa2p_delimiter" in catalogs.keys():
         current_delimiter = catalogs["relacion3pa2p_delimiter"]
-        relation3a2p = get_df_from_s3(conn, bucket, catalogs["s3_relacion3pa2p"], period, current_delimiter)
+    relation3a2p = get_df_from_s3(conn, bucket, catalogs["s3_relacion3pa2p"], current_delimiter, base_date, interval)
     relation3a2p_df = remove_invalid_rows(relation3a2p['df'], filters["rela3pa2p_drop_nulls_subset"])
     relation3a2p_df = remove_fields(relation3a2p_df, omited_cols)
     if isinstance(relation3a2p_df, type(None)):
-            logger.error(f"INSUMO RELACION 3A2P NO ENCONTRADO PARA EL PERIODO {period}")
-            continue
+        raise AssertionError(f"INSUMO RELACION 3A2P NO ENCONTRADO")
     if "relacion_paquetes_delimiter" in catalogs.keys():
         current_delimiter = catalogs["relacion_paquetes_delimiter"]
-        relapaq_catalog = get_df_from_s3(conn, bucket, catalogs["s3_relacion_paquetes"], period, current_delimiter)
+    relapaq_catalog = get_df_from_s3(conn, bucket, catalogs["s3_relacion_paquetes"], current_delimiter, base_date, interval)
     relapaq_df = remove_invalid_rows(relapaq_catalog['df'], filters["relapaqs_drop_nulls_subset"])
     relapaq_df = remove_fields(relapaq_df, omited_cols)
     if isinstance(relapaq_df, type(None)):
-            logger.error(f"INSUMO RELACION PAQUETE INICIAL Y FINAL NO ENCONTRADO PARA EL PERIODO {period}")
-            continue
+        raise AssertionError(f"INSUMO RELACION PAQUETE INICIAL Y FINAL NO ENCONTRADO")
     if "no_promocion_delimiter" in catalogs.keys():
         current_delimiter = catalogs["no_promocion_delimiter"]
-        not_promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_no_promocion"], period, current_delimiter)
+    not_promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_no_promocion"], current_delimiter, base_date, interval)
     not_promo_df = remove_invalid_rows(not_promo_catalog['df'], filters["not_promo_drop_nulls_subset"])
     not_promo_df = remove_fields(not_promo_df, omited_cols)
     if isinstance(not_promo_df, type(None)):
-            logger.error(f"INSUMO PAQUETES SIN PROMOCION NO ENCONTRADO PARA EL PERIODO {period}")
-            continue
-        logger.info(f"Ejecutando proceso para el periodo: {period}")
+        raise AssertionError(f"INSUMO PAQUETES SIN PROMOCION NO ENCONTRADO")
+    logger.info(f"EJECUTANDO PROCESO")
     promo_df = dag1_id1(promo_df, promo_catalog_df, catalogs["catalogo_promociones_key"],
                         catalogs["catalogo_promociones_value"])
     promo_df = dag1_id2(promo_df, relationpoid_df, catalogs["relacionpoidpaquete_key"],
...
@@ -220,11 +211,9 @@ def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: D
     result = result.sort_values(by="CD_FOLIO")
     # Subir los resultados al S3
-        tacom_output = add_period_to_sufix(outputs["tacom_output_path"], period)
-        save_df_to_s3(result, conn, bucket, tacom_output, outputs["tacom_delimiter"])
-        promo_output = add_period_to_sufix(outputs["promo_output_path"], period)
+    save_df_to_s3(result, conn, bucket, outputs["tacom_output_path"], outputs["tacom_delimiter"])
     promo_df.rename(columns={'CD_PAQUETE_PROMO': 'CD_PAQUETE'}, inplace=True)
-        save_df_to_s3(promo_df, conn, bucket, promo_output, outputs["promo_delimiter"])
+    save_df_to_s3(promo_df, conn, bucket, outputs["promo_output_path"], outputs["promo_delimiter"])
     # Mover TODOS LOS INSUMOS USADOS
     move_object_s3(conn, bucket, tacom['filename'], procesed_prefix)
...
@@ -312,7 +301,7 @@ def set_dag_1():
         python_callable=etl_dag1,
         op_kwargs={'conn': s3_conf["s3_conn_id"], 'bucket': s3_conf["bucket"],
                    'tacom_key': s3_tacom, 'promo_key': s3_promo, 'catalogs': catalogs_dict,
-                   'period_pattern': conf["period_pattern"], 'delimiter': conf["csv_delimiter"],
+                   'delimiter': conf["csv_delimiter"], 'interval': conf["group_input_interval_days"],
                    'outputs': outputs, 'procesed_prefix': procesed_prefix, 'filters': filters},
         trigger_rule="all_success"
     )
...
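A simplified, self-contained illustration (no S3 or Airflow required) of how the grouping is intended to behave for this DAG's inputs: the newest tacom file fixes the base date, and a companion input is kept only when its LastModified lies within the configured number of days. File names and dates are made up; the real loop in get_data_from_s3 also updates its reference date as it accepts files, which this sketch omits:

```python
# Standalone walk-through of the intended grouping, using fake S3 listings.
import datetime

UTC = datetime.timezone.utc
interval = "7"  # conf["group_input_interval_days"]

tacom_files = {
    "inputs/tacom_a.csv": datetime.datetime(2023, 6, 1, tzinfo=UTC),
    "inputs/tacom_b.csv": datetime.datetime(2023, 6, 14, tzinfo=UTC),
}
promo_files = {
    "inputs/promo_old.csv": datetime.datetime(2023, 5, 20, tzinfo=UTC),
    "inputs/promo_new.csv": datetime.datetime(2023, 6, 12, tzinfo=UTC),
}

# get_base_date: the newest LastModified among the files matching the tacom key.
base_date = max(tacom_files.values())

# get_data_from_s3: a promo file is a candidate only if it sits within `interval` days.
selected = [name for name, last_modified in promo_files.items()
            if int(interval) - abs((last_modified - base_date).days) >= 0]

print(base_date.date(), selected)  # 2023-06-14 ['inputs/promo_new.csv']
```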