Commit 6e679176 authored by Erly Villaroel

Cambio del archivo de configuración: se agregó la ruta al archivo de definición

parent 48029b5d
......@@ -6,20 +6,20 @@ app:
sources:
source1:
type: mysql
host: database-11.cluster-ro-cvsz4ey9eiec.us-east-1.rds.amazonaws.com
port: 3306
username: admin
password: adminadmin
database: prueba_ca_1
service: ORCLPDB1
host: 192.168.1.13
port: 13306
username: root
password: root
database: prueba
service:
schema: sources
transformation:
type: mysql
host: 192.168.1.2
host: 192.168.1.13
port: 13306
username: root
password: root
database: prueba_bcom2
database: prueba_ca
service:
schema: intern_db
chunksize: 4000
......@@ -31,13 +31,13 @@ app:
cloud_provider: aws
scripts:
s3_params:
bucket: prueba1234568
bucket: pruebairflow
prefix: bcom_scripts
connection_id: conn_script
connection_id: prueba_af
control:
s3_params:
connection_id: conn_script
bucket: prueba1234568
connection_id: prueba_af
bucket: pruebairflow
prefix: bcom_control
filename: control_<period>.json
timezone: 'GMT-5'
......@@ -48,13 +48,15 @@ app:
delimiter: '|'
tmp_path: /tmp
s3_params:
bucket: prueba1234568
bucket: pruebairflow
prefix: bcom_results
connection_id: conn_script
connection_id: prueba_af
report:
s3_params:
bucket: prueba1234568
bucket: pruebairflow
prefix: bcom_report
connection_id: conn_script
connection_id: prueba_af
filename: report_<datetime>.xlsx
datetime_pattern: '%Y-%m-%d %H:%M:%S'
procedure:
filepath: "procedure_prueba.json"
\ No newline at end of file
......@@ -20,7 +20,7 @@ logger = logging.getLogger()
DAG_NAME = "RESET_PROCESS"
# Change this path depending on whether the DAG is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
MAIN_PATH = "/opt/airflow/dags/"
DEFAULT_ARGS = {
'owner': 'BCOM',
......
......@@ -25,8 +25,8 @@ logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM"
# Change this path depending on whether the DAG is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition2.json"
MAIN_PATH = "/opt/airflow/dags/"
#JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition.json"
DEFAULT_ARGS = {
'owner': 'BCOM',
......@@ -83,7 +83,7 @@ def extraction(source_conn, intern_conn, timezone: str, control_s3: Dict[str, An
def save_procedure_json(json_path: str, task) -> None:
try:
with open(json_path) as f:
with open("/opt/airflow/dags/"+ json_path) as f:
data = json.load(f)
logger.info(f"JSON-PROCEDURE {data}")
if data:
......@@ -113,13 +113,15 @@ def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str,
logger.error(f"Error general de descarga de archivo de control. {e}")
# "**kwargs" means that additional keyword arguments may be passed
def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str,
order_delimiter: str, procedure_mask: str, label_tablename: str, control_params: Dict[str, Any],
provider: str, timezone: str, **kwargs):
provider: str, timezone: str, procedure_filepath: str, **kwargs):
try:
extract_control(control_params["connection_id"], control_params["bucket"], control_params["prefix"],
kwargs['ti'], provider, timezone)
save_procedure_json(JSON_PROCEDURE_PATH, kwargs['ti'])
# save_procedure_json(JSON_PROCEDURE_PATH, kwargs['ti'])
save_procedure_json(procedure_filepath, kwargs['ti'])
start_time = time.time()
logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
scripts = get_files_from_prefix(conn_id, bucket, prefix, provider)
......@@ -155,6 +157,8 @@ def set_dag():
wildcard_scripts, conf["cloud_provider"])
control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"]
# NUEVA VARIABLE
procedure = conf["procedure"]
# Scripts extraction
extract_mask = conf["source_mask"]
transform_mask = conf["transformation_mask"]
......@@ -167,7 +171,8 @@ def set_dag():
'prefix': scripts_s3["prefix"], 'source_mask': extract_mask, 'transform_mask': transform_mask,
'procedure_mask': procedure_mask, 'order_delimiter': order_delimiter,
'label_tablename': conf["label_multiple_select"], 'control_params': control_s3,
'provider': conf["cloud_provider"], 'timezone': timezone},
'provider': conf["cloud_provider"], 'timezone': timezone,
'procedure_filepath': procedure["filepath"]},
trigger_rule="all_success"
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment