Commit b3a54460 authored by Cristian Aguirre

Update DAG-TACOMVENTAS-PROMOCIONESRESIDENCIAL-05-06-23

parent 3ebc44a5
@@ -2,7 +2,7 @@
 general:
   s3_parameters:
     s3_conn_id: "bcom_tp_connection"
-    bucket: "prueba1234568"
+    bucket: "prueba-ca"
 dags:
   dag1:
@@ -41,14 +41,14 @@ dags:
     relacion3pa2p:
       type: "INSUMO"
       pattern: "temporal_relacion3pa2p*.txt"
-      prefix: "pruebas_qa"
+      prefix: ""
       key_field: "TRESP"
       value_field: "DOSP"
       delimiter: ","
     relacionpoidpaquete:
       type: "INSUMO"
       pattern: "temporal_relacion_Paquete*.txt"
-      prefix: "pruebas_qa"
+      prefix: ""
       key_field: "POID_PRODUCT"
       value_field: "CD_PAQUETE"
       delimiter: ","
...
@@ -27,13 +27,13 @@ def get_df_from_s3(conn: str, bucket: str, key: str, period: str, delimiter: str
         response.update({'filename': s3_data["filename"]})
         file_type = get_type_file(s3_data["filename"])
         if file_type == FileTypeEnum.EXCEL:
-            dataframe = pd.read_excel(s3_data["data"], engine="openpyxl")
+            dataframe = pd.read_excel(s3_data["data"], engine="openpyxl", dtype='object')
         elif file_type == FileTypeEnum.OLD_EXCEL:
-            dataframe = pd.read_excel(s3_data["data"], engine="xlrd")
+            dataframe = pd.read_excel(s3_data["data"], engine="xlrd", dtype='object')
         elif file_type == FileTypeEnum.TEXT or file_type == FileTypeEnum.CSV:
             str_data = str(s3_data["data"].getvalue(), encoding='UTF-8', errors='ignore')
             data = StringIO(str_data)
-            dataframe = pd.read_csv(data, sep=delimiter)
+            dataframe = pd.read_csv(data, sep=delimiter, dtype='object')
         response.update({'df': dataframe})
     except Exception as e:
         logger.error(f"Error fetching and transforming to a DataFrame from S3 for period {period}. {e}")
...
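The dtype='object' argument added above keeps pandas from inferring numeric types, so identifier-like columns (package codes, account numbers) stay as strings instead of being coerced to int64/float64. A minimal sketch of the difference, using a made-up two-column CSV that is not from the repository:

    # Illustrative only: compare type inference with and without dtype='object'.
    from io import StringIO
    import pandas as pd

    raw = StringIO("CD_PAQUETE,NOMBRE_PRODUCTO\n00123,TV HD\n04567,CANALES FOX\n")
    inferred = pd.read_csv(raw, sep=",")                    # CD_PAQUETE inferred as int64: 123, 4567
    raw.seek(0)
    as_object = pd.read_csv(raw, sep=",", dtype='object')   # CD_PAQUETE kept as "00123", "04567"
    print(inferred["CD_PAQUETE"].dtype, as_object["CD_PAQUETE"].dtype)   # int64 object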
@@ -18,6 +18,11 @@ import logging
 logger = logging.getLogger()
 DAG_NAME = "BCOM_DAG_TRANSFORMACION_TACOMVENTAS_PROMOCIONESRESIDENCIAL"
+PROMOCION_3PA2P = "3P a 2P"
+PROMOCION_PARRILLA = "Parrilla"
+PROMOCION_HBO = "Promocion HBO"
+PROMOCION_DEFAULT = "Promocion"
+PROMOCIONES_NO_CONSIDERADAS_TV_CANALES = "Adicional|Soundbox|SOUNDBOX"
 DEFAULT_ARGS = {
     'owner': 'airflow',
@@ -56,25 +61,26 @@ def dag1_id2(promo: pd.DataFrame, relation_poid: pd.DataFrame, key_field: str, v
 def dag1_id3(tacom: pd.DataFrame, promo: pd.DataFrame) -> pd.DataFrame:
     result = pd.DataFrame()
     try:
+        promo["CUENTA"] = promo["CUENTA"].astype(int, errors='ignore')
         promo["CD_PAQUETE_PROMO"] = promo["CD_PAQUETE"].astype(int, errors='ignore')
         promo.drop("CD_PAQUETE", axis=1, inplace=True)
         promo = promo.drop_duplicates(["CUENTA", "CD_PAQUETE_PROMO"])
         result = tacom.merge(promo, how='left', left_on=["CD_CUENTA", "CD_PAQUETE"], right_on=["CUENTA", "CD_PAQUETE_PROMO"])
         result["CD_PAQUETE"] = result["CD_PAQUETE"].astype(int).astype(str)
-        no_consider = "Adicional|Soundbox|SOUNDBOX"
         result["PROMOCION"] = np.where((result["CD_PAQUETE_PROMO"].isna()) | (result["CD_PAQUETE_PROMO"] == "None") |
                                        (result["CD_PAQUETE_PROMO"] == "nan"), None,
-                              np.where((result["CD_PAQUETE"].notna()) & (result["CD_PAQUETE"].str.len() <= 5), "3P a 2P",
+                              np.where((result["CD_PAQUETE"].notna()) & (result["CD_PAQUETE"].str.len() <= 5),
+                                       PROMOCION_3PA2P,
                               np.where((result["NOMBRE_PRODUCTO"].str.contains("TV", na=False)) &
-                                       (~result["NOMBRE_PRODUCTO"].str.contains(no_consider, na=False)), "Parrilla",
+                                       (~result["NOMBRE_PRODUCTO"].str.contains(PROMOCIONES_NO_CONSIDERADAS_TV_CANALES, na=False)),
+                                       PROMOCION_PARRILLA,
                               np.where((result["NOMBRE_PRODUCTO"].str.contains("CANALES", na=False)) &
-                                       (~result["NOMBRE_PRODUCTO"].str.contains(no_consider, na=False)), "Parrilla",
+                                       (~result["NOMBRE_PRODUCTO"].str.contains(PROMOCIONES_NO_CONSIDERADAS_TV_CANALES, na=False)),
+                                       PROMOCION_PARRILLA,
-                              np.where(result["NOMBRE_PRODUCTO"].str.contains("HBO MAX", na=False), "Promocion HBO",
-                              np.where(result["NOMBRE_PRODUCTO"].str.contains("PAQUETE HBO", na=False), "Promocion HBO",
+                              np.where(result["NOMBRE_PRODUCTO"].str.contains("HBO MAX", na=False), PROMOCION_HBO,
+                              np.where(result["NOMBRE_PRODUCTO"].str.contains("PAQUETE HBO", na=False), PROMOCION_HBO,
                               np.where(result["NOMBRE_PRODUCTO"].str.contains("STAR PREMIUM", na=False),
-                                       "Promocion STAR PREMIUM", "PROMOCION")))))))
+                                       "Promocion STAR PREMIUM", PROMOCION_DEFAULT)))))))
         result["CD_PAQUETE_PROMO"] = np.where((result["CD_PAQUETE_PROMO"] == 'nan') |
                                               (result["CD_PAQUETE_PROMO"] == 'None'), None, result["CD_PAQUETE"])
@@ -91,7 +97,7 @@ def dag1_id4(df: pd.DataFrame, df_promo: pd.DataFrame, key_field: str, value_fie
         df_promo[value_field] = df_promo[value_field].astype(str, errors='ignore')
         df = df.merge(df_promo, how='outer', left_on="CD_PAQUETE", right_on=key_field)
         df = df.dropna(how='all', subset=["CD_EMPRESA", "CD_FOLIO", "CD_CUENTA"])
-        df["CD_PAQUETE"] = np.where((df["PROMOCION"] == "3P a 2P") & (df[key_field].notna()), df[value_field], df["CD_PAQUETE"])
+        df["CD_PAQUETE"] = np.where((df["PROMOCION"] == PROMOCION_3PA2P) & (df[key_field].notna()), df[value_field], df["CD_PAQUETE"])
         df = df.drop([key_field, value_field], axis=1)
     except Exception as e:
         logger.error(f"Error DAG1_ID4. {e}")
@@ -240,7 +246,7 @@ def set_dag_1():
     from yaml.loader import SafeLoader
     # Change conf_path depending on the environment; in prod with k8s and containers, use /opt/airflow/dags/app_conf.yml
     # In development, use any path that points to your dags folder
-    conf_path = "/root/airflow/dags/app_conf.yml"
+    conf_path = "/opt/airflow/dags/app_conf.yml"
     with open(conf_path) as f:
         data = yaml.load(f, Loader=SafeLoader)
     general_cnf = data["general"]
...
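The new conf_path matches the mount point used in the Kubernetes manifests below (the dags volume is mounted at /opt/airflow/dags). A small sketch of how the DAG reads the s3_parameters block from the app_conf.yml hunk above, following the same loading pattern as set_dag_1; the variable names outside that hunk are illustrative:

    # Illustrative config-loading sketch, mirroring the committed set_dag_1 code.
    import yaml
    from yaml.loader import SafeLoader

    conf_path = "/opt/airflow/dags/app_conf.yml"   # path inside the container, per this commit
    with open(conf_path) as f:
        data = yaml.load(f, Loader=SafeLoader)

    general_cnf = data["general"]
    s3_params = general_cnf["s3_parameters"]
    conn_id = s3_params["s3_conn_id"]   # "bcom_tp_connection"
    bucket = s3_params["bucket"]        # "prueba-ca" after this change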
@@ -24,5 +24,9 @@ data:
   _AIRFLOW_WWW_USER_CREATE: 'true'
   _AIRFLOW_WWW_USER_USERNAME: admin
   _AIRFLOW_WWW_USER_PASSWORD: admin
+  S3_DAGS_DIR: 's3://prueba1234568/dags'
+  SYNCHRONYZE_DAG_DIR: '30'
+  MINIO_SERVER: 'http://192.168.49.2:9000'
+  MINIO_DAGS_DIR: '/prueba-ca/dags'
@@ -20,15 +20,11 @@ spec:
     spec:
       containers:
         - name: airflow-scheduler
-          image: cristianfernando/airflow_custom:0.0.1
+          image: apache/airflow:2.5.3
           args: ["scheduler"]
           envFrom:
             - configMapRef:
                 name: airflow-envvars-configmap
-          resources:
-            limits:
-              memory: "512Mi"
-            # cpu: "100"
          volumeMounts:
             - name: dags-host-volume
               mountPath: /opt/airflow/dags
...
apiVersion: v1
kind: Secret
metadata:
  name: credentials
type: Opaque
data:
  AWS_ACCESS_KEY: bWluaW9hZG1pbg==
  AWS_SECRET_KEY: bWluaW9hZG1pbg==
  MINIO_USER: bWluaW9hZG1pbg==
  MINIO_PASSWORD: bWluaW9hZG1pbg==
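Kubernetes Secret data values are base64-encoded; all four entries above decode to the MinIO default credential, matching the ${MINIO_USER:-minioadmin} fallbacks used by the sync container further down. A quick check, not part of the commit:

    # Verify/produce the base64 values stored in the 'credentials' Secret.
    import base64

    print(base64.b64decode("bWluaW9hZG1pbg==").decode())   # -> minioadmin
    print(base64.b64encode(b"minioadmin").decode())        # -> bWluaW9hZG1pbg==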
@@ -20,15 +20,11 @@ spec:
     spec:
       containers:
         - name: airflow-webserver
-          image: cristianfernando/airflow_custom:0.0.1
+          image: apache/airflow:2.5.3
           args: ["webserver"]
           envFrom:
             - configMapRef:
                 name: airflow-envvars-configmap
-          resources:
-            limits:
-              memory: "512Mi"
-            # cpu: "100"
           ports:
             - containerPort: 8080
           volumeMounts:
...
@@ -6,21 +6,12 @@ spec:
   containers:
     - args: [ ]
       command: [ ]
+      envFrom:
+        - configMapRef:
+            name: airflow-envvars-configmap
       env:
         - name: AIRFLOW__CORE__EXECUTOR
           value: LocalExecutor
-        - name: DB_HOST
-          value: postgres
-        - name: DB_DATABASE
-          value: airflow
-        - name: DB_USER
-          value: airflow
-        - name: DB_PASSWORD
-          value: airflow
-        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
-          value: postgresql+psycopg2://airflow:airflow@postgres/airflow
-        - name: AIRFLOW__LOGGING__LOGGING_LEVEL
-          value: INFO
       image: dumy-image
       imagePullPolicy: IfNotPresent
       name: base
@@ -30,7 +21,7 @@ spec:
         - name: logs-persistent-storage
           mountPath: /opt/airflow/logs
   hostNetwork: false
-  restartPolicy: Never
+  restartPolicy: IfNotPresent
   securityContext:
     runAsUser: 50000
   nodeSelector: { }
...
@@ -3,6 +3,7 @@ kubectl apply -f airflow-rbac.yaml
 kubectl apply -f postgres-deployment.yaml
 kubectl apply -f postgres-service.yaml
 kubectl apply -f airflow-envvars-configmap.yaml
+kubectl apply -f airflow-secrets.yaml
 kubectl apply -f airflow-webserver-deployment.yaml
 kubectl apply -f airflow-webserver-service.yaml
 kubectl apply -f airflow-scheduler-deployment.yaml
...
 kubectl delete -f airflow-rbac.yaml
 kubectl delete -f postgres-service.yaml
 kubectl delete -f postgres-deployment.yaml
+kubectl delete -f airflow-secrets.yaml
 kubectl delete -f airflow-envvars-configmap.yaml
 kubectl delete -f airflow-webserver-service.yaml
 kubectl delete -f airflow-webserver-deployment.yaml
...
@@ -16,18 +16,29 @@ spec:
     spec:
       containers:
         - args:
-            - while true; aws s3 sync --exact-timestamps --delete 's3://prueba1234568/dags' '/dags'; do sleep 30; done;
+            - mc alias set minio ${MINIO_SERVER:-http://192.168.49.2:9000} ${MINIO_USER:-minioadmin}
+              ${MINIO_PASSWORD:-minioadmin}; while true; mc mirror --remove --overwrite minio${MINIO_DAGS_DIR:-/prueba-ca/dags} /dags;
+              do sleep ${SYNCHRONYZE_DAG_DIR:-30}; done;
          command:
             - /bin/bash
             - -c
             - --
-          name: sync-dags
-          image: amazon/aws-cli:2.1.34
+          name: sync-dags-minio
+          image: minio/mc
+          envFrom:
+            - configMapRef:
+                name: airflow-envvars-configmap
           env:
-            - name: AWS_ACCESS_KEY_ID
-              value: AKIAQAAMXO3Z4BHNKEIE
-            - name: AWS_SECRET_ACCESS_KEY
-              value: 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
+            - name: MINIO_USER
+              valueFrom:
+                secretKeyRef:
+                  key: MINIO_USER
+                  name: credentials
+            - name: MINIO_PASSWORD
+              valueFrom:
+                secretKeyRef:
+                  key: MINIO_PASSWORD
+                  name: credentials
           volumeMounts:
             - name: dags-host-volume
               mountPath: /dags
...
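The sync container now resolves its settings at runtime: MINIO_SERVER, MINIO_DAGS_DIR and SYNCHRONYZE_DAG_DIR come from the airflow-envvars-configmap, while MINIO_USER and MINIO_PASSWORD come from the credentials Secret, with ${VAR:-default} fallbacks in the container args. A small sketch of the same defaulting expressed in Python, illustrative and not part of the commit:

    # Mirror the ${VAR:-default} fallbacks used by the sync-dags-minio container args.
    import os

    minio_server = os.environ.get("MINIO_SERVER", "http://192.168.49.2:9000")    # from configmap
    minio_user = os.environ.get("MINIO_USER", "minioadmin")                      # from 'credentials' Secret
    minio_password = os.environ.get("MINIO_PASSWORD", "minioadmin")              # from 'credentials' Secret
    dags_dir = os.environ.get("MINIO_DAGS_DIR", "/prueba-ca/dags")               # from configmap
    sync_every = int(os.environ.get("SYNCHRONYZE_DAG_DIR", "30"))                # seconds between mirror runs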