Commit d85ea56f authored by Cristian Aguirre

Merge branch 'developer' into 'developer-ca'

# Conflicts:
#   dags/components/Extractor.py
#   dags/components/Generation.py
#   dags/components/Model/InsumoModel.py
#   dags/components/Utils.py
#   dags/procedure_definition.json
parents 1bced539 7772e148
......@@ -56,10 +56,11 @@ class Database:
except Exception as e:
logger.error(f"Error creando db engine. {e}")
def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel") -> bool:
def create_model(self, tablename: str, fields: List[Tuple[str]], db_target: str,
modelName: str = "TableModel") -> bool:
create = False
try:
create = self.factory.create_model(tablename, fields, modelName)
create = self.factory.create_model(tablename, fields, db_target, modelName)
except Exception as e:
raise AssertionError(f"Error creando tabla dinámica con nombre {tablename}. {e}")
finally:
......
from typing import List, Tuple
import pymysql
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from components.Model.InsumoModel import InsumoModel
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Model.InsumoModel import InsumoModel, InsumoModelOracle
from components.Databases.Enums.MysqlDataTypeEnum import MysqlDataTypeEnum
from components.Databases.Enums.MysqlDataTypeORMEnum import MysqlDataTypeORMEnum
......@@ -23,6 +25,17 @@ class Mysql:
self.password = password
self.database = database
self.engine = None
self.connection = None
def get_basic_connection(self):
    """Return the raw pymysql connection, opening it on first use.

    The connection is built from the instance's ``host``, ``port``,
    ``user``, ``password`` and ``database`` attributes and cached on
    ``self.connection`` so that later calls reuse the same object.

    Returns:
        The cached connection, or ``None`` when the connection attempt
        failed (the failure is logged, not raised).
    """
    if self.connection is None:
        try:
            self.connection = pymysql.connect(host=self.host, port=self.port, user=self.user,
                                              password=self.password, db=self.database)
        except Exception as e:
            # Best-effort by design: the caller receives None instead of an
            # exception. The message now names the right backend (it used to
            # say "Oracle" in this Mysql class).
            logger.error(f"Error obteniendo conexion básica de Mysql. {e}")
    # Returning after the try block (instead of inside ``finally``) keeps
    # non-Exception signals such as KeyboardInterrupt from being swallowed.
    return self.connection
def create_engine(self) -> None:
try:
......@@ -32,13 +45,17 @@ class Mysql:
except Exception as e:
logger.error(f"Error creando engine de Mysql. {e}")
def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel") -> bool:
def create_model(self, tablename: str, fields: List[Tuple[str]], db_target: str, modelName: str = "TableModel"):
model = None
try:
model = type(modelName, (InsumoModel,), {
db_args = {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
}
if db_target == DatabaseTypeEnum.ORACLE.value:
model = type(modelName, (InsumoModelOracle,), db_args)
else:
model = type(modelName, (InsumoModel,), db_args)
for field in fields:
logger.debug(f"Attribute: {field}")
name = field[0]
......@@ -46,7 +63,11 @@ class Mysql:
size = int(field[2] / 4)
try:
if not isinstance(field[3], type(None)) and field[3] > 0:
data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value(precision=field[2], scale=field[3])
precision, scale = field[2], field[3]
if scale > precision:
precision = field[3]
scale = field[2]
data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value(precision=precision, scale=scale)
else:
data_type = MysqlDataTypeORMEnum[MysqlDataTypeEnum(field[1]).name].value(size)
except TypeError:
......
......@@ -4,9 +4,10 @@ from sqlalchemy import create_engine
import oracledb
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Databases.Enums.OracleDataTypeEnum import OracleDataTypeEnum
from components.Databases.Enums.OracleDataTypeORMEnum import OracleDataTypeORMEnum
from components.Model.InsumoModel import InsumoModel
from components.Model.InsumoModel import InsumoModel, InsumoModelOracle
from sqlalchemy import Column
import logging
......@@ -43,13 +44,17 @@ class Oracle:
except Exception as e:
logger.error(f"Error creando engine de Oracle. {e}")
def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel") -> bool:
def create_model(self, tablename: str, fields: List[Tuple[str]], db_target: str, modelName: str = "TableModel"):
model = None
try:
model = type(modelName, (InsumoModel,), {
db_args = {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
}
if db_target == DatabaseTypeEnum.ORACLE.value:
model = type(modelName, (InsumoModelOracle,), db_args)
else:
model = type(modelName, (InsumoModel,), db_args)
for field in fields:
logger.debug(f"Attribute: {field}")
name = field[0]
......
from typing import List, Tuple
import psycopg2
from sqlalchemy import create_engine
from components.Model.InsumoModel import InsumoModel
from components.Model.InsumoModel import InsumoModel, InsumoModelOracle
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Databases.Enums.PostgresDataTypeEnum import PostgresDataTypeEnum
from components.Databases.Enums.PostgresDataTypeORMEnum import PostgresDataTypeORMEnum
......@@ -23,6 +25,18 @@ class Postgres:
self.schema = schema
self.engine = None
self.DEFAULT_VAR_LENGHT = 100
self.connection = None
def get_basic_connection(self):
    """Return the raw psycopg2 connection, opening it on first use.

    The connection is built from the instance's ``host``, ``port``,
    ``user``, ``password`` and ``database`` attributes, with the session
    ``search_path`` set to ``self.schema`` through the ``options`` argument.
    It is cached on ``self.connection`` so later calls reuse the same object.

    Returns:
        The cached connection, or ``None`` when the connection attempt
        failed (the failure is logged, not raised).
    """
    if self.connection is None:
        try:
            self.connection = psycopg2.connect(host=self.host, port=self.port, user=self.user,
                                               password=self.password, database=self.database,
                                               options="-c search_path=" + self.schema)
        except Exception as e:
            # Best-effort by design: the caller receives None instead of an
            # exception. The message now names the right backend (it used to
            # say "Oracle" in this Postgres class).
            logger.error(f"Error obteniendo conexion básica de Postgres. {e}")
    # Returning after the try block (instead of inside ``finally``) keeps
    # non-Exception signals such as KeyboardInterrupt from being swallowed.
    return self.connection
def create_engine(self) -> None:
try:
......@@ -33,20 +47,28 @@ class Postgres:
except Exception as e:
logger.error(f"Error creando engine de Postgres. {e}")
def create_model(self, tablename: str, fields: List[Tuple[str]], modelName: str = "TableModel"):
def create_model(self, tablename: str, fields: List[Tuple[str]], db_target: str, modelName: str = "TableModel"):
model = None
try:
model = type(modelName, (InsumoModel,), {
db_args = {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
}
if db_target == DatabaseTypeEnum.ORACLE.value:
model = type(modelName, (InsumoModelOracle,), db_args)
else:
model = type(modelName, (InsumoModel,), db_args)
for field in fields:
logger.debug(f"Attribute: {field}")
name = field[0]
if field[2] != -1:
try:
if not isinstance(field[3], type(None)) and field[3] > 0:
data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value(precision=field[2], scale=field[3])
precision, scale = field[2], field[3]
if scale > precision:
precision = field[3]
scale = field[2]
data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value(precision=precision, scale=scale)
else:
data_type = PostgresDataTypeORMEnum[PostgresDataTypeEnum(field[1]).name].value(field[2])
except TypeError:
......
......@@ -119,13 +119,31 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwa
if is_procedure:
command = command[len(tablename+"|"):]
temp_connection = source_conn.get_basic_connection()
cursor = temp_connection.cursor()
final_command = source_conn.generate_sql_procedure(command)
logger.debug(f"FINAL COMMAND: {final_command}")
cursor.execute(final_command)
for resultSet in cursor.getimplicitresults():
if source_conn.db_type == DatabaseTypeEnum.ORACLE.value:
cursor = temp_connection.cursor()
cursor.execute(command)
for resultSet in cursor.getimplicitresults():
data = []
for row in resultSet:
data.append(row)
if len(data) == chunksize:
dataframe = pd.DataFrame(data, columns=columns_name)
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
if save:
logger.debug(f"Guardado correctamente dataframe. Procesando más bloques")
data.clear()
if len(data) > 0:
dataframe = pd.DataFrame(data, columns=columns_name)
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
if save:
logger.debug(f"Migrado correctamente todos los datos")
data.clear()
elif source_conn.db_type == DatabaseTypeEnum.MYSQL.value or \
source_conn.db_type == DatabaseTypeEnum.POSTGRES.value:
cursor = temp_connection.cursor()
cursor.execute(command)
data = []
for row in resultSet:
for row in cursor._rows:
data.append(row)
if len(data) == chunksize:
dataframe = pd.DataFrame(data, columns=columns_name)
......@@ -150,8 +168,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwa
logger.info(f"Número de pasos para migrar datos: {steps}")
for step in range(steps):
dataframe = next(iterator)
# dataframe["INTERN_ID_BCOM"] = np.NaN
logger.debug(dataframe)
dataframe = dataframe.fillna(value=np.nan)
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
if save:
logger.info(f"Guardado correctamente dataframe en el paso {step+1}")
......
......@@ -95,8 +95,11 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
delimiter = params["delimiter"]
tmp_path = params["tmp_path"]
tmp_file = create_temp_file(tmp_path, filename_mask, file_type, tablename, timezone, pattern)
logger.info(tmp_file)
logger.info(f"Ruta creada: {tmp_file}")
logger.debug(f"TABLA: {tablename}")
steps = get_steps(tablename, chunksize, engine, True)
if intern_conn.db_type == DatabaseTypeEnum.ORACLE.value:
tablename = f"SELECT * FROM {tablename}"
iterator = get_iterator(tablename, chunksize, engine)
logger.info(f"Total de pasos para generar archivo resultado: {steps}")
for step in range(steps):
......
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, BIGINT
from sqlalchemy import Column, BigInteger
from sqlalchemy.schema import Identity
Base = declarative_base()
......@@ -8,4 +9,11 @@ class InsumoModel(Base):
__abstract__ = True
INTERN_ID_BCOM = Column(BIGINT, primary_key=True, autoincrement=True)
INTERN_ID_BCOM = Column(BigInteger, primary_key=True, autoincrement=True)
# Base class for dynamically generated table models whose target database is
# Oracle: the create_model methods subclass it (via type(...)) when
# db_target == DatabaseTypeEnum.ORACLE.value, and use InsumoModel otherwise.
class InsumoModelOracle(Base):
# Marked abstract; concrete subclasses supply their own __tablename__.
__abstract__ = True
# Surrogate primary key declared with an explicit Identity(start=1) construct,
# where InsumoModel relies on autoincrement=True instead.
INTERN_ID_BCOM = Column(BigInteger, Identity(start=1), primary_key=True)
......@@ -22,7 +22,7 @@ def get_type_file(key: str) -> FileTypeEnum:
result = FileTypeEnum.EXCEL
try:
file_type_sufix = key.rfind(".")
file_type = key[file_type_sufix+1:]
file_type = key[file_type_sufix + 1:]
result = FileTypeEnum(file_type)
except Exception as e:
logger.error(f"Error obteniedo el tipo de archivo de {key}. {e}")
......@@ -81,10 +81,10 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
else:
catalog_prefix = default_prefix
s3_catalog = catalog_prefix + catalog["pattern"]
data_dict.update({'s3_'+catalog_name: s3_catalog, catalog_name+'_key': catalog["key_field"],
catalog_name+'_value': catalog["value_field"]})
data_dict.update({'s3_' + catalog_name: s3_catalog, catalog_name + '_key': catalog["key_field"],
catalog_name + '_value': catalog["value_field"]})
if "delimiter" in catalog.keys():
data_dict.update({catalog_name+'_delimiter': catalog["delimiter"]})
data_dict.update({catalog_name + '_delimiter': catalog["delimiter"]})
except Exception as e:
raise AssertionError(f"Error actualizando dict de catalogos. {e}")
finally:
......@@ -106,9 +106,9 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
if item.lower().strip() == "end":
final_data[-1] = final_data[-1] + "; end;"
final_item = item
if item.lower().strip().find(label_tablename.lower().strip()+":") != -1:
init_index = item.lower().strip().index(label_tablename.lower().strip()+":")
table_name = item.replace(" ", "").strip()[init_index+len(label_tablename):].strip()
if item.lower().strip().find(label_tablename.lower().strip() + ":") != -1:
init_index = item.lower().strip().index(label_tablename.lower().strip() + ":")
table_name = item.replace(" ", "").strip()[init_index + len(label_tablename + ":"):].strip()
add_next = True
elif item != "":
if add_next:
......@@ -139,8 +139,10 @@ def select_multiple(command: str) -> Dict[str, Any]:
response = {'is_multiple': False, 'tablename': ''}
tablename = ""
no_procedure_init = "|select"
procedure_init = ["|begin", "|call"]
try:
if command.lower().replace(" ", "").find(no_procedure_init) == -1:
if command.lower().replace(" ", "").find(procedure_init[0]) != -1 or \
command.lower().replace(" ", "").find(procedure_init[1]) != -1:
response["is_multiple"] = True
tablename = command[:command.index("|")].strip()
response["tablename"] = tablename
......@@ -152,7 +154,7 @@ def select_multiple(command: str) -> Dict[str, Any]:
if init_index == -1:
raise AssertionError("Query malformed")
else:
from_command = command[init_index+4:]
from_command = command[init_index + 4:]
tablename_base = from_command.strip().split(" ")
if len(tablename_base) > 0 and tablename == "":
tablename = tablename_base[0]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment