Commit 1f68dfd1 authored by Cristian Aguirre

Merge branch 'developer' into 'developer-ev'

# Conflicts:
#   dags/dag_conf.yml
#   dags/dag_transformacion_bcom.py
parents e0b311e4 a537dd53
 import pandas as pd
+from enums.DatabaseTypeEnum import DatabaseTypeEnum
+from sqlalchemy.types import VARCHAR
 import logging
@@ -8,9 +10,28 @@ logger = logging.getLogger()
 def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
     save = True
     try:
+        chunksize = 2000
+        # db_type = connection.db_type
+        connection = connection.engine
+        # print(df["CREACION_PRODUCTO"].value_counts())
         with connection.connect() as conn:
-            df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=500)
+            # if db_type == DatabaseTypeEnum.ORACLE.value:
+            #     df.info()
+            #     aux = df.columns[df.dtypes == 'object'].tolist()
+            #     print(aux)
+            #     dtyp = {}
+            #     for col in aux:
+            #         print(col)
+            #         print(df[col].dtype)
+            #         df[col] = df[col].astype(str)
+            #         dtyp.update({col: VARCHAR(df[col].str.len().max())})
+            #     # dtyp = {c: VARCHAR(df[c].str.len().max()) for c in aux}
+            #     print(dtyp)
+            #     df.to_sql(tablename, conn, if_exists='append', dtype=dtyp, index=False, chunksize=chunksize)
+            # else:
+            df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=chunksize)
     except Exception as e:
         logger.error(f"Error guardando resultados desde dataframe. {e}")
+        raise AssertionError(f"Error guardando resultados desde dataframe. {e}")
     finally:
         return save
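Note on the hunk above: the new code bumps the to_sql batch size from 500 to 2000 rows, unwraps the SQLAlchemy engine from the project's connection wrapper, and on failure now raises an AssertionError. Because `return save` sits in the `finally:` block, however, Python suppresses that raise and the function still returns True. A minimal sketch of the same helper without that pitfall, assuming the same wrapper shape:

    import logging

    import pandas as pd

    logger = logging.getLogger()

    def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
        # Unwrap the SQLAlchemy engine from the project-specific wrapper,
        # as the diff does; 2000 rows per batch mirrors the new chunksize.
        engine = connection.engine
        try:
            with engine.connect() as conn:
                df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=2000)
        except Exception as e:
            logger.error(f"Error saving results from dataframe. {e}")
            # No return inside finally, so this error actually propagates.
            raise AssertionError(f"Error saving results from dataframe. {e}")
        return True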
@@ -101,7 +101,8 @@ class Oracle:
     def get_all_tablenames(self) -> List[str]:
         tablenames = []
         try:
-            command = f"SELECT table_name FROM all_tables WHERE OWNER='{self.user}'"
+            user = self.user.upper()
+            command = f"SELECT table_name FROM all_tables WHERE OWNER='{user}'"
             with self.engine.connect() as conn:
                 tablenames = conn.execute(command).all()
         except Exception as e:
......
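The get_all_tablenames fix works because Oracle's data dictionary stores unquoted identifiers in uppercase, so a lowercase self.user never matches ALL_TABLES.OWNER. A sketch of the same query using a bound parameter instead of an f-string (the DSN below is a placeholder):

    from sqlalchemy import create_engine, text

    # Placeholder DSN; adjust to your environment.
    engine = create_engine("oracle+cx_oracle://user:pass@host:1521/?service_name=SERVICE")

    def get_all_tablenames(user: str) -> list:
        # ALL_TABLES stores unquoted identifiers in uppercase, hence .upper();
        # the bound :owner parameter avoids interpolating input into the SQL.
        query = text("SELECT table_name FROM all_tables WHERE owner = :owner")
        with engine.connect() as conn:
            return [row[0] for row in conn.execute(query, {"owner": user.upper()})]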
 from typing import Any, Dict
 import json
+import time
 import numpy as np
 import pandas as pd
@@ -169,12 +170,14 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwa
         # Fetch the iterator
         iterator = get_iterator(command, chunksize, source_engine)
         logger.info(f"Número de pasos para migrar datos: {steps}")
+        start_time = time.time()
         for step in range(steps):
             dataframe = next(iterator)
             dataframe = dataframe.fillna(value=np.nan)
-            save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
+            save = save_from_dataframe(dataframe, tablename, intern_conn)
             if save:
                 logger.info(f"Guardado correctamente dataframe en el paso {step+1}")
+        logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos")
     except Exception as e:
         delete = delete_table(tablename, intern_conn.engine)
         if delete:
......
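The extract_from_source change imports time, records start_time before the chunk loop, passes the whole connection wrapper (rather than its .engine) to match save_from_dataframe's new signature, and logs the elapsed seconds once all steps finish. A condensed sketch of that loop, reusing the save_from_dataframe helper sketched earlier:

    import time

    import numpy as np

    def migrate_in_chunks(iterator, steps: int, tablename: str, intern_conn) -> None:
        # One chunk per step: pull, normalize missing values, persist.
        start_time = time.time()
        for step in range(steps):
            dataframe = next(iterator)
            dataframe = dataframe.fillna(value=np.nan)
            if save_from_dataframe(dataframe, tablename, intern_conn):
                logger.info(f"Chunk {step + 1}/{steps} saved")
        logger.info(f"Download task took {round(time.time() - start_time, 3)} seconds")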
@@ -23,10 +23,10 @@ app:
   service:
     schema: intern_db
     chunksize: 4000
-    label_multiple_select: TABLE
-    source_mask: selectDA # Suffix (S)
-    procedure_mask: procedureDA # S
-    transformation_mask: transformDA # S
+    label_multiple_select: TABLENAME
+    source_mask: select # Suffix (S)
+    procedure_mask: procedure # S
+    transformation_mask: transform # S
     prefix_order_delimiter: .
   cloud_provider: aws
   scripts:
......
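This config change drops the DA suffix from the three script masks and renames the multi-select label to TABLENAME. The matching logic is not part of this diff, but given the `prefix_order_delimiter: .` key, a plausible reading is that script names carry an ordering prefix followed by one of the masks; a purely illustrative sketch under that assumption:

    # Assumed naming scheme (not shown in the diff): "<order>.<mask>.sql",
    # e.g. "1.select.sql". The mask suffix decides how a script is handled.
    def classify_script(filename: str, conf: dict) -> str:
        stem = filename.rsplit(".sql", 1)[0]
        _, _, mask = stem.partition(conf["prefix_order_delimiter"])  # drop order prefix
        for kind in ("source_mask", "procedure_mask", "transformation_mask"):
            if mask == conf[kind]:
                return kind
        return "unknown"

    conf = {"prefix_order_delimiter": ".", "source_mask": "select",
            "procedure_mask": "procedure", "transformation_mask": "transform"}
    print(classify_script("1.select.sql", conf))  # -> "source_mask"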
@@ -20,7 +20,7 @@ logger = logging.getLogger()
 DAG_NAME = "INFORM_PROCESS"
 # Change this path if it is deployed in prod or dev
-MAIN_PATH = "/opt/airflow/dags/"
+MAIN_PATH = "/root/airflow/dags/"
 DEFAULT_ARGS = {
     'owner': 'BCOM',
......
@@ -234,7 +234,7 @@ def set_dag_1():
     from yaml.loader import SafeLoader
     # Change conf_path depending on the environment; in prod with k8s and containers use /opt/airflow/dags/app_conf.yml
     # In development, any path that points to your dags folder
-    conf_path = "/root/airflow/dags/app_conf.yml"
+    conf_path = "/opt/airflow/dags/app_conf.yml"
     with open(conf_path) as f:
         data = yaml.load(f, Loader=SafeLoader)
     general_cnf = data["general"]
......
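These two hunks move hard-coded paths in opposite directions: MAIN_PATH switches to /root/airflow/dags/ while conf_path switches to /opt/airflow/dags/app_conf.yml, and both carry comments telling the reader to edit them per environment. One way to avoid that churn, sketched with a hypothetical AIRFLOW_DAGS_PATH variable, is to resolve the base path from the environment:

    import os

    import yaml
    from yaml.loader import SafeLoader

    # Hypothetical env var; defaults to the containerized layout.
    MAIN_PATH = os.environ.get("AIRFLOW_DAGS_PATH", "/opt/airflow/dags/")
    conf_path = os.path.join(MAIN_PATH, "app_conf.yml")

    with open(conf_path) as f:
        data = yaml.load(f, Loader=SafeLoader)
    general_cnf = data["general"]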
@@ -29,8 +29,8 @@
       },
       {
         "name": "NU_ADDON",
-        "datatype": "NUMBER",
-        "decimal_precision": 0
+        "datatype": "TEXT",
+        "maxLength": 5
       },
       {
         "name": "NB_PAQUETE",
@@ -53,11 +53,11 @@
       },
       {
         "name": "FH_ACTIVACION",
-        "datatype": "DATE"
+        "datatype": "DATETIME"
       },
       {
         "name": "FH_OPERACION",
-        "datatype": "DATE"
+        "datatype": "DATETIME"
       },
       {
         "name": "TP_SERVICIO",
@@ -81,7 +81,7 @@
       },
       {
         "name": "FH_CARGA",
-        "datatype": "DATE"
+        "datatype": "DATETIME"
       },
       {
         "name": "NU_ANIO",
@@ -141,7 +141,8 @@
       ],
       "indexes": [
         "CD_PAQUETE", "NU_ADDON", "CD_CLIENTE"
-      ]
+      ],
+      "save_output": true
     },
     {
       "identifier": "PROMOCIONES_RESIDENCIAL",
@@ -240,7 +241,7 @@
       "indexes": ["CD_PAQUETE"]
     },
     {
-      "identifier": "CATALOGO_PROMOCIONES",
+      "identifier": "CATALOGO_PROMOCION",
       "fields": [
         {
           "name": "NOMBRE_PRODUCTO",
@@ -255,7 +256,7 @@
       ]
     },
     {
-      "identifier": "RELACION_PROMOCION_3PA2P",
+      "identifier": "TEMP_PROMO",
       "fields": [
         {
           "name": "TRESP",
@@ -270,7 +271,7 @@
       ]
     },
     {
-      "identifier": "RELACION_POIDPAQUETE",
+      "identifier": "RELACION_POID_PAQ",
       "fields": [
         {
           "name": "POID_PRODUCT",
@@ -300,38 +301,13 @@
       ]
     },
     {
-      "identifier": "PAQUETES_NOPROMOCION",
-      "fields": [
-        {
-          "name": "CD_PAQUETE",
-          "datatype": "TEXT",
-          "maxLength": 50
-        }
-      ]
-    },
-    {
-      "identifier": "PROCEDURE_1",
+      "identifier": "ADDONS_UNICO",
       "fields": [
-        {
-          "name": "CD_FOLIO",
-          "datatype": "TEXT",
-          "maxLength": 100
-        },
-        {
-          "name": "CD_CUENTA",
-          "datatype": "TEXT",
-          "maxLength": 100
-        },
         {
           "name": "CD_PAQUETE",
           "datatype": "TEXT",
           "maxLength": 50
-        },
-        {
-          "name": "NB_PAQUETE",
-          "datatype": "TEXT",
-          "maxLength": 200
         }
       ]
     }
   ]
\ No newline at end of file
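Across this schema file, NU_ADDON moves from NUMBER/decimal_precision to TEXT/maxLength, the FH_* fields move from DATE to DATETIME, and several table identifiers are renamed. How these specs become column types is not shown in the diff; a sketch of one plausible resolver for the three datatypes that appear here, assuming SQLAlchemy types:

    from sqlalchemy.types import DateTime, Numeric, Text, VARCHAR

    # Assumed mapping; the project's real resolver is not part of this diff.
    def resolve_type(field: dict):
        datatype = field["datatype"]
        if datatype == "TEXT":
            max_length = field.get("maxLength")
            return VARCHAR(max_length) if max_length else Text()
        if datatype == "DATETIME":
            return DateTime()
        if datatype == "NUMBER":
            # 38 is an assumed Oracle-style maximum precision.
            return Numeric(38, field.get("decimal_precision", 0))
        raise ValueError(f"Unsupported datatype: {datatype}")

    print(resolve_type({"name": "NU_ADDON", "datatype": "TEXT", "maxLength": 5}))  # VARCHAR(5)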