Commit 0b05fe09 authored by Cristian Aguirre's avatar Cristian Aguirre

Merge branch 'developer-ca' into 'developer'

Update 07-08-23. Update Extract Control

See merge request !7
parents 70ea4a93 208af456
...@@ -25,8 +25,7 @@ logger = logging.getLogger() ...@@ -25,8 +25,7 @@ logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM" DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM"
# Change this path if is deployed in prod or dev # Change this path if is deployed in prod or dev
MAIN_PATH = "/opt/airflow/dags/" MAIN_PATH = "/root/airflow/dags/"
#JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition.json"
DEFAULT_ARGS = { DEFAULT_ARGS = {
'owner': 'BCOM', 'owner': 'BCOM',
...@@ -83,7 +82,7 @@ def extraction(source_conn, intern_conn, timezone: str, control_s3: Dict[str, An ...@@ -83,7 +82,7 @@ def extraction(source_conn, intern_conn, timezone: str, control_s3: Dict[str, An
def save_procedure_json(json_path: str, task) -> None: def save_procedure_json(json_path: str, task) -> None:
try: try:
with open("/opt/airflow/dags/"+ json_path) as f: with open(json_path) as f:
data = json.load(f) data = json.load(f)
logger.info(f"JSON-PROCEDURE {data}") logger.info(f"JSON-PROCEDURE {data}")
if data: if data:
...@@ -98,22 +97,19 @@ def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str, ...@@ -98,22 +97,19 @@ def extract_control(conn_id: str, bucket: str, prefix: str, task, provider: str,
prefix += "/" prefix += "/"
logger.info(f"BUSCANDO Y EXTRAYENDO ARCHIVO DE CONTROL DESDE {prefix}") logger.info(f"BUSCANDO Y EXTRAYENDO ARCHIVO DE CONTROL DESDE {prefix}")
control, _ = get_file_from_prefix(conn_id, bucket, prefix, provider, timezone, task) control, _ = get_file_from_prefix(conn_id, bucket, prefix, provider, timezone, task)
if control: logger.debug(f"Archivo de control descargado: {control}")
try:
str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore') str_data = str(control.getvalue(), encoding='UTF-8', errors='ignore')
data = StringIO(str_data) data = StringIO(str_data)
if not data: data = json.load(data)
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}] except Exception:
else:
data = json.load(data)
else:
logger.info(f"Json de control creado: {control}")
data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}] data = [{"status": ProcessStatusEnum.SUCCESS.value, "tasks": {}}]
logger.info(f"Json de control creado: {data}")
task.xcom_push(key="CONTROL-CONFIG", value=data) task.xcom_push(key="CONTROL-CONFIG", value=data)
except Exception as e: except Exception as e:
logger.error(f"Error general de descarga de archivo de control. {e}") logger.error(f"Error general de descarga de archivo de control. {e}")
#"**kwargs" dice que pueden haber más parámetros
def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str, def extract_scripts(conn_id: str, bucket: str, prefix: str, source_mask: str, transform_mask: str,
order_delimiter: str, procedure_mask: str, label_tablename: str, control_params: Dict[str, Any], order_delimiter: str, procedure_mask: str, label_tablename: str, control_params: Dict[str, Any],
provider: str, timezone: str, procedure_filepath: str, **kwargs): provider: str, timezone: str, procedure_filepath: str, **kwargs):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment