Commit 8dff47d1 authored by Cristian Aguirre

Update 25-07-23. New parameter in config: cloud_provider

parent 56c7f2a2
@@ -140,7 +140,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
logger.info("Guardado correctamente todos los datos")
source_conn.close_basic_connection()
else:
if command.replace(" ", "").lower().find("|select"):
if command.replace(" ", "").lower().find("|select") != -1:
command = command[command.find("select"):]
steps = get_steps(command, chunksize, source_engine)
# Traemos el iterator
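The corrected condition matters because str.find returns -1, which is truthy, when the marker is missing, so the old bare check entered the branch even without "|select". A minimal illustration of the check as it now behaves (the command string is illustrative):

    # str.find returns -1 (truthy) when "|select" is absent; the explicit != -1
    # comparison only strips the command when the marker is really there.
    command = "call some_procedure() | select * from tabla"
    if command.replace(" ", "").lower().find("|select") != -1:
        command = command[command.find("select"):]   # keep only the select part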
......
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from enums.ProviderTypeEnum import ProviderTypeEnum
import logging
@@ -10,19 +13,29 @@ TIMEOUT = 60*1
VERIFY_SSL = False
def create_s3_sensor(task_id: str, connection: str, bucket: str, key: str) -> S3KeySensor:
s3_sensor = None
def create_sensor(task_id: str, connection: str, bucket: str, key: str, provider: str = "google"):
sensor = None
try:
s3_sensor = S3KeySensor(
task_id=task_id,
bucket_key=key,
bucket_name=bucket,
wildcard_match=True,
aws_conn_id=connection,
verify=VERIFY_SSL,
poke_interval=POKE_INTERVAL,
timeout=TIMEOUT
)
if provider == ProviderTypeEnum.GOOGLE.value:
sensor = GCSObjectExistenceSensor(
task_id=task_id,
bucket=bucket,
object=key,
google_cloud_conn_id=connection,
poke_interval=POKE_INTERVAL,
timeout=TIMEOUT
)
else:
sensor = S3KeySensor(
task_id=task_id,
bucket_key=key,
bucket_name=bucket,
wildcard_match=True,
aws_conn_id=connection,
verify=VERIFY_SSL,
poke_interval=POKE_INTERVAL,
timeout=TIMEOUT
)
except Exception as e:
logger.error(f"Error creando Sensor S3. {e}")
return s3_sensor
return sensor
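For reference, a usage sketch mirroring the call added in set_dag further below; the literal values are taken from the updated config, and the provider string decides which sensor class is returned:

    # Sketch only: with cloud_provider == "google" this returns a GCSObjectExistenceSensor,
    # any other value falls through to the S3KeySensor branch.
    conf = {"cloud_provider": "google"}        # new key from the YAML config
    sensor_scripts = create_sensor(
        "SCRIPTS-SENSOR",                      # task_id
        "conn_script",                         # scripts.s3_params.connection_id
        "prueba-airflow",                      # scripts.s3_params.bucket
        "bcom_scripts/?*",                     # key wildcard built from the scripts prefix
        conf["cloud_provider"],
    )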
@@ -3,38 +3,39 @@ app:
database:
sources:
source1:
type: oracle
host: 192.168.27.22
port: 21521
username: PRUEBABCOM2
password: admin
database: bd_tp_qa
type: postgres
host: airflow
port: 5432
username: airflow
password: airflow
database: postgres
service: ORCLPDB1
schema: public
schema: sources
transformation:
type: mysql
host: 192.168.1.9
port: 13306
username: root
password: root
database: prueba_bcom
type: postgres
host: airflow
port: 5432
username: airflow
password: airflow
database: postgres
service:
schema:
schema: intern_db
chunksize: 8000
label_multiple_select: TABLENAME
source_mask: select # Sufijo (S)
procedure_mask: procedure # S
transformation_mask: transform # S
prefix_order_delimiter: .
cloud_provider: google
scripts:
s3_params:
bucket: prueba1234568
bucket: prueba-airflow
prefix: bcom_scripts
connection_id: conn_script
control:
s3_params:
connection_id: conn_script
bucket: prueba1234568
bucket: prueba-airflow
prefix: bcom_control
filename: control_example.json
timezone: 'GMT-5'
@@ -45,7 +46,7 @@ app:
delimiter: '|'
tmp_path: /tmp
s3_params:
bucket: prueba1234568
bucket: prueba-airflow
prefix: bcom_results
connection_id: conn_script
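A minimal sketch of how a DAG could pick up the new cloud_provider key alongside the existing s3_params blocks; the file name and loader here are assumptions, since this commit only touches the keys:

    import yaml

    # Assumed path and loader; only the config keys are shown in this diff.
    with open("/opt/airflow/dags/app_conf.yml") as f:
        conf = yaml.safe_load(f)["app"]

    provider = conf.get("cloud_provider", "google")   # new key introduced by this commit
    scripts_s3 = conf["scripts"]["s3_params"]          # bucket / prefix / connection_id as above
    control_s3 = conf["control"]["s3_params"]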
@@ -9,7 +9,7 @@ from airflow.utils.task_group import TaskGroup
from components.Utils import update_sql_commands
from components.Xcom import save_commands_to_xcom
from components.S3Route import get_files_from_prefix, get_file_from_key
from components.Sensor import create_s3_sensor
from components.Sensor import create_sensor
from components.Extractor import get_extract_task_group
from components.Transformation import get_transform_task_group
from components.Generation import get_generate_task_group
@@ -26,7 +26,7 @@ DAG_NAME = "BCOM_DAG_TRANSFORMACIONES3"
# Change this path if is deployed in prod or dev
MAIN_PATH = "/opt/airflow/dags/"
JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition.json"
JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition2.json"
DEFAULT_ARGS = {
'owner': 'BCOM',
@@ -149,8 +149,8 @@ def set_dag():
wildcard_scripts = scripts_s3["prefix"] + "?*"
else:
wildcard_scripts = scripts_s3["prefix"] + "/?*"
sensor_scripts = create_s3_sensor("SCRIPTS-SENSOR", scripts_s3["connection_id"], scripts_s3["bucket"],
wildcard_scripts)
sensor_scripts = create_sensor("SCRIPTS-SENSOR", scripts_s3["connection_id"], scripts_s3["bucket"],
wildcard_scripts, conf["cloud_provider"])
control_s3 = conf["control"]["s3_params"]
# Scripts extraction
extract_mask = conf["source_mask"]
......
@@ -6,7 +6,7 @@ import pandas as pd
import numpy as np
from components.S3Route import get_df_from_s3, get_base_date, save_df_to_s3, move_object_s3
from components.Sensor import create_s3_sensor
from components.Sensor import create_sensor
from components.Utils import get_modified_prefix, remove_invalid_rows, remove_fields, update_dict_with_catalogs
from airflow import DAG
@@ -256,26 +256,26 @@ def set_dag_1():
catalogs_dict = update_dict_with_catalogs(catalogs_dict, conf, "no_promocion", s3_prefix)
# Define the sensor to verify if data exists or have been updated
s3_sensor_tacom = create_s3_sensor("S3_sensor_tacom_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
s3_tacom)
s3_sensor_tacom = create_sensor("S3_sensor_tacom_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
s3_tacom)
s3_sensor_promo = create_s3_sensor("S3_sensor_promo_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
s3_promo)
s3_sensor_promo = create_sensor("S3_sensor_promo_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
s3_promo)
s3_sensor_promo_catalog = create_s3_sensor("S3_sensor_promo_catalog_task", s3_conf["s3_conn_id"],
s3_conf["bucket"], catalogs_dict["s3_catalogo_promociones"])
s3_sensor_promo_catalog = create_sensor("S3_sensor_promo_catalog_task", s3_conf["s3_conn_id"],
s3_conf["bucket"], catalogs_dict["s3_catalogo_promociones"])
s3_sensor_3a2p = create_s3_sensor("S3_sensor_3a2p_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacion3pa2p"])
s3_sensor_3a2p = create_sensor("S3_sensor_3a2p_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacion3pa2p"])
s3_sensor_poid = create_s3_sensor("S3_sensor_poid_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacionpoidpaquete"])
s3_sensor_poid = create_sensor("S3_sensor_poid_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacionpoidpaquete"])
s3_sensor_paq = create_s3_sensor("S3_sensor_paq_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacion_paquetes"])
s3_sensor_paq = create_sensor("S3_sensor_paq_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_relacion_paquetes"])
s3_sensor_notpromo = create_s3_sensor("S3_sensor_notpromo_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_no_promocion"])
s3_sensor_notpromo = create_sensor("S3_sensor_notpromo_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
catalogs_dict["s3_no_promocion"])
outputs = conf["s3_parameters"]["outputs"]
output_prefix = outputs["prefix"]
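These call sites pass only four arguments, so they rely on create_sensor's provider default of "google"; a caller that specifically needs the S3 branch would pin the provider explicitly, for example (mirroring the s3_sensor_tacom call above):

    from enums.ProviderTypeEnum import ProviderTypeEnum

    # Explicit provider, forcing the S3KeySensor branch regardless of the default.
    s3_sensor_tacom = create_sensor("S3_sensor_tacom_task", s3_conf["s3_conn_id"], s3_conf["bucket"],
                                    s3_tacom, ProviderTypeEnum.AMAZON.value)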
......
from enum import Enum
class ProviderTypeEnum(Enum):
GOOGLE = "google"
AMAZON = "aws"
MINIO = "local"
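A small, hypothetical guard (not part of this commit) that checks the configured cloud_provider against this enum before any sensor is created:

    from enums.ProviderTypeEnum import ProviderTypeEnum

    def validate_provider(value: str) -> str:
        # Fail fast if cloud_provider is not one of: google, aws, local.
        allowed = {p.value for p in ProviderTypeEnum}
        if value not in allowed:
            raise ValueError(f"cloud_provider must be one of {sorted(allowed)}, got {value!r}")
        return value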
[
{
"identifier": "TABLA1",
"fields": [
{
"name": "columna1",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "columna2",
"datatype": "NUMBER"
},
{
"name": "columna3",
"datatype": "BOOLEAN"
},
{
"name": "columna4",
"datatype": "NUMBER"
},
{
"name": "columna5",
"datatype": "DECIMAL"
}
]
}
]
\ No newline at end of file
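The DAG points JSON_PROCEDURE_PATH at this definition; how it is consumed is outside this diff, but a purely hypothetical sketch of turning each field into a DDL fragment could look like this:

    # Hypothetical mapping; the real consumer of the procedure definition is not shown here.
    DDL_TYPES = {"TEXT": "VARCHAR", "NUMBER": "INTEGER", "BOOLEAN": "BOOLEAN", "DECIMAL": "NUMERIC"}

    def column_ddl(field: dict) -> str:
        ddl = DDL_TYPES[field["datatype"]]
        if "maxLength" in field:                  # e.g. columna1 -> VARCHAR(50)
            ddl += f"({field['maxLength']})"
        return f"{field['name']} {ddl}"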
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-sync-dags
namespace: bcom-airflow
spec:
selector:
matchLabels:
app: airflow-sync-dags
template:
metadata:
labels:
app: airflow-sync-dags
spec:
containers:
- args:
- while true; gsutil rsync -d -r ${GCS_DAGS_DIR:-gs://prueba-rsync/carpeta} /dags;
do sleep ${SYNCHRONYZE_DAG_DIR:-30}; done;
command:
- /bin/bash
- -c
- --
name: sync-dags-gcloud
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:alpine
envFrom:
- configMapRef:
name: airflow-envvars-configmap
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
key: AWS_ACCESS_KEY
name: credentials
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
key: AWS_SECRET_KEY
name: credentials
volumeMounts:
- name: dags-host-volume
mountPath: /dags
volumes:
- name: dags-host-volume
persistentVolumeClaim:
claimName: airflow-dags-pvc