Commit d5c367db authored by Cristian Aguirre's avatar Cristian Aguirre

Update DAG-TACOMVENTAS-PROMOCIONESRESIDENCIAL-02-06-23

parent a227e2ae
...@@ -62,12 +62,13 @@ def dag1_id3(tacom: pd.DataFrame, promo: pd.DataFrame) -> pd.DataFrame: ...@@ -62,12 +62,13 @@ def dag1_id3(tacom: pd.DataFrame, promo: pd.DataFrame) -> pd.DataFrame:
promo.drop("CD_PAQUETE", axis=1, inplace=True) promo.drop("CD_PAQUETE", axis=1, inplace=True)
promo = promo.drop_duplicates(["CUENTA", "CD_PAQUETE_PROMO"]) promo = promo.drop_duplicates(["CUENTA", "CD_PAQUETE_PROMO"])
result = tacom.merge(promo, how='left', left_on=["CD_CUENTA", "CD_PAQUETE"], right_on=["CUENTA", "CD_PAQUETE_PROMO"]) result = tacom.merge(promo, how='left', left_on=["CD_CUENTA", "CD_PAQUETE"], right_on=["CUENTA", "CD_PAQUETE_PROMO"])
result["CD_PAQUETE_PROMO"] = result["CD_PAQUETE_PROMO"].astype(str) result["CD_PAQUETE"] = result["CD_PAQUETE"].astype(str)
no_consider = "Adicional|Soundbox|SOUNDBOX" no_consider = "Adicional|Soundbox|SOUNDBOX"
result["PROMOCION"] = np.where((result["CD_PAQUETE_PROMO"].isna()) | (result["CD_PAQUETE_PROMO"] == "None") | aa = result[result["CD_PAQUETE"].str.len() <= 5]
(result["CD_PAQUETE_PROMO"] == "nan"), None, result["PROMOCION"] = np.where((result["CD_PAQUETE"].isna()) | (result["CD_PAQUETE"] == "None") |
np.where((result["CD_PAQUETE_PROMO"].notna()) & (result["CD_PAQUETE_PROMO"].str.len() <= 5), "3P a 2P", (result["CD_PAQUETE"] == "nan"), None,
np.where((result["CD_PAQUETE"].notna()) & (result["CD_PAQUETE"].str.len() <= 5), "3P a 2P",
np.where((result["NOMBRE_PRODUCTO"].str.contains("TV", na=False)) & np.where((result["NOMBRE_PRODUCTO"].str.contains("TV", na=False)) &
(~result["NOMBRE_PRODUCTO"].str.contains(no_consider, na=False)), "Parrilla", (~result["NOMBRE_PRODUCTO"].str.contains(no_consider, na=False)), "Parrilla",
np.where((result["NOMBRE_PRODUCTO"].str.contains("CANALES", na=False)) & np.where((result["NOMBRE_PRODUCTO"].str.contains("CANALES", na=False)) &
...@@ -119,7 +120,7 @@ def dag1_id6(df: pd.DataFrame, notpromo: pd.DataFrame, key_field: str) -> pd.Dat ...@@ -119,7 +120,7 @@ def dag1_id6(df: pd.DataFrame, notpromo: pd.DataFrame, key_field: str) -> pd.Dat
df_without_paq = df[~df["CD_PAQUETE"].isin(not_promo_values)] df_without_paq = df[~df["CD_PAQUETE"].isin(not_promo_values)]
df_with_paq = df[df["CD_PAQUETE"].isin(not_promo_values)] df_with_paq = df[df["CD_PAQUETE"].isin(not_promo_values)]
df_with_paq["NRO_PAQUETE"] = df_with_paq.groupby(["CD_FOLIO", "CD_PAQUETE"]).cumcount() + 1 df_with_paq["NRO_PAQUETE"] = df_with_paq.groupby(["CD_FOLIO", "CD_PAQUETE"]).cumcount() + 1
df = pd.concat([df_with_paq, df_without_paq]).reset_index(drop=True) df = pd.concat([df_with_paq, df_without_paq], ignore_index=True)
except Exception as e: except Exception as e:
logger.error(f"Error DAG1_ID6. {e}") logger.error(f"Error DAG1_ID6. {e}")
finally: finally:
...@@ -239,7 +240,9 @@ def set_dag_1(): ...@@ -239,7 +240,9 @@ def set_dag_1():
import yaml import yaml
from yaml.loader import SafeLoader from yaml.loader import SafeLoader
conf_path = "/root/airflow/dags/app_conf.yml" # Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
# En desarrollo, cualquiera que apunte a su carpeta dags
conf_path = "/opt/airflow/dags/app_conf.yml"
with open(conf_path) as f: with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader) data = yaml.load(f, Loader=SafeLoader)
general_cnf = data["general"] general_cnf = data["general"]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment