Commit 70ea4a93 authored by Cristian Aguirre's avatar Cristian Aguirre

Merge branch 'developer-ev' into 'developer'

Developer ev

See merge request !6
parents a537dd53 1f68dfd1
...@@ -6,22 +6,22 @@ app: ...@@ -6,22 +6,22 @@ app:
sources: sources:
source1: source1:
type: mysql type: mysql
host: 192.168.21.52 host: 192.168.1.13
port: 13306 port: 13306
username: root username: root
password: root password: root
database: bcom_tp_res database: prueba
service: ORCLPDB1 service:
schema: sources schema: sources
transformation: transformation:
type: oracle type: mysql
host: 192.168.27.22 host: 192.168.1.13
port: 21521 port: 13306
username: RLQA_AIR username: root
password: RLQA_AIR99 password: root
database: database: prueba_ca
service: ORCLPDB1 service:
schema: schema: intern_db
chunksize: 4000 chunksize: 4000
label_multiple_select: TABLENAME label_multiple_select: TABLENAME
source_mask: select # Sufijo (S) source_mask: select # Sufijo (S)
...@@ -31,13 +31,13 @@ app: ...@@ -31,13 +31,13 @@ app:
cloud_provider: aws cloud_provider: aws
scripts: scripts:
s3_params: s3_params:
bucket: prueba1234568 bucket: pruebairflow
prefix: bcom_scripts prefix: bcom_scripts
connection_id: conn_script connection_id: prueba_af
control: control:
s3_params: s3_params:
connection_id: conn_script connection_id: prueba_af
bucket: prueba1234568 bucket: pruebairflow
prefix: bcom_control prefix: bcom_control
filename: control_<period>.json filename: control_<period>.json
timezone: 'GMT-5' timezone: 'GMT-5'
...@@ -48,13 +48,15 @@ app: ...@@ -48,13 +48,15 @@ app:
delimiter: '|' delimiter: '|'
tmp_path: /tmp tmp_path: /tmp
s3_params: s3_params:
bucket: prueba1234568 bucket: pruebairflow
prefix: bcom_results prefix: bcom_results
connection_id: conn_script connection_id: prueba_af
report: report:
s3_params: s3_params:
bucket: prueba1234568 bucket: pruebairflow
prefix: bcom_report prefix: bcom_report
connection_id: conn_script connection_id: prueba_af
filename: report_<datetime>.xlsx filename: report_<datetime>.xlsx
datetime_pattern: '%Y-%m-%d %H:%M:%S' datetime_pattern: '%Y-%m-%d %H:%M:%S'
procedure:
filepath: "procedure_prueba.json"
\ No newline at end of file
...@@ -20,7 +20,7 @@ logger = logging.getLogger() ...@@ -20,7 +20,7 @@ logger = logging.getLogger()
DAG_NAME = "RESET_PROCESS" DAG_NAME = "RESET_PROCESS"
# Change this path if is deployed in prod or dev # Change this path if is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/" MAIN_PATH = "/opt/airflow/dags/"
DEFAULT_ARGS = { DEFAULT_ARGS = {
'owner': 'BCOM', 'owner': 'BCOM',
......
...@@ -25,8 +25,8 @@ logger = logging.getLogger() ...@@ -25,8 +25,8 @@ logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM" DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM"
# Change this path if is deployed in prod or dev # Change this path if is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/" MAIN_PATH = "/opt/airflow/dags/"
JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition.json" #JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition.json"
DEFAULT_ARGS = { DEFAULT_ARGS = {
'owner': 'BCOM', 'owner': 'BCOM',
...@@ -83,7 +83,7 @@ def extraction(source_conn, intern_conn, timezone: str, control_s3: Dict[str, An ...@@ -83,7 +83,7 @@ def extraction(source_conn, intern_conn, timezone: str, control_s3: Dict[str, An
def save_procedure_json(json_path: str, task) -> None: def save_procedure_json(json_path: str, task) -> None:
try: try:
with open(json_path) as f: with open("/opt/airflow/dags/"+ json_path) as f:
data = json.load(f) data = json.load(f)
logger.info(f"JSON-PROCEDURE {data}") logger.info(f"JSON-PROCEDURE {data}")
if data: if data:
...@@ -113,13 +113,15 @@ def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str, ...@@ -113,13 +113,15 @@ def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str,
logger.error(f"Error general de descarga de archivo de control. {e}") logger.error(f"Error general de descarga de archivo de control. {e}")
#"**kwargs" dice que pueden haber más parámetros
def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str, def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str,
order_delimiter: str, procedure_mask: str, label_tablename: str, control_params: Dict[str, Any], order_delimiter: str, procedure_mask: str, label_tablename: str, control_params: Dict[str, Any],
provider: str, timezone: str, **kwargs): provider: str, timezone: str, procedure_filepath: str, **kwargs):
try: try:
extract_control(control_params["connection_id"], control_params["bucket"], control_params["prefix"], extract_control(control_params["connection_id"], control_params["bucket"], control_params["prefix"],
kwargs['ti'], provider, timezone) kwargs['ti'], provider, timezone)
save_procedure_json(JSON_PROCEDURE_PATH, kwargs['ti']) # save_procedure_json(JSON_PROCEDURE_PATH, kwargs['ti'])
save_procedure_json(procedure_filepath, kwargs['ti'])
start_time = time.time() start_time = time.time()
logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}") logger.info(f"EXTRAYENDO SCRIPTS DESDE {bucket}/{prefix}")
scripts = get_files_from_prefix(conn_id, bucket, prefix, provider) scripts = get_files_from_prefix(conn_id, bucket, prefix, provider)
...@@ -155,6 +157,8 @@ def set_dag(): ...@@ -155,6 +157,8 @@ def set_dag():
wildcard_scripts, conf["cloud_provider"]) wildcard_scripts, conf["cloud_provider"])
control_s3 = conf["control"]["s3_params"] control_s3 = conf["control"]["s3_params"]
timezone = conf["timezone"] timezone = conf["timezone"]
# NUEVA VARIABLE
procedure = conf["procedure"]
# Scripts extraction # Scripts extraction
extract_mask = conf["source_mask"] extract_mask = conf["source_mask"]
transform_mask = conf["transformation_mask"] transform_mask = conf["transformation_mask"]
...@@ -167,7 +171,8 @@ def set_dag(): ...@@ -167,7 +171,8 @@ def set_dag():
'prefix': scripts_s3["prefix"], 'source_mask': extract_mask, 'transform_mask': transform_mask, 'prefix': scripts_s3["prefix"], 'source_mask': extract_mask, 'transform_mask': transform_mask,
'procedure_mask': procedure_mask, 'order_delimiter': order_delimiter, 'procedure_mask': procedure_mask, 'order_delimiter': order_delimiter,
'label_tablename': conf["label_multiple_select"], 'control_params': control_s3, 'label_tablename': conf["label_multiple_select"], 'control_params': control_s3,
'provider': conf["cloud_provider"], 'timezone': timezone}, 'provider': conf["cloud_provider"], 'timezone': timezone,
'procedure_filepath': procedure["filepath"]},
trigger_rule="all_success" trigger_rule="all_success"
) )
......
[
{
"identifier": "ESTUDIANTES",
"fields": [
{
"name": "ID",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "Nombre",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "Apellido",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "Edad",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "CorreoElectronico",
"datatype": "TEXT",
"maxLength": 100
}
],
"indexes": [
"ID"
],
"save_output" : true
}
]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment