Commit 941f6177 authored by Cristian Aguirre's avatar Cristian Aguirre

Update 07-08-23. Update schedule from DAG's

parent b1423b97
...@@ -20,7 +20,7 @@ logger = logging.getLogger() ...@@ -20,7 +20,7 @@ logger = logging.getLogger()
DAG_NAME = "INFORM_PROCESS" DAG_NAME = "INFORM_PROCESS"
# Change this path if is deployed in prod or dev # Change this path if is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/" MAIN_PATH = "/opt/airflow/dags/"
DEFAULT_ARGS = { DEFAULT_ARGS = {
'owner': 'BCOM', 'owner': 'BCOM',
...@@ -181,7 +181,7 @@ def set_dag(): ...@@ -181,7 +181,7 @@ def set_dag():
logger.info(f"CONFIGURACIÓN: {data}") logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"] conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que informa del último proceso ejecutado", with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que informa del último proceso ejecutado",
schedule_interval=conf["inform_dag_schedule"], tags=["DAG BCOM - INFORM PROCESS"], catchup=True) as dag: schedule_interval=conf["inform_dag_schedule"], tags=["DAG BCOM - INFORM PROCESS"], catchup=False) as dag:
control_s3 = conf["control"]["s3_params"] control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"] timezone = conf["timezone"]
control_extractor = PythonOperator( control_extractor = PythonOperator(
......
...@@ -92,7 +92,7 @@ def set_dag(): ...@@ -92,7 +92,7 @@ def set_dag():
logger.info(f"CONFIGURACIÓN: {data}") logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"] conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que resetea el último proceso ejecutado", with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que resetea el último proceso ejecutado",
schedule_interval=conf["reset_dag_schedule"], tags=["DAG BCOM - RESET PROCESS"], catchup=True) as dag: schedule_interval=conf["reset_dag_schedule"], tags=["DAG BCOM - RESET PROCESS"], catchup=False) as dag:
control_s3 = conf["control"]["s3_params"] control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"] timezone = conf["timezone"]
control_extractor = PythonOperator( control_extractor = PythonOperator(
......
...@@ -25,7 +25,7 @@ logger = logging.getLogger() ...@@ -25,7 +25,7 @@ logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM" DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM"
# Change this path if is deployed in prod or dev # Change this path if is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/" MAIN_PATH = "/opt/airflow/dags/"
DEFAULT_ARGS = { DEFAULT_ARGS = {
'owner': 'BCOM', 'owner': 'BCOM',
...@@ -143,7 +143,7 @@ def set_dag(): ...@@ -143,7 +143,7 @@ def set_dag():
logger.info(f"CONFIGURACIÓN: {data}") logger.info(f"CONFIGURACIÓN: {data}")
conf = data["app"] conf = data["app"]
with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que extrae y transforma", with DAG(DAG_NAME, default_args=DEFAULT_ARGS, description="Proceso que extrae y transforma",
schedule_interval=conf["schedule"], tags=["DAG BCOM - SQL TRANSFORMATIONS"], catchup=True) as dag: schedule_interval=conf["schedule"], tags=["DAG BCOM - SQL TRANSFORMATIONS"], catchup=False) as dag:
scripts_s3 = conf["scripts"]["s3_params"] scripts_s3 = conf["scripts"]["s3_params"]
if scripts_s3["prefix"].endswith("/"): if scripts_s3["prefix"].endswith("/"):
wildcard_scripts = scripts_s3["prefix"] + "?*" wildcard_scripts = scripts_s3["prefix"] + "?*"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment