Commit dc90b617 authored by Cristian Aguirre's avatar Cristian Aguirre

Update DAG-TACOMVENTAS-PROMOCIONESRESIDENCIAL-06-06-23

parent b3a54460
# bcom-tp-etl-transformation-pipelines
# Proyecto BCOM TRANSFORMACIONES
## Guía de despliegue y uso de los DAG's con Airflow y Kubernetes
### Requisitos previos
- Tener instalado Dockerfile, en su versión más reciente
- Tener un repositorio público en DockerHub o un repositorio local de imágenes
- Tener instalado Kubernetes (k8) y kubectl
- (Opcional) Para ambiente de desarrollo, se recomienda instalar "Minikube" en vez de k8, la cual
simula un clúster de k8 dentro de un contenedor.
- Tener instalado el Minio Server en local o utilizar un AWS S3
## Despliegue de ambiente:
⚠️ *Los 2 siguientes pasos solo se deben realizar la primera vez. Y solo será
necesario repetirlo si el archivo Dockerfile_worker cambia y esto sucede si se necesitan
más librerías externa para ejecutar los DAGs*
1.- Situarse en la carpeta **deploy-k8** y luego crear la imagen customizada de Airflow
más las dependencias necesarias para ejecutar nuestro DAG. Por ejemplo, ejecutar el siguiente comando:
```shell
docker build -t airflow_custom:1.0.0 -f Dockerfile_worker .
```
2.- Ahora subimos nuestra imagen al DockerHub o repositorio local, para esto primero
nos debemos loguear, crear el tag y realizar el push. Por ejemplo:
```shell
docker login
docker tag airflow_custom:1.0.0 <nombrecuenta>/<repositorio>:<tag>
docker push <nombrecuenta>/<repositorio>:<tag>
```
3.- Con la imagen ya subida a un repositorio, colocamos este nombre en el archivo
**airflow-envvars-configmap.yaml**, además que aprovechamos en actualizar y validar
todas las configuraciones presentes en dicho archivo y también en nuestro
archivo de Secrets **airflow-secrets.yamñl**
```text
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY=<cuenta/SERVER-URL>/<repositorio>
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG="<tag>"
```
NOTA: *Detalle de cada variable de entorno usada por los POD y Airflow:*
```text
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: '30' # Intervalo de tiempo en ir a buscar nuevos dags en la carpeta de dags
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO # Nivel de log de Airflow (webserver, scheduler y workers)
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: America/Lima # Timezone de la Web de Airflow
AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima # Timezone del Scheduler de Airflow
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}' # Tiempo de espera de respuesta de Kubernetes API
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom # Ruta de imagen a usar para crear el worker POD
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.1" # Tag a usar para crear el worker POD
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc # Nombre del volumen usado para almacenar los logs
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF: airflow-envvars-configmap # Nombre del configMap donde estan defenidos las variables de entorno
AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE: /opt/airflow/templates/pod_template.yaml # Ruta del host donde esta el template para iniciar el worker
AIRFLOW__CORE__EXECUTOR: KubernetesExecutor # Tipo de Executor a usar por Airflow
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow # Ruta de conexión por default de Airflow con Postgres
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' # Pausar (que no esten iniciados) los DAG's cuando se creen
AIRFLOW__CORE__LOAD_EXAMPLES: 'true' # Colocar los DAG's de ejemplo en la web
_AIRFLOW_DB_UPGRADE: 'true' # Actualizar la BD si es necesario
_AIRFLOW_WWW_USER_CREATE: 'true' # Crear usuario inicial para usar la web
_AIRFLOW_WWW_USER_USERNAME: admin # Username del usuario de la web
_AIRFLOW_WWW_USER_PASSWORD: admin # Contraseña del usuario de la web
S3_DAGS_DIR: 's3://prueba1234568/dags' # Ruta del S3 (si en caso se usa S3) donde estan alojados los DAG's a sincronizar
SYNCHRONYZE_DAG_DIR: '30' # Tiempo (en segundos) para ir a buscar actualizaciones en la carpeta de dags
MINIO_SERVER: 'http://192.168.49.2:9000' # Ruta del Minio Server (Si en caso se usa Minio)
MINIO_DAGS_DIR: '/prueba-ca/dags' # Ubicación de los DAG's dentro del bucket de Minio
```
Adicionalmente, validar o actualizar los valores del archivo **airflow-secrets.yaml** (recordar que
tiene encriptación con método **"base64"**)
```text
AWS_ACCESS_KEY: bWluaW9hZG1pbg== # ACCESS_KEY de AWS si en caso se usa bucket S3 para guardar los DAG's
AWS_SECRET_KEY: bWluaW9hZG1pbg== # SECRET_KEY de AWS si en caso se usa bucket S3 para guardar los DAG's
MINIO_USER: bWluaW9hZG1pbg== # Usuario de Minio para conectarse al bucket del Minio Server
MINIO_PASSWORD: bWluaW9hZG1pbg== # Contraseña de Minio para conectarse al bucket del Minio Server
```
4.- Colocar el archivo **pod_template.yaml** en la ruta del host /opt/airflow/templates/.
*Nota:* Si en caso se este usando Minikube, entonces el archivo debe estar dentro del contenedor
en la misma ruta, para eso se ejecuta el siguiente comando:
```shell
docker cp pod_template.yaml <ID_CONTAINER>:/opt/airflow/templates/
```
Y aprovechando que estamos en el directorio, creamos la carpeta con la siguiente ruta: */opt/airflow/dags/dags/*
5.- Ejecutar el script **script_apply.sh** para crear todos los configsMaps, volúmenes, secrets, deployments
y servicios para correr Airflow.
```shell
sh script_apply.sh
```
Con este comando, tiene que esperar unos minutos para que todos los POD's esten levantados y corriendo
normalmente
6.- (OPCIONAL) Si en caso a deployado usando Minikube, será necesario exponer el puerto del Airflow Webserver
para que desde su local puede ingresar a la web. Dado que el puerto expuesto es dentro del contenedor del
minikube, debe salir ahora a su local con el siguiente comando:
```shell
kubectl port-forward <ID-POD-AIRFLOW-WEBSERVER> 8081:8080
```
Ahora desde su navegador puede ingresar la ruta http://localhost:8081 para ver la Web de Airflow
7.- Validamos que nuestros POD's están corriendo en estado "Running" con el siguiente comando:
```shell
kubectl get pods
```
## Agregar o actualizar DAG's en Airflow
Para actualizar el DAG ya existente de **TACOMVENTAS Y PROMOCIONES RESIDENCIAL** o agregar un nuevo
DAG en este despliegue de Airflow, se siguen los siguientes pasos:
1.- Verificar que nuestro ambiente en Kubernetes está levantado correctamente. Ejecutamos el comando:
```shell
kubectl get pods
```
2.- Los archivos actualizados del dag colocarlos en el bucket y prefijo correspondiente de **dags**.
Se deben colocar en la ruta configurada en la variable de entorno del archivo **airflow-envvars-configmap.yaml**.
Si usamos S3, sería en la variable _S3_DAGS_DIR_, si fuera Minio se usará _MINIO_DAGS_DIR_.
La variable muestra la url completa <bucket>/<prefijo>. Por ejemplo:
```text
S3_DAGS_DIR: 's3://prueba1234568/dags'
MINIO_DAGS_DIR: '/prueba-ca/dags'
```
3.- Solo quedaría esperar n segundos, de acuerdo a las variables _SYNCHRONYZE_DAG_DIR_ (el POD sync
sincronizará bucket con la carpeta local cada n segundos) y _AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL_
(el scheduler Airflow irá a buscar actualizaciones en su carpeta dags cada n segundos). Luego del tiempo
establecido se podrá ver los cambios en la web de Airflow.
## Ejecutar DAG TACOMVENTAS y PROMOCIONES_RESIDENCIAL
_Requisitos para ejecutar el DAG:_
- El ambiente este desplegado correctamente y los POD's en estado "Running"
- Crear la conexión en la web de Airflow con el nombre que está configurado en app_conf.yml con
parámetro **s3_conn_id**.
- Validar todas las rutas (bucket y prefijos) y configuraciones de los 7 insumos en app_conf.yml.
1.- En la web de Airflow, ubicamos nuestro DAG con nombre: **"BCOM_DAG_TRANSFORMACION_TACOMVENTAS_PROMOCIONESRESIDENCIAL"**
(configurado como constante en dag_transformacion_tacomventas_promoresidencial.py) y lo inicializamos
por medio de su checkbox.
2.- De acuerdo a la configuración actual, el DAG se ejecutará cada 2 horas (configurado en app_conf.yml con parámetro
**schedule**).
......@@ -6,7 +6,7 @@ general:
dags:
dag1:
schedule: "@once"
schedule: "0 */2 * * *"
period_pattern: '[a-zA-Z0-9]+([0-9]{4})(\-[0-9]{2})?\.[a-zA-Z]*'
csv_delimiter: ";"
filters:
......
......@@ -12,7 +12,6 @@ data:
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}'
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.1"
AIRFLOW__KUBERNETES__DAGS_VOLUME_HOST: /mnt/airflow/dags
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF: airflow-envvars-configmap
AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE: /opt/airflow/templates/pod_template.yaml
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment