Commit 56c7f2a2 authored by Cristian Aguirre's avatar Cristian Aguirre

Update 25-07-23. Update DAG process. Version-2-inprogress

parent 55785c2d
......@@ -30,6 +30,7 @@ class Database:
elif db_type == DatabaseTypeEnum.ORACLE.value:
self.factory = Oracle(host, port, user, password, service)
self.engine = None
self.procedure_identifier = "execute"
def get_basic_connection(self):
connection = None
......@@ -73,3 +74,13 @@ class Database:
logger.error(f"Error creando tabla {model}. {e}")
finally:
return save
def generate_sql_procedure(self, command: str) -> str:
response = ""
try:
command = command.lower()
response = self.factory.generate_sql_procedure(command, self.procedure_identifier)
except Exception as e:
logger.error(f"Error generando comando sql para procedure. Comando: {command}. {e}")
finally:
return response
......@@ -59,3 +59,13 @@ class Mysql:
finally:
return model
def generate_sql_procedure(self, command: str, reserved_word: str = "execute") -> str:
response = ""
try:
command = command.replace(reserved_word, "").replace(";", "")
response = f"call {command};"
logger.debug("COMANDO MYSQL:", response)
except Exception as e:
logger.error(f"Error generando comando sql para procedure Mysql. Comando: {command}. {e}")
finally:
return response
......@@ -81,3 +81,14 @@ class Oracle:
logger.error(f"Error creando modelo dinámico en Oracle con nombre {tablename}. {type(e)}.{e}")
finally:
return model
def generate_sql_procedure(self, command: str, reserved_word: str = "execute") -> str:
response = ""
try:
command = command.replace(reserved_word, "").replace(";", "")
response = f"begin {command}; end;"
logger.debug("COMANDO ORACLE:", response)
except Exception as e:
logger.error(f"Error generando comando sql para procedure Oracle. Comando: {command}. {e}")
finally:
return response
......@@ -59,3 +59,14 @@ class Postgres:
logger.error(f"Error creando modelo dinámico en Postgres con nombre {tablename}. {type(e)}. {e}")
finally:
return model
def generate_sql_procedure(self, command: str, reserved_word: str = "execute") -> str:
response = ""
try:
command = command.replace(reserved_word, "").replace(";", "")
response = f"call {command};"
logger.debug("COMANDO POSTGRES:", response)
except Exception as e:
logger.error(f"Error generando comando sql para procedure Postgres. Comando: {command}. {e}")
finally:
return response
......@@ -84,65 +84,43 @@ def on_success_extractor(context) -> None:
def extract_from_source(command: str, source_conn, intern_conn, chunksize: int, **kwargs):
extract_type = command[0]
task = kwargs['ti']
extract_type = command[0].split("|")[0]
command = command[1]
source_engine = source_conn.engine
command_for_create = command
is_tablename = False
definitions = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
logger.info(f"Definiciones desde json: {definitions}")
multiple = select_multiple(command)
tablename = multiple["tablename"]
is_procedure = False
if extract_type == "PROCEDURE":
is_procedure = True
columns_name = []
if extract_type.startswith(OperationTypeEnum.PROCEDURE.value):
# Create the model with the procedure descriptor
if command.replace(" ", "").lower().find("|begin") != -1:
tablename = command[:command.find("|")]
else:
proc_name = command[len("begin"):command.rfind("end")]
tablename = proc_name.strip().replace(";", "")
task = kwargs['ti']
procedures = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="PROCEDURE-JSON")
model = None
for procedure in procedures:
if procedure["procedure_identifier"] != tablename:
continue
model = generateModel(tablename, procedure["fields"])
columns_name = [field["identifier"] for field in procedure["fields"]]
is_tablename = True
if isinstance(model, type(None)):
raise AssertionError("Descripción del procedure en el json descriptor no encontraddo")
else:
if source_conn.db_type == DatabaseTypeEnum.ORACLE.value and \
not extract_type.startswith(OperationTypeEnum.PROCEDURE.value):
command_for_create = f"SELECT * FROM ({command_for_create}) OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY"
else:
command_words = command_for_create.split(" ")
if command_words[-2].lower() != "limit":
command_for_create += " limit 1"
columns = []
with source_engine.connect() as connection:
final_command = command_for_create
if final_command.replace(" ", "").lower().find("|select") != -1:
final_command = final_command[final_command.find("select"):]
result = connection.execute(final_command)
fields = result.cursor.description
for field in fields:
name, type_code, length, presicion = field[0], field[1], field[3], field[5]
columns.append((name, type_code, length, presicion))
logger.debug(f"Columnas procesadas: {columns}")
multiple = select_multiple(command)
tablename = multiple["tablename"]
model = source_conn.create_model(tablename, columns)
# Create the model with the procedure descriptor
model = None
for procedure in definitions:
if procedure["identifier"] != tablename:
continue
indexes = []
if "indexes" in procedure.keys():
indexes = procedure["indexes"]
model = generateModel(tablename, procedure["fields"], indexes)
columns_name = [field["name"] for field in procedure["fields"]]
if isinstance(model, type(None)):
raise AssertionError(f"Definición del extracción para {tablename} en el json-descriptor no encontraddo")
try:
create = intern_conn.create_table(model)
if create:
logger.info(f"Creado correctamente la tabla {tablename}. Creado?: {create}")
else:
raise AssertionError(f"Error creando tabla {tablename}")
if is_tablename:
command = command[len(tablename+":"):]
if is_procedure:
command = command[len(tablename+"|"):]
temp_connection = source_conn.get_basic_connection()
cursor = temp_connection.cursor()
cursor.execute(command)
final_command = source_conn.generate_sql_procedure(command)
logger.debug(f"FINAL COMMAND: {final_command}")
cursor.execute(final_command)
for resultSet in cursor.getimplicitresults():
data = []
for row in resultSet:
......@@ -201,7 +179,7 @@ def get_select_from_xcom(**kwargs):
for select in xcom_selects:
tablename = select_multiple(select)["tablename"]
if tasks["status"] == ProcessStatusEnum.SUCCESS.value or tablename not in success_tasks:
final_selects.append((key, select))
final_selects.append((key, select, ))
logger.info(f"Comandos para la extracción: {final_selects}")
Variable.set(key='SELECTS', value=final_selects, serialize_json=True)
return [[item] for item in final_selects]
......
......@@ -82,7 +82,7 @@ def transformations(xcom_commands: str, intern_conn):
logger.info(f"Ejecutando transformaciones del script {script_name}")
with engine.connect() as connection:
for command in commands:
logger.debug(f"Ejecutando comando de transformación: {command}")
logger.info(f"Ejecutando comando de transformación: {command}")
_ = connection.execute(command)
......
......@@ -93,6 +93,7 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) -> List[Tuple[str, List[str]]]:
result = []
allowed_commands = ["create", "update", "delete", "select", "alter", "drop", "begin", "commit"]
comments = [CommentsScriptEnum[item].value for item in CommentsScriptEnum._member_names_]
try:
for row in dataset:
......@@ -109,7 +110,7 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
final_item = item
if item.lower().strip().find(label_tablename.lower().strip()+":") != -1:
init_index = item.lower().strip().index(label_tablename.lower().strip()+":")
table_name = item.replace(" ", "").strip()[init_index+5:].strip()
table_name = item.replace(" ", "").strip()[init_index+len(label_tablename):].strip()
add_next = True
elif item != "":
if add_next:
......@@ -117,38 +118,21 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
add_next = False
final_item = item.strip()
table_name = ""
if final_item.strip()[:2] in comments and ("update " in final_item.lower() or "delete " in final_item.lower() or
"alter" in final_item.lower() or "create" in final_item.lower() or
"drop" in final_item.lower()):
trans_index = final_item.lower().find("update")
if trans_index != -1:
final_item = final_item[trans_index:]
delete_index = final_item.lower().find("delete")
if delete_index != -1:
final_item = final_item[delete_index:]
alter_index = final_item.lower().find("alter")
if alter_index != -1:
final_item = final_item[alter_index:]
create_index = final_item.lower().find("create")
if create_index != -1:
final_item = final_item[create_index:]
drop_index = final_item.lower().find("drop")
if drop_index != -1:
final_item = final_item[drop_index:]
if any([command in final_item.lower() for command in allowed_commands]):
if final_item.strip()[:2] in comments:
init_indexes = [final_item.lower().find(command) for command in allowed_commands
if final_item.lower().find(command) != -1]
if len(init_indexes) > 0:
init_index = init_indexes[0]
final_item = final_item[init_index:]
final_item = final_item.replace("%", "%%")
final_data.append(final_item)
final_data = [item.replace("\t", "") for item in final_data if item != "" and ("select" in item.lower() or
"update" in item.lower() or
"delete" in item.lower() or
"begin" in item.lower() or
"alter" in item.lower() or
"create" in item.lower() or
"drop" in item.lower() or
"commit" in item.lower())]
final_data = [item.replace("\t", "") for item in final_data if item != "" and
all([not item.strip().startswith(comment) for comment in comments])]
result.append((row[0], final_data))
logger.info(f"Lista de comandos: {result}")
except Exception as e:
raise AssertionError(f"Error extrayendo comandos sql. {e}")
logger.error(f"Error extrayendo comandos sql. {e}")
finally:
return result
......@@ -157,9 +141,8 @@ def select_multiple(command: str) -> Dict[str, Any]:
response = {'is_multiple': False, 'tablename': ''}
tablename = ""
no_procedure_init = "|select"
procedure_init = ["|begin"]
try:
if command.lower().replace(" ", "").find(procedure_init[0]) != -1:
if command.lower().replace(" ", "").find(no_procedure_init) == -1:
response["is_multiple"] = True
tablename = command[:command.index("|")].strip()
response["tablename"] = tablename
......@@ -217,21 +200,30 @@ def delete_temp_dir(module_name: str) -> bool:
return drop
def generateModel(tablename: str, attributes: List[Dict[str, Any]], modelName: str = "TableModel"):
def generateModel(tablename: str, attributes: List[Dict[str, Any]], indexes: List[str], modelName: str = "TableModel"):
default_precision = 8
model = type(modelName, (InsumoModel,), {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
try:
for attribute in attributes:
index = False
if attribute["name"] in indexes:
index = True
logger.debug(f"attribute: {attribute}")
if attribute["datatype"] == DataTypeEnum.TEXT.name and "maxLength" in attribute.keys():
setattr(model, attribute["identifier"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(attribute["maxLength"])))
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(attribute["maxLength"]), index=index))
elif attribute["datatype"] == DataTypeEnum.DECIMAL.name:
precision = default_precision
if "decimal_precision" in attribute.keys():
precision = attribute["decimal_precision"]
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value(38, precision), index=index))
else:
setattr(model, attribute["identifier"],
Column(DataTypeOrmEnum[attribute["datatype"]].value))
setattr(model, attribute["name"],
Column(DataTypeOrmEnum[attribute["datatype"]].value, index=index))
model = model.__table__
except InvalidRequestError as e:
logger.debug(f"InvalidRequestError. {e}")
......
......@@ -3,11 +3,11 @@ app:
database:
sources:
source1:
type: mysql
host: 192.168.21.52
port: 13306
username: root
password: root
type: oracle
host: 192.168.27.22
port: 21521
username: PRUEBABCOM2
password: admin
database: bd_tp_qa
service: ORCLPDB1
schema: public
......@@ -21,7 +21,7 @@ app:
service:
schema:
chunksize: 8000
label_multiple_select: TABLE
label_multiple_select: TABLENAME
source_mask: select # Sufijo (S)
procedure_mask: procedure # S
transformation_mask: transform # S
......
......@@ -7,7 +7,7 @@ from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from components.Utils import update_sql_commands
from components.Xcom import save_commands_to_xcom, delete_task_instances
from components.Xcom import save_commands_to_xcom
from components.S3Route import get_files_from_prefix, get_file_from_key
from components.Sensor import create_s3_sensor
from components.Extractor import get_extract_task_group
......@@ -25,7 +25,7 @@ logger = logging.getLogger()
DAG_NAME = "BCOM_DAG_TRANSFORMACIONES3"
# Change this path if is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
MAIN_PATH = "/opt/airflow/dags/"
JSON_PROCEDURE_PATH = MAIN_PATH + "procedure_definition.json"
DEFAULT_ARGS = {
......@@ -87,7 +87,7 @@ def save_procedure_json(json_path: str, task) -> None:
data = json.load(f)
logger.info(f"JSON-PROCEDURE {data}")
if data:
task.xcom_push(key="PROCEDURE-JSON", value=data)
task.xcom_push(key="EXTRACTION-DEFINITION-JSON", value=data)
except Exception as e:
logger.error(f"Error leyendo y guardando archivo descriptor de procedure. {e}")
......
......@@ -234,7 +234,7 @@ def set_dag_1():
from yaml.loader import SafeLoader
# Cambiar conf_path dependiendo del ambiente, en prod usando k8 y contenedores usar /opt/airflow/dags/app_conf.yml
# En desarrollo, cualquiera que apunte a su carpeta dags
conf_path = "/root/airflow/dags/app_conf.yml"
conf_path = "/opt/airflow/dags/app_conf.yml"
with open(conf_path) as f:
data = yaml.load(f, Loader=SafeLoader)
general_cnf = data["general"]
......
......@@ -8,5 +8,7 @@ class DataTypeEnum(enum.Enum):
NUMBER = int
TEXT = str
DECIMAL = float
BOOLEAN = bool
DATETIME = Timestamp
DATE = pd.Timestamp
LONGTEXT = str
import enum
from sqlalchemy import Integer, String, Date, DateTime, DECIMAL
from sqlalchemy import Integer, String, Date, DateTime, DECIMAL, BOOLEAN, TEXT
class DataTypeOrmEnum(enum.Enum):
......@@ -8,4 +8,5 @@ class DataTypeOrmEnum(enum.Enum):
DECIMAL = DECIMAL
DATE = Date
DATETIME = DateTime
BOOLEAN = BOOLEAN
LONGTEXT = TEXT
[{
"procedure_identifier": "PROCEDURE_1",
[
{
"identifier": "TACOMVENTAS",
"fields": [
{
"name": "CD_EMPRESA",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "CD_FOLIO",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_CUENTA",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "NU_VENDEDOR",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "NU_ADDON",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NB_PAQUETE",
"datatype": "TEXT",
"maxLength": 200
},
{
"name": "CD_CLIENTE",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "NB_CLIENTE",
"datatype": "TEXT",
"maxLength": 200
},
{
"name": "FH_CONTRATACION",
"datatype": "DATE"
},
{
"name": "FH_ACTIVACION",
"datatype": "DATE"
},
{
"name": "FH_OPERACION",
"datatype": "DATE"
},
{
"name": "TP_SERVICIO",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "ST_CLIENTE",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "TP_PAGO",
"datatype": "TEXT",
"maxLength": 10
},
{
"name": "NB_USUACARGA",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "FH_CARGA",
"datatype": "DATE"
},
{
"name": "NU_ANIO",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NU_MES",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NU_SEMANA",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NU_COMISION",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "TP_PAGOANT",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "REGLA_APLICADA",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "AUX",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "PROMOCION",
"datatype": "TEXT",
"maxLength": 80
},
{
"name": "EMPLEADOEMBAJADOR__C",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CANAL_DE_VENTA__C",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "CANALREFERIDO__C",
"datatype": "TEXT",
"maxLength": 50
}
],
"indexes": [
"CD_PAQUETE", "NU_ADDON", "CD_CLIENTE"
]
},
{
"identifier": "PROMOCIONES_RESIDENCIAL",
"fields": [
{
"identifier": "CD_EMPRESA",
"name": "EMPRESA",
"datatype": "TEXT",
"maxLength": 250
},
{
"name": "CUENTA",
"datatype": "TEXT",
"maxLength": 250
},
{
"name": "PLAN",
"datatype": "TEXT",
"maxLength": 250
},
{
"name": "NOMBRE_PRODUCTO",
"datatype": "TEXT",
"maxLength": 250
},
{
"name": "DESCR",
"datatype": "TEXT",
"maxLength": 250
},
{
"name": "TIPO_ADICION",
"datatype": "TEXT",
"maxLength": 250
},
{
"name": "RENTACONIMPUESTOS",
"datatype": "NUMBER"
},
{
"name": "RENTASINIMPUESTOS",
"datatype": "NUMBER"
},
{
"name": "QUANTITY",
"datatype": "NUMBER",
"decimal_precision": 0,
"maxLength": null
"decimal_precision": 0
},
{
"name": "CREACION_PRODUCTO",
"datatype": "DATE"
},
{
"name": "INICIO_COBRO",
"datatype": "DATE"
},
{
"identifier": "CD_FOLIO",
"name": "FIN_COBRO",
"datatype": "DATE"
},
{
"name": "FIN_COMPRA",
"datatype": "DATE"
},
{
"name": "SERV_STATUS",
"datatype": "TEXT",
"maxLength": 20
},
{
"name": "POID_TYPE",
"datatype": "TEXT",
"maxLength": 200
},
{
"name": "POID_PRODUCT",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "STATUS_PRODUCTO",
"datatype": "TEXT",
"decimal_precision": null,
"maxLength": 200
},
{
"name": "SERVICIO",
"datatype": "TEXT",
"maxLength": 200
},
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 100
}
],
"indexes": ["CD_PAQUETE"]
},
{
"identifier": "CATALOGO_PROMOCIONES",
"fields": [
{
"name": "NOMBRE_PRODUCTO",
"datatype": "TEXT",
"maxLength": 100
},
{
"identifier": "CD_CLIENTE",
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"identifier": "RELACION_PROMOCION_3PA2P",
"fields": [
{
"name": "TRESP",
"datatype": "TEXT",
"decimal_precision": null,
"maxLength": 50
},
{
"identifier": "FH_CONTRATACION",
"datatype": "DATE",
"pattern": "%d-%m-%y",
"decimal_precision": null,
"maxLength": null
"name": "DOSP",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"procedure_identifier": "procedure_prueba2",
"identifier": "RELACION_POIDPAQUETE",
"fields": [
{
"identifier": "col1",
"datatype": "DATE",
"pattern": "%Y-%m-%d",
"decimal_precision": null,
"maxLength": null
"name": "POID_PRODUCT",
"datatype": "TEXT",
"maxLength": 50
},
{
"identifier": "col2",
"datatype": "TIME",
"pattern": "%H:%M:%S",
"decimal_precision": null,
"maxLength": null
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"identifier": "RELACION_PAQINI_PAQFIN",
"fields": [
{
"name": "COD_PAQ_INI",
"datatype": "TEXT",
"maxLength": 50
},
{
"identifier": "col3",
"datatype": "DATETIME",
"pattern": "%Y-%m-%d %H:%M:%S",
"decimal_precision": null,
"maxLength": null
"name": "COD_PAQ_FIN",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"identifier": "PAQUETES_NOPROMOCION",
"fields": [
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"identifier": "PROCEDURE_1",
"fields": [
{
"name": "CD_FOLIO",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_CUENTA",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "NB_PAQUETE",
"datatype": "TEXT",
"maxLength": 200
}
]
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment