Commit a537dd53 authored by Cristian Aguirre

Update 07-08-23. Update Extractor.py

parent 48029b5d
 import pandas as pd
+from enums.DatabaseTypeEnum import DatabaseTypeEnum
+from sqlalchemy.types import VARCHAR
 import logging
...
@@ -8,9 +10,28 @@ logger = logging.getLogger()
 def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
     save = True
     try:
+        chunksize = 2000
+        # db_type = connection.db_type
+        connection = connection.engine
+        # print(df["CREACION_PRODUCTO"].value_counts())
         with connection.connect() as conn:
-            df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=500)
+            # if db_type == DatabaseTypeEnum.ORACLE.value:
+            #     df.info()
+            #     aux = df.columns[df.dtypes == 'object'].tolist()
+            #     print(aux)
+            #     dtyp = {}
+            #     for col in aux:
+            #         print(col)
+            #         print(df[col].dtype)
+            #         df[col] = df[col].astype(str)
+            #         dtyp.update({col: VARCHAR(df[col].str.len().max())})
+            #     # dtyp = {c: VARCHAR(df[c].str.len().max()) for c in aux}
+            #     print(dtyp)
+            #     df.to_sql(tablename, conn, if_exists='append', dtype=dtyp, index=False, chunksize=chunksize)
+            # else:
+            df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=chunksize)
     except Exception as e:
         logger.error(f"Error guardando resultados desde dataframe. {e}")
+        raise AssertionError(f"Error guardando resultados desde dataframe. {e}")
     finally:
         return save
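
For reference, the Oracle-specific branch left commented out above amounts to sizing a VARCHAR per string column before calling to_sql(). A minimal standalone sketch of that idea (the helper name varchar_dtypes is ours, and the engine/table names in the usage comment are placeholders):

import pandas as pd
from sqlalchemy.types import VARCHAR

def varchar_dtypes(df: pd.DataFrame) -> dict:
    # Size each object (string) column as VARCHAR(<longest value>), so
    # to_sql() does not fall back to overly wide defaults on Oracle.
    dtypes = {}
    for col in df.columns[df.dtypes == "object"]:
        df[col] = df[col].astype(str)
        dtypes[col] = VARCHAR(int(df[col].str.len().max()))
    return dtypes

# Hypothetical usage:
# df.to_sql("some_table", engine, if_exists="append", index=False,
#           dtype=varchar_dtypes(df), chunksize=2000)
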
...
@@ -101,7 +101,8 @@ class Oracle:
     def get_all_tablenames(self) -> List[str]:
         tablenames = []
         try:
-            command = f"SELECT table_name FROM all_tables WHERE OWNER='{self.user}'"
+            user = self.user.upper()
+            command = f"SELECT table_name FROM all_tables WHERE OWNER='{user}'"
             with self.engine.connect() as conn:
                 tablenames = conn.execute(command).all()
         except Exception as e:
...
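
The .upper() call matters because Oracle stores unquoted identifiers in uppercase, so a lowercase owner would match nothing in all_tables. A sketch of the same lookup using a bind parameter instead of an f-string (not the repo's actual wrapper, just the pattern; binding also keeps the owner value out of the SQL text):

from sqlalchemy import text

def get_all_tablenames(engine, user: str) -> list:
    # all_tables stores owners uppercased, hence user.upper().
    query = text("SELECT table_name FROM all_tables WHERE owner = :owner")
    with engine.connect() as conn:
        return [row[0] for row in conn.execute(query, {"owner": user.upper()})]
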
 from typing import Any, Dict
 import json
+import time
 import numpy as np
 import pandas as pd
...
@@ -169,12 +170,14 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwa
         # Traemos el iterator
         iterator = get_iterator(command, chunksize, source_engine)
         logger.info(f"Número de pasos para migrar datos: {steps}")
+        start_time = time.time()
         for step in range(steps):
             dataframe = next(iterator)
             dataframe = dataframe.fillna(value=np.nan)
-            save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
+            save = save_from_dataframe(dataframe, tablename, intern_conn)
             if save:
                 logger.info(f"Guardado correctamente dataframe en el paso {step+1}")
+        logger.info(f"Tiempo del Task de descarga de scripts: {round(time.time() - start_time, 3)} segundos")
     except Exception as e:
         delete = delete_table(tablename, intern_conn.engine)
         if delete:
...
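
The timing change wraps the whole chunked migration loop, so the logged figure covers every next(iterator)/save round trip rather than a single chunk. A self-contained sketch of the same pattern, assuming pandas' built-in chunked reader in place of the repo's get_iterator:

import logging
import time

import pandas as pd

logger = logging.getLogger()

def migrate_in_chunks(select_sql: str, source_engine, target_engine,
                      tablename: str, chunksize: int = 4000) -> None:
    # Stream the source query chunk by chunk, append each chunk to the
    # target table, and log total wall-clock time at the end.
    start_time = time.time()
    for step, chunk in enumerate(
            pd.read_sql(select_sql, source_engine, chunksize=chunksize), 1):
        chunk.to_sql(tablename, target_engine, if_exists="append", index=False)
        logger.info(f"Chunk {step} saved")
    logger.info(f"Total time: {round(time.time() - start_time, 3)} seconds")
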
...
@@ -6,27 +6,27 @@ app:
   sources:
     source1:
       type: mysql
-      host: database-11.cluster-ro-cvsz4ey9eiec.us-east-1.rds.amazonaws.com
+      host: 192.168.21.52
-      port: 3306
+      port: 13306
-      username: admin
+      username: root
-      password: adminadmin
+      password: root
-      database: prueba_ca_1
+      database: bcom_tp_res
       service: ORCLPDB1
       schema: sources
   transformation:
-    type: mysql
+    type: oracle
-    host: 192.168.1.2
+    host: 192.168.27.22
-    port: 13306
+    port: 21521
-    username: root
+    username: RLQA_AIR
-    password: root
+    password: RLQA_AIR99
-    database: prueba_bcom2
+    database:
-    service:
+    service: ORCLPDB1
-    schema: intern_db
+    schema:
   chunksize: 4000
-  label_multiple_select: TABLE
+  label_multiple_select: TABLENAME
-  source_mask: selectDA # Sufijo (S)
+  source_mask: select # Sufijo (S)
-  procedure_mask: procedureDA # S
+  procedure_mask: procedure # S
-  transformation_mask: transformDA # S
+  transformation_mask: transform # S
   prefix_order_delimiter: .
   cloud_provider: aws
   scripts:
...
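
With the transformation target switched from MySQL to Oracle, the connection is now defined by service (ORCLPDB1) rather than database. A sketch of how this block could be turned into a SQLAlchemy engine, assuming the cx_Oracle driver and the file's top-level app: key:

import yaml
from sqlalchemy import create_engine
from yaml.loader import SafeLoader

with open("/opt/airflow/dags/app_conf.yml") as f:
    conf = yaml.load(f, Loader=SafeLoader)["app"]["transformation"]

# Oracle connections address a service name, not a database name.
url = (f"oracle+cx_oracle://{conf['username']}:{conf['password']}"
       f"@{conf['host']}:{conf['port']}/?service_name={conf['service']}")
engine = create_engine(url)
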
...
@@ -20,7 +20,7 @@ logger = logging.getLogger()
 DAG_NAME = "INFORM_PROCESS"
 # Change this path if is deployed in prod or dev
-MAIN_PATH = "/opt/airflow/dags/"
+MAIN_PATH = "/root/airflow/dags/"
 DEFAULT_ARGS = {
     'owner': 'BCOM',
...
...
@@ -26,7 +26,7 @@ DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM"
 # Change this path if is deployed in prod or dev
 MAIN_PATH = "/root/airflow/dags/"
-JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition2.json"
+JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition.json"
 DEFAULT_ARGS = {
     'owner': 'BCOM',
...
...
@@ -234,7 +234,7 @@ def set_dag_1():
     from yaml.loader import SafeLoader
     # Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
     # En desarrollo, cualquiera que apunte a su carpeta dags
-    conf_path = "/root/airflow/dags/app_conf.yml"
+    conf_path = "/opt/airflow/dags/app_conf.yml"
     with open(conf_path) as f:
         data = yaml.load(f, Loader=SafeLoader)
     general_cnf = data["general"]
...
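
MAIN_PATH and conf_path are edited by hand whenever the code moves between the dev VM (/root/airflow) and the containerized deployment (/opt/airflow), as the comments note. One way to avoid that, sketched here rather than taken from the repo, is to derive the paths from Airflow's own AIRFLOW_HOME environment variable:

import os

# Use AIRFLOW_HOME when it is set; fall back to the container layout.
MAIN_PATH = os.path.join(os.environ.get("AIRFLOW_HOME", "/opt/airflow"), "dags")
CONF_PATH = os.path.join(MAIN_PATH, "app_conf.yml")
JSON_PROCEDURE_PATH = os.path.join(MAIN_PATH, "procedure_definition.json")
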
...
@@ -29,8 +29,8 @@
         },
         {
             "name": "NU_ADDON",
-            "datatype": "NUMBER",
+            "datatype": "TEXT",
-            "decimal_precision": 0
+            "maxLength": 5
         },
         {
             "name": "NB_PAQUETE",
...
@@ -53,11 +53,11 @@
         },
         {
             "name": "FH_ACTIVACION",
-            "datatype": "DATE"
+            "datatype": "DATETIME"
         },
         {
             "name": "FH_OPERACION",
-            "datatype": "DATE"
+            "datatype": "DATETIME"
         },
         {
             "name": "TP_SERVICIO",
...
@@ -81,7 +81,7 @@
         },
         {
             "name": "FH_CARGA",
-            "datatype": "DATE"
+            "datatype": "DATETIME"
         },
         {
             "name": "NU_ANIO",
...
@@ -141,7 +141,8 @@
         ],
         "indexes": [
             "CD_PAQUETE", "NU_ADDON", "CD_CLIENTE"
-        ]
+        ],
+        "save_output": true
     },
     {
         "identifier": "PROMOCIONES_RESIDENCIAL",
...
@@ -240,7 +241,7 @@
         "indexes": ["CD_PAQUETE"]
     },
     {
-        "identifier": "CATALOGO_PROMOCIONES",
+        "identifier": "CATALOGO_PROMOCION",
         "fields": [
             {
                 "name": "NOMBRE_PRODUCTO",
...
@@ -255,7 +256,7 @@
         ]
     },
     {
-        "identifier": "RELACION_PROMOCION_3PA2P",
+        "identifier": "TEMP_PROMO",
         "fields": [
             {
                 "name": "TRESP",
...
@@ -270,7 +271,7 @@
         ]
     },
     {
-        "identifier": "RELACION_POIDPAQUETE",
+        "identifier": "RELACION_POID_PAQ",
         "fields": [
             {
                 "name": "POID_PRODUCT",
...
@@ -300,38 +301,13 @@
         ]
     },
     {
-        "identifier": "PAQUETES_NOPROMOCION",
+        "identifier": "ADDONS_UNICO",
-        "fields": [
-            {
-                "name": "CD_PAQUETE",
-                "datatype": "TEXT",
-                "maxLength": 50
-            }
-        ]
-    },
-    {
-        "identifier": "PROCEDURE_1",
         "fields": [
-            {
-                "name": "CD_FOLIO",
-                "datatype": "TEXT",
-                "maxLength": 100
-            },
-            {
-                "name": "CD_CUENTA",
-                "datatype": "TEXT",
-                "maxLength": 100
-            },
             {
                 "name": "CD_PAQUETE",
                 "datatype": "TEXT",
                 "maxLength": 50
-            },
-            {
-                "name": "NB_PAQUETE",
-                "datatype": "TEXT",
-                "maxLength": 200
             }
         ]
     }
 ]
\ No newline at end of file
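
These definitions drive table creation, so the datatype edits above (NUMBER with decimal_precision 0 becoming TEXT with maxLength 5, DATE becoming DATETIME) change the resulting column types. A hypothetical mapper, ours rather than the repo's, showing how one field entry could translate to a SQLAlchemy type under that scheme:

from sqlalchemy.types import DateTime, Numeric, VARCHAR

def field_type(field: dict):
    # Translate one procedure_definition.json field entry to a column type.
    dt = field["datatype"]
    if dt == "TEXT":
        return VARCHAR(field.get("maxLength", 4000))
    if dt == "NUMBER":
        return Numeric(scale=field.get("decimal_precision", 0))
    if dt in ("DATE", "DATETIME"):
        return DateTime()
    raise ValueError(f"Unknown datatype: {dt}")

# field_type({"name": "NU_ADDON", "datatype": "TEXT", "maxLength": 5})
# -> VARCHAR(5)
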