Commit 3ebc44a5 authored by Cristian Aguirre

Update DAG-TACOMVENTAS-PROMOCIONESRESIDENCIAL-04-06-23

parent d5c367db
@@ -8,7 +8,7 @@ dags:
   dag1:
     schedule: "@once"
     period_pattern: '[a-zA-Z0-9]+([0-9]{4})(\-[0-9]{2})?\.[a-zA-Z]*'
-    csv_delimiter: ","
+    csv_delimiter: ";"
     filters:
       fields_omited: ["BCOM_ROW_IDENTIFIER", "BCOM_PROCESS_ID", "BCOM_FIELD_KEY", "BCOM_LNUMBER", "BCOM_ERROR_CODE",
                       "BCOM_ERROR_MESSAGE"]
@@ -22,8 +22,8 @@ dags:
     s3_parameters:
       inputs:
         prefix: "pruebas_qa"
-        tacom_pattern: "tacomventas_original*.txt"
-        promociones_pattern: "promociones_original*.txt"
+        tacom_pattern: "TACOMVENTAS_original*.xlsx"
+        promociones_pattern: "PROMOCIONES_RESIDENCIAL_original*.xlsx"
       outputs:
         prefix: "prueba3/tacom_outputs"
         tacom_output: "tacom_modified.csv"
...
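
Note: the input patterns are now uppercase and target .xlsx. A small sketch of how such glob-style patterns match keys, shown with fnmatchcase so matching is case-sensitive on every platform (the keys below are hypothetical; the actual sensor matching may differ):

from fnmatch import fnmatchcase

# Hypothetical S3 keys; only the renamed .xlsx input matches the updated pattern.
keys = [
    "pruebas_qa/TACOMVENTAS_original202305.xlsx",
    "pruebas_qa/tacomventas_original202305.txt",
]
print([k for k in keys if fnmatchcase(k, "*TACOMVENTAS_original*.xlsx")])
# ['pruebas_qa/TACOMVENTAS_original202305.xlsx']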
@@ -79,9 +79,12 @@ def search_periods_from_key_s3(conn: str, bucket: str, key: str, pattern: str) -
         for file in files:
             if not re.search(pattern, file):
                 continue
-            period = file[file.rfind(".")-7:file.rfind(".")]
-            if period.find("-") == -1:
-                period = period[1:5] + "-" + period[5:]
+            if file[file.rfind(".")-6:file.rfind(".")].isdigit():
+                period = file[file.rfind(".")-6:file.rfind(".")]
+            else:
+                period = file[file.rfind(".")-7:file.rfind(".")]
+                if period.find("-") == -1:
+                    period = period[1:5] + "-" + period[5:]
             periods.add(period)
     except Exception as e:
         logger.error(f"Error buscando periodos disponibles en los archivos. key: {key}. {e}")
...
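
Note: the new branch first tries a plain six-digit period (e.g. 202305) and only falls back to the seven-character slice, which normalizes dash-formatted names. A worked sketch of the updated logic with hypothetical file names:

def extract_period(file: str) -> str:
    # Prefer a plain six-digit period like "202305" just before the extension.
    if file[file.rfind(".") - 6:file.rfind(".")].isdigit():
        return file[file.rfind(".") - 6:file.rfind(".")]
    # Fall back to the seven-character slice, keeping "2023-05"-style names as-is.
    period = file[file.rfind(".") - 7:file.rfind(".")]
    if period.find("-") == -1:
        period = period[1:5] + "-" + period[5:]
    return period

print(extract_period("TACOMVENTAS_original202305.xlsx"))   # 202305
print(extract_period("TACOMVENTAS_original2023-05.xlsx"))  # 2023-05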
@@ -9,7 +9,6 @@ from components.S3Route import get_df_from_s3, search_periods_from_key_s3, save_
 from components.Sensor import create_s3_sensor
 from components.Utils import get_modified_prefix, add_period_to_sufix, remove_invalid_rows, remove_fields, \
     update_dict_with_catalogs
-from enums.CatalogConfigurationEnum import CatalogConfigurationEnum
 from airflow import DAG
 from airflow.operators.python import PythonOperator
@@ -62,12 +61,11 @@ def dag1_id3(tacom: pd.DataFrame, promo: pd.DataFrame) -> pd.DataFrame:
         promo.drop("CD_PAQUETE", axis=1, inplace=True)
         promo = promo.drop_duplicates(["CUENTA", "CD_PAQUETE_PROMO"])
         result = tacom.merge(promo, how='left', left_on=["CD_CUENTA", "CD_PAQUETE"], right_on=["CUENTA", "CD_PAQUETE_PROMO"])
-        result["CD_PAQUETE"] = result["CD_PAQUETE"].astype(str)
+        result["CD_PAQUETE"] = result["CD_PAQUETE"].astype(int).astype(str)
         no_consider = "Adicional|Soundbox|SOUNDBOX"
-        aa = result[result["CD_PAQUETE"].str.len() <= 5]
-        result["PROMOCION"] = np.where((result["CD_PAQUETE"].isna()) | (result["CD_PAQUETE"] == "None") |
-                                       (result["CD_PAQUETE"] == "nan"), None,
+        result["PROMOCION"] = np.where((result["CD_PAQUETE_PROMO"].isna()) | (result["CD_PAQUETE_PROMO"] == "None") |
+                                       (result["CD_PAQUETE_PROMO"] == "nan"), None,
                              np.where((result["CD_PAQUETE"].notna()) & (result["CD_PAQUETE"].str.len() <= 5), "3P a 2P",
                              np.where((result["NOMBRE_PRODUCTO"].str.contains("TV", na=False)) &
                                       (~result["NOMBRE_PRODUCTO"].str.contains(no_consider, na=False)), "Parrilla",
@@ -112,24 +110,21 @@ def dag1_id5(df: pd.DataFrame, relation: pd.DataFrame, key_field: str, value_fie
         return df

-def dag1_id6(df: pd.DataFrame, notpromo: pd.DataFrame, key_field: str) -> pd.DataFrame:
+def dag1_id6(df: pd.DataFrame) -> pd.DataFrame:
     try:
-        df["NRO_PAQUETE"] = 0
-        notpromo = notpromo.astype(str)
-        not_promo_values = notpromo[key_field].tolist()
-        df_without_paq = df[~df["CD_PAQUETE"].isin(not_promo_values)]
-        df_with_paq = df[df["CD_PAQUETE"].isin(not_promo_values)]
-        df_with_paq["NRO_PAQUETE"] = df_with_paq.groupby(["CD_FOLIO", "CD_PAQUETE"]).cumcount() + 1
-        df = pd.concat([df_with_paq, df_without_paq], ignore_index=True)
+        df["NRO_PAQUETE"] = df.groupby(["CD_FOLIO", "CD_PAQUETE"]).cumcount() + 1
     except Exception as e:
         logger.error(f"Error DAG1_ID6. {e}")
     finally:
         return df

-def dag1_id7(df: pd.DataFrame) -> pd.DataFrame:
+def dag1_id7(df: pd.DataFrame, notpromo: pd.DataFrame, key_field: str) -> pd.DataFrame:
     try:
-        df["PROMOCION"] = np.where((df["TP_SERVICIO"] == 2) & (df["NRO_PAQUETE"] > 1), None, df["PROMOCION"])
+        notpromo = notpromo.astype(str)
+        not_promo_values = notpromo[key_field].tolist()
+        df["PROMOCION"] = np.where((df["TP_SERVICIO"] == 2) & (df["NRO_PAQUETE"] > 1) &
+                                   (df["CD_PAQUETE"].isin(not_promo_values)), None, df["PROMOCION"])
     except Exception as e:
         logger.error(f"Error DAG1_ID7. {e}")
     finally:
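
Note: the package counter no longer depends on the not-promo catalog: dag1_id6 now numbers every (CD_FOLIO, CD_PAQUETE) repetition, and the catalog filter moves into dag1_id7's PROMOCION reset. A small sketch of the 1-based cumcount numbering with hypothetical rows:

import pandas as pd

df = pd.DataFrame({
    "CD_FOLIO":   ["F1", "F1", "F1", "F2"],
    "CD_PAQUETE": ["A",  "A",  "B",  "A"],
})
# Each repeated (CD_FOLIO, CD_PAQUETE) pair gets an increasing 1-based counter.
df["NRO_PAQUETE"] = df.groupby(["CD_FOLIO", "CD_PAQUETE"]).cumcount() + 1
print(df["NRO_PAQUETE"].tolist())  # [1, 2, 1, 1]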
@@ -142,15 +137,17 @@ def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: D
     logger.debug(f"DICCIONARIO DE CATALOGOS: {catalogs}")
     periods = search_periods_from_key_s3(conn, bucket, tacom_key, period_pattern)
     omited_cols = filters["fields_omited"]
+    logger.debug(f"PERIODOS: {periods}")
     for period in periods:
+        current_delimiter = delimiter
         logger.debug(period)
-        tacom = get_df_from_s3(conn, bucket, tacom_key, period, delimiter)
+        tacom = get_df_from_s3(conn, bucket, tacom_key, period, current_delimiter)
         tacom_df = remove_invalid_rows(tacom['df'], filters["tacom_drop_nulls_subset"])
         tacom_df = remove_fields(tacom_df, omited_cols)
         if isinstance(tacom_df, type(None)):
             logger.error(f"INSUMO TACOM NO ENCONTRADO PARA EL PERIODO {period}")
             continue
-        promo = get_df_from_s3(conn, bucket, promo_key, period, delimiter)
+        promo = get_df_from_s3(conn, bucket, promo_key, period, current_delimiter)
         promo_df = remove_invalid_rows(promo['df'], filters["promo_drop_nulls_subset"])
         promo_df = remove_fields(promo_df, omited_cols)
         if isinstance(promo_df, type(None)):
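
Note: current_delimiter exists because the old code reassigned delimiter inside the loop, so once any catalog override fired, every later period read the tacom/promo inputs with the wrong separator. A minimal sketch of the fixed behavior (hypothetical helper, not code from the repo):

# Each period starts from the configured default; catalog overrides applied
# later in the iteration no longer leak into the next period's main reads.
def tacom_delimiters(periods, delimiter, catalogs):
    used = []
    for _ in periods:
        current_delimiter = delimiter   # fresh default per period
        used.append(current_delimiter)  # delimiter used for tacom/promo reads
        if "catalogo_promociones_delimiter" in catalogs:
            current_delimiter = catalogs["catalogo_promociones_delimiter"]
    return used

print(tacom_delimiters([1, 2], ";", {"catalogo_promociones_delimiter": ","}))
# [';', ';']  -- previously the second period would have read with ","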
@@ -158,8 +155,8 @@ def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: D
             continue
         if "catalogo_promociones_delimiter" in catalogs.keys():
-            delimiter = catalogs["catalogo_promociones_delimiter"]
-        promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_catalogo_promociones"], period, delimiter)
+            current_delimiter = catalogs["catalogo_promociones_delimiter"]
+        promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_catalogo_promociones"], period, current_delimiter)
         promo_catalog_df = remove_invalid_rows(promo_catalog['df'], filters["catalog_drop_nulls_subset"])
         promo_catalog_df = remove_fields(promo_catalog_df, omited_cols)
         if isinstance(promo_catalog_df, type(None)):
@@ -167,8 +164,8 @@ def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: D
             continue
         if "relacionpoidpaquete_delimiter" in catalogs.keys():
-            delimiter = catalogs["relacionpoidpaquete_delimiter"]
-        relationpoid = get_df_from_s3(conn, bucket, catalogs["s3_relacionpoidpaquete"], period, delimiter)
+            current_delimiter = catalogs["relacionpoidpaquete_delimiter"]
+        relationpoid = get_df_from_s3(conn, bucket, catalogs["s3_relacionpoidpaquete"], period, current_delimiter)
         relationpoid_df = remove_invalid_rows(relationpoid['df'], filters["relapoid_drop_nulls_subset"])
         relationpoid_df = remove_fields(relationpoid_df, omited_cols)
         if isinstance(relationpoid_df, type(None)):
@@ -176,8 +173,8 @@ def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: D
             continue
         if "relacion3pa2p_delimiter" in catalogs.keys():
-            delimiter = catalogs["relacion3pa2p_delimiter"]
-        relation3a2p = get_df_from_s3(conn, bucket, catalogs["s3_relacion3pa2p"], period, delimiter)
+            current_delimiter = catalogs["relacion3pa2p_delimiter"]
+        relation3a2p = get_df_from_s3(conn, bucket, catalogs["s3_relacion3pa2p"], period, current_delimiter)
         relation3a2p_df = remove_invalid_rows(relation3a2p['df'], filters["rela3pa2p_drop_nulls_subset"])
         relation3a2p_df = remove_fields(relation3a2p_df, omited_cols)
         if isinstance(relation3a2p_df, type(None)):
@@ -185,8 +182,8 @@ def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: D
             continue
         if "relacion_paquetes_delimiter" in catalogs.keys():
-            delimiter = catalogs["relacion_paquetes_delimiter"]
-        relapaq_catalog = get_df_from_s3(conn, bucket, catalogs["s3_relacion_paquetes"], period, delimiter)
+            current_delimiter = catalogs["relacion_paquetes_delimiter"]
+        relapaq_catalog = get_df_from_s3(conn, bucket, catalogs["s3_relacion_paquetes"], period, current_delimiter)
         relapaq_df = remove_invalid_rows(relapaq_catalog['df'], filters["relapaqs_drop_nulls_subset"])
         relapaq_df = remove_fields(relapaq_df, omited_cols)
         if isinstance(relapaq_df, type(None)):
@@ -194,8 +191,8 @@ def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: D
             continue
         if "no_promocion_delimiter" in catalogs.keys():
-            delimiter = catalogs["no_promocion_delimiter"]
-        not_promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_no_promocion"], period, delimiter)
+            current_delimiter = catalogs["no_promocion_delimiter"]
+        not_promo_catalog = get_df_from_s3(conn, bucket, catalogs["s3_no_promocion"], period, current_delimiter)
         not_promo_df = remove_invalid_rows(not_promo_catalog['df'], filters["not_promo_drop_nulls_subset"])
         not_promo_df = remove_fields(not_promo_df, omited_cols)
         if isinstance(not_promo_df, type(None)):
@@ -210,10 +207,11 @@ def etl_dag1(conn: str, bucket: str, tacom_key: str, promo_key: str, catalogs: D
         result = dag1_id3(tacom_df, promo_df)
         result = dag1_id4(result, relation3a2p_df, catalogs["relacion3pa2p_key"], catalogs["relacion3pa2p_value"])
         result = dag1_id5(result, relapaq_df, catalogs["relacion_paquetes_key"], catalogs["relacion_paquetes_value"])
-        result = dag1_id6(result, not_promo_df, catalogs["no_promocion_key"])
-        result = dag1_id7(result)
+        result = dag1_id6(result)
+        result = dag1_id7(result, not_promo_df, catalogs["no_promocion_key"])
         final_columns = list(tacom_df.columns)
         result = result[final_columns]
+        result = result.sort_values(by="CD_FOLIO")

         # Subir los resultados al S3
         tacom_output = add_period_to_sufix(outputs["tacom_output_path"], period)
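
Note: the new sort_values call orders the output by CD_FOLIO before upload; a quick reminder of its copy semantics:

import pandas as pd

# sort_values returns a sorted copy, so the reassignment above is required.
df = pd.DataFrame({"CD_FOLIO": ["F2", "F1"], "VAL": [2, 1]})
df = df.sort_values(by="CD_FOLIO")
print(df["CD_FOLIO"].tolist())  # ['F1', 'F2']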
@@ -242,7 +240,7 @@ def set_dag_1():
     from yaml.loader import SafeLoader
     # Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
     # En desarrollo, cualquiera que apunte a su carpeta dags
-    conf_path = "/opt/airflow/dags/app_conf.yml"
+    conf_path = "/root/airflow/dags/app_conf.yml"
     with open(conf_path) as f:
         data = yaml.load(f, Loader=SafeLoader)
     general_cnf = data["general"]
...
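
Note: only conf_path changes here; the SafeLoader pattern stays the same. For reference, the same load applied to an inline document (hypothetical content with the keys this config uses; the DAG reads the file at conf_path instead):

import yaml
from yaml.loader import SafeLoader

raw = """
general: {}
dags:
  dag1:
    csv_delimiter: ";"
"""
data = yaml.load(raw, Loader=SafeLoader)
print(data["dags"]["dag1"]["csv_delimiter"])  # ;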