Commit 0e639513 authored by Cristian Aguirre

Update 01-08-23. Fix some bugs (377, 378)

parent 03b3285a
......@@ -456,8 +456,36 @@ Listo. Con esto se ha configurado todos los servicios requeridos para levantar n
### Deploying Airflow with GKE
⚠️ Note: Before deploying our components, make sure they fit within our cluster's resources
(in terms of vCPU and RAM). To do this, validate the templates of type deployment, i.e. the ones
whose names end with the word _"deployment"_: "airflow-scheduler-deployment.yaml", "airflow-webserver-deployment.yaml",
"postgres-deployment.yaml" and "sync-dags-deployment-gcs.yaml" (in this case, since we are on GCP),
as well as the file "airflow-envvars-configmap.yaml", which holds the resources for the workers.
All of the templates mentioned have "containers.resources.limits" and
"containers.resources.requests" parameters, which set the maximum and minimum, respectively
(see the sketch right after this note).
1.- Once our Bucket and NFS (Filestore) are configured, we can deploy our Airflow system
on Kubernetes. To do so, move into the **"deploy-k8"** folder and run the following command:
```shell
sh script-apply.sh
```
This will start every component; within a few seconds or minutes everything will be up and
ready to use.
2.- To verify that all the components are up, run the following command:
```shell
kubectl get pods -n bcom-airflow
```
You should see something similar to this:
![image](readme-images/componentes-ready1.png)
......@@ -4,6 +4,7 @@ import json
import numpy as np
import pandas as pd
from enums.ProcessStatusEnum import ProcessStatusEnum
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Utils import select_multiple, generateModel
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
from components.DatabaseOperation.DatabaseLoad import save_from_dataframe
......
......@@ -8,6 +8,7 @@ from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from enums.ProcessStatusEnum import ProcessStatusEnum
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.S3Route import save_df_to_s3, load_control_to_s3
from components.Utils import select_multiple, create_temp_file, delete_temp_dir
from components.Control import get_tasks_from_control, update_new_process
......
......@@ -4,6 +4,7 @@ from typing import Any, Dict, List, Tuple
import pytz
from io import BytesIO, StringIO
import pandas as pd
import re
from components.Utils import get_type_file
from enums.FileTypeEnum import FileTypeEnum
......@@ -181,6 +182,7 @@ def get_file_from_prefix(conn: str, bucket: str, key: str, provider: str, timezo
frequency: str = "montly") -> Any:
result, key_result = BytesIO(), ''
try:
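# period pattern: YYYY-MM immediately followed by a dot; used below to skip files whose name carries no period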
format_re_pattern = r'[0-9]{4}-[0-9]{2}\.'
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
period = str(datetime_by_tzone(timezone, format_date))[:7]
logger.info(f"Periodo actual: {period}.")
......@@ -197,7 +199,7 @@ def get_file_from_prefix(conn: str, bucket: str, key: str, provider: str, timezo
files = gcp_hook.list(bucket, prefix=key)
files_with_period = []
for file in files:
if file.endswith("/"):
if file.endswith("/") or not re.search(format_re_pattern, file):
continue
file_period = file[file.rfind("_") + 1:file.rfind(".")]
files_with_period.append((file, file_period))
......
......@@ -107,7 +107,7 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
final_data[-1] = final_data[-1] + "; end;"
final_item = item
if item.lower().strip().find(label_tablename.lower().strip() + ":") != -1:
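# compute the label position on the same space-stripped string that is sliced below, so both indexes stay aligned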
init_index = item.lower().strip().index(label_tablename.lower().strip() + ":")
init_index = item.replace(" ", "").lower().strip().index(label_tablename.lower().strip() + ":")
table_name = item.replace(" ", "").strip()[init_index + len(label_tablename + ":"):].strip()
add_next = True
elif item != "":
......
......@@ -10,7 +10,7 @@ app:
port: 3306
username: admin
password: adminadmin
database: prueba_bcom
database: prueba_ca_1
service: ORCLPDB1
schema: sources
transformation:
......@@ -19,7 +19,7 @@ app:
port: 3306
username: admin
password: adminadmin
database: prueba_bcom2
database: prueba_ca_2
service:
schema: intern_db
chunksize: 8000
......@@ -28,16 +28,16 @@ app:
procedure_mask: procedure # S
transformation_mask: transform # S
prefix_order_delimiter: .
cloud_provider: google
cloud_provider: aws
scripts:
s3_params:
bucket: prueba-airflow3
bucket: prueba-airflow13
prefix: bcom_scripts
connection_id: conn_script
control:
s3_params:
connection_id: conn_script
bucket: prueba-airflow3
bucket: prueba1234568
prefix: bcom_control
filename: control_<period>.json
timezone: 'GMT-5'
......@@ -48,12 +48,12 @@ app:
delimiter: '|'
tmp_path: /tmp
s3_params:
bucket: prueba-airflow3
bucket: prueba-airflow13
prefix: bcom_results
connection_id: conn_script
report:
s3_params:
bucket: prueba-airflow3
bucket: prueba1234568
prefix: bcom_report
connection_id: conn_script
filename: report_<datetime>.xlsx
......
......@@ -76,7 +76,7 @@ def create_report(tmp_path: str, **kwargs) -> None:
title_format.set_font_size(20)
title_format.set_font_color("#333333")
header = f"Reporte ejecutado el día {execution_date}"
header = f"Proceso ejecutado el día {execution_date}"
if status == ProcessStatusEnum.SUCCESS.value:
status = "EXITOSO"
elif status == ProcessStatusEnum.FAIL.value:
......@@ -95,20 +95,20 @@ def create_report(tmp_path: str, **kwargs) -> None:
row_format = workbook.add_format()
row_format.set_font_size(8)
row_format.set_font_color("#000000")
base_index = 5
for index, key in enumerate(data.keys()):
index = base_index + index
worksheet.merge_range('A'+str(index)+':B'+str(index), key, row_format)
if data[key]["TYPE"] == "EXTRACTION":
worksheet.merge_range('C'+str(index)+':G'+str(index), f"TABLA DE EXTRACCIÓN: {data[key]['DESCRIPTION']}", row_format)
elif data[key]["TYPE"] == "TRANSFORMATION":
script = data[key]["DESCRIPTION"].split("|")[1]
worksheet.merge_range('C'+str(index)+':G'+str(index), f"SCRIPT DE TRANSFORMACIÓN: {script}", row_format)
elif data[key]["TYPE"] == "GENERATION":
worksheet.merge_range('C'+str(index)+':G'+str(index), f"ARCHIVO GENERADO DESDE LA TABLA: {data[key]['DESCRIPTION']}", row_format)
worksheet.merge_range('H'+str(index)+':I'+str(index), f"ESTADO: {data[key]['STATUS']}", row_format)
worksheet.merge_range('J'+str(index)+':N'+str(index), data[key]['MESSAGE'], row_format)
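# per-task rows are only written when the process was not reset by the user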
if status != ProcessStatusEnum.RESET.value:
base_index = 5
for index, key in enumerate(data.keys()):
index = base_index + index
worksheet.merge_range('A'+str(index)+':B'+str(index), key, row_format)
if data[key]["TYPE"] == "EXTRACTION":
worksheet.merge_range('C'+str(index)+':G'+str(index), f"TABLA DE EXTRACCIÓN: {data[key]['DESCRIPTION']}", row_format)
elif data[key]["TYPE"] == "TRANSFORMATION":
script = data[key]["DESCRIPTION"].split("|")[1]
worksheet.merge_range('C'+str(index)+':G'+str(index), f"SCRIPT DE TRANSFORMACIÓN: {script}", row_format)
elif data[key]["TYPE"] == "GENERATION":
worksheet.merge_range('C'+str(index)+':G'+str(index), f"ARCHIVO GENERADO DESDE LA TABLA: {data[key]['DESCRIPTION']}", row_format)
worksheet.merge_range('H'+str(index)+':I'+str(index), f"ESTADO: {data[key]['STATUS']}", row_format)
worksheet.merge_range('J'+str(index)+':N'+str(index), data[key]['MESSAGE'], row_format)
task.xcom_push(key="REPORT_PATH", value=excel_tmp_path)
except Exception as e:
logger.error(f"Error creando reporte. {e}")
......@@ -125,7 +125,8 @@ def get_data_report(**kwargs) -> None:
else:
last_process = control[-1]
if "reset_by_user" in last_process.keys():
report_data["PROCESS_EXECUTION"] = ProcessStatusEnum.RESET.value
report_data["PROCESS_STATUS"] = ProcessStatusEnum.RESET.value
report_data["PROCESS_EXECUTION"] = last_process["date"]
else:
total_tasks = [last_process["tasks"]]
current_status = last_process["status"]
......@@ -161,7 +162,7 @@ def get_data_report(**kwargs) -> None:
report_data.update({item: {"STATUS": final_key_tasks[item], "TYPE": type_task,
"DESCRIPTION": final_key_desc[item], 'MESSAGE': final_key_message[item]}})
report_data.update({"PROCESS_STATUS": current_status, "PROCESS_EXECUTION": last_process["date"]})
task.xcom_push(key="REPORT-DATA", value=report_data)
task.xcom_push(key="REPORT-DATA", value=report_data)
logger.info(f"Diccionario de datos para el reporte: {report_data}")
except Exception as e:
logger.error(f"Error general creando reporte. {e}")
......
apiVersion: v1
kind: Namespace
metadata:
name: bcom-airflow
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: bcom-airflow
namespace: bcom-airflow
---
#apiVersion: v1
#kind: Namespace
#metadata:
# name: bcom-airflow
#
#---
#apiVersion: v1
#kind: ServiceAccount
#metadata:
# name: bcom-airflow
# namespace: bcom-airflow
#
#---
apiVersion: v1
kind: ConfigMap
......@@ -36,6 +36,10 @@ data:
value: LocalExecutor
image: dumy-image
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: "1000m"
memory: "2Gi"
name: base
volumeMounts:
- name: dags-host-volume
......@@ -77,19 +81,20 @@ data:
AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}'
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.5"
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.6"
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF: airflow-envvars-configmap
AIRFLOW__KUBERNETES_EXECUTOR__POD_TEMPLATE_FILE: /opt/airflow/templates/pod_template.yaml
AIRFLOW__CORE__EXECUTOR: KubernetesExecutor
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: admin
_AIRFLOW_WWW_USER_PASSWORD: admin
S3_DAGS_DIR: 's3://prueba1234568/dags'
GCS_DAGS_DIR: 'gs://prueba-rsync2/carpeta'
SYNCHRONYZE_DAG_DIR: '30'
MINIO_SERVER: 'http://192.168.49.2:9000'
MINIO_DAGS_DIR: '/prueba-ca/dags'
\ No newline at end of file
......@@ -22,6 +22,10 @@ spec:
containers:
- name: airflow-scheduler
image: cristianfernando/airflow_custom:0.0.4
resources:
requests:
cpu: "1000m"
memory: "4Gi"
args: ["scheduler"]
envFrom:
- configMapRef:
......
......@@ -5,13 +5,13 @@ metadata:
namespace: bcom-airflow
spec:
capacity:
storage: 300Mi
storage: 5Gi
accessModes:
- ReadWriteMany
storageClassName: airflow-dags
nfs:
server: 192.168.1.9
path: "/mnt/nfs_share"
server: 10.216.137.186
path: "/volume1/nfs_share"
---
......@@ -22,13 +22,13 @@ metadata:
namespace: bcom-airflow
spec:
capacity:
storage: 8000Mi
storage: 16Gi
accessModes:
- ReadWriteMany
storageClassName: airflow-postgres
nfs:
server: 192.168.1.9
path: "/mnt/nfs_postgres"
server: 10.216.137.186
path: "/volume1/nfs_postgres"
---
......@@ -39,13 +39,13 @@ metadata:
namespace: bcom-airflow
spec:
capacity:
storage: 4000Mi
storage: 10Gi
accessModes:
- ReadWriteMany
storageClassName: airflow-logs
nfs:
server: 192.168.1.9
path: "/mnt/nfs_logs"
server: 10.216.137.186
path: "/volume1/nfs_logs"
---
......@@ -60,7 +60,7 @@ spec:
storageClassName: airflow-dags
resources:
requests:
storage: 200Mi
storage: 5Gi
---
......@@ -75,7 +75,7 @@ spec:
storageClassName: airflow-postgres
resources:
requests:
storage: 7500Mi
storage: 16Gi
---
......@@ -91,5 +91,5 @@ spec:
- ReadWriteMany
resources:
requests:
storage: 3500Mi
storage: 10Gi
storageClassName: airflow-logs
\ No newline at end of file
......@@ -22,6 +22,10 @@ spec:
containers:
- name: airflow-webserver
image: apache/airflow:2.5.3
resources:
requests:
cpu: "500m"
memory: "500Mi"
args: ["webserver"]
envFrom:
- configMapRef:
......
......@@ -21,8 +21,8 @@ spec:
image: postgres:12
resources:
limits:
memory: 128Mi
cpu: 500m
memory: "2Gi"
cpu: "500m"
ports:
- containerPort: 5432
env:
......
......@@ -7,4 +7,4 @@ kubectl apply -f airflow-secrets.yaml
kubectl apply -f airflow-webserver-deployment.yaml
kubectl apply -f airflow-webserver-service.yaml
kubectl apply -f airflow-scheduler-deployment.yaml
kubectl apply -f sync-dags-deployment.yaml
kubectl apply -f sync-dags-deployment-gcs.yaml
......@@ -5,6 +5,6 @@ kubectl delete -f airflow-secrets.yaml
kubectl delete -f airflow-webserver-service.yaml
kubectl delete -f airflow-webserver-deployment.yaml
kubectl delete -f airflow-scheduler-deployment.yaml
kubectl delete -f sync-dags-deployment.yaml
kubectl delete -f sync-dags-deployment-gcs.yaml
kubectl delete -f airflow-volumes.yaml
kubectl delete -f airflow-envvars-configmap.yaml
\ No newline at end of file
......@@ -14,9 +14,12 @@ spec:
app: airflow-sync-dags
spec:
serviceAccountName: bcom-airflow
nodeSelector:
iam.gke.io/gke-metadata-server-enabled: "true"
containers:
- args:
- while true; gcloud rsync -d -r ${GCS_DAGS_DIR:-gs://prueba-rsync/carpeta} /dags;
- while true; gsutil rsync -d -r ${GCS_DAGS_DIR:-gs://prueba-rsync2/carpeta} /dags;
do sleep ${SYNCHRONYZE_DAG_DIR:-30}; done;
command:
- /bin/bash
......@@ -24,20 +27,13 @@ spec:
- --
name: sync-dags-gcloud
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:alpine
resources:
limits:
cpu: "250m"
memory: "1Gi"
envFrom:
- configMapRef:
name: airflow-envvars-configmap
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
key: AWS_ACCESS_KEY
name: credentials
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
key: AWS_SECRET_KEY
name: credentials
volumeMounts:
- name: dags-host-volume
mountPath: /dags
......