Commit 8f6a5a31 authored by Cristian Aguirre

Merge branch 'developer_ca' into 'developer'

Developer ca

See merge request !14
parents 8ebc05c1 122df81f
@@ -18,17 +18,20 @@ import logging
logger = logging.getLogger()
def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str, engine, **kwargs) -> None:
def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str, intern_conn, **kwargs) -> None:
engine = intern_conn.engine
delete_task_instances()
ti = kwargs["ti"]
#created_Tables = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CREATED_TABLES")
procedures = ti.xcom_pull(task_ids="MASTER_TRANSFORMATION", key="PROC_CREATED")
if procedures:
for procedure in procedures:
logger.info(f"Borrando procedures {procedure}")
delete = delete_procedure(procedure, engine)
if delete:
logger.info(f"Borrado correctamente el procedure {procedure}")
with engine.connect() as conn:
result_p = bool(intern_conn.check_procedure(procedure, conn))
if result_p:
logger.info(f"Borrando procedures {procedure}")
delete = delete_procedure(procedure, intern_conn)
if delete:
logger.info(f"Borrado correctamente el procedure {procedure}")
success_tasks = ti.xcom_pull(task_ids="GENERATORS", key="SUCCESS_TASKS")
failed_tasks = ti.xcom_pull(task_ids="GENERATORS", key="FAILED_TASKS")
conf = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CONTROL-CONFIG")
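Note: validate_clean now receives the project's connection wrapper (intern_conn) instead of a bare SQLAlchemy engine, and verifies that a procedure exists before dropping it. A minimal sketch of the wrapper interface this code relies on, assuming SQLAlchemy; the constructor and internals here are assumptions, only the attribute names come from this diff:

    from sqlalchemy import create_engine

    class InternConnection:
        """Illustrative stand-in for the internal DB wrapper: exposes .engine, .db_type and check helpers."""
        def __init__(self, url: str, db_type: str):
            self.engine = create_engine(url)  # raw engine, still reachable as intern_conn.engine
            self.db_type = db_type            # e.g. "mysql" or "oracle"; drives dialect-specific SQL

        def check_procedure(self, procedure_name: str, connection) -> bool:
            # dialect-specific lookup; the Oracle version further down queries all_procedures
            raise NotImplementedError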
@@ -38,7 +41,6 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
if not prefix.endswith("/"):
prefix += "/"
key = prefix + control_params["filename"]
#conf = json.dumps(conf, indent=2, default=str)
final_dict = {}
status = ProcessStatusEnum.SUCCESS.value
if not isinstance(success_tasks, type(None)) and len(success_tasks) > 0:
@@ -59,13 +61,13 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
def clean(command: str, intern_conn):
engine = intern_conn.engine
#tablename = select_multiple(command)["tablename"]
tablename = command
logger.info(f"Borrando tabla {tablename}")
delete = delete_table(tablename, engine)
if delete:
delete_p = delete_procedure(tablename, intern_conn)
delete_t = delete_table(tablename, intern_conn)
if delete_t:
logger.info(f"Borrado correctamente la tabla {tablename}")
elif delete_p:
logger.info(f"Borrado correctamente el procedure {tablename}")
logger.info(f"Borrado todas las variables xcom")
@@ -110,7 +112,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider
validate_task = PythonOperator(
task_id="VALIDATE_CLEANER",
python_callable=validate_clean,
op_kwargs={'control_params': control_s3, 'provider': provider, 'timezone': timezone, 'engine':db_intern_conn.engine},
op_kwargs={'control_params': control_s3, 'provider': provider, 'timezone': timezone, 'intern_conn':db_intern_conn},
trigger_rule='none_skipped'
)
cleaners >> tasks >> validate_task
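Note: op_kwargs entries are forwarded to the callable as keyword arguments, so the new 'intern_conn' key must match the parameter name added to validate_clean. A self-contained illustration, assuming Airflow 2.x (the DAG id and the object() stand-in are made up):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    def callable_example(intern_conn, **kwargs):
        # op_kwargs arrive as named parameters; Airflow context (ti, ds, ...) stays in **kwargs
        print(type(intern_conn).__name__)

    with DAG(dag_id="cleaning_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
        validate_example = PythonOperator(
            task_id="VALIDATE_CLEANER",
            python_callable=callable_example,
            op_kwargs={"intern_conn": object()},  # stand-in for db_intern_conn
            trigger_rule="none_skipped",
        )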
@@ -7,29 +7,18 @@ import logging
logger = logging.getLogger()
def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
def save_from_dataframe(df: pd.DataFrame, tablename: str, connection, is_first: bool = False) -> bool:
save = False
try:
chunksize = 2000
# db_type = connection.db_type
db_type = connection.db_type
connection = connection.engine
# print(df["CREACION_PRODUCTO"].value_counts())
with connection.connect() as conn:
# if db_type == DatabaseTypeEnum.ORACLE.value:
# df.info()
# aux = df.columns[df.dtypes == 'object'].tolist()
# print(aux)
# dtyp = {}
# for col in aux:
# print(col)
# print(df[col].dtype)
# df[col] = df[col].astype(str)
# dtyp.update({col: VARCHAR(df[col].str.len().max())})
# # dtyp = {c: VARCHAR(df[c].str.len().max()) for c in aux}
# print(dtyp)
# df.to_sql(tablename, conn, if_exists='append', dtype=dtyp, index=False, chunksize=chunksize)
# else:
df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=chunksize)
if db_type == DatabaseTypeEnum.ORACLE.value:
dtypes = {c: VARCHAR(df[c].str.len().max()) for c in df.columns[df.dtypes == 'object'].tolist()}
df.to_sql(tablename, conn, if_exists='append', dtype=dtypes, index=False, chunksize=chunksize)
else:
df.to_sql(tablename, conn, if_exists='append', index=False, chunksize=chunksize)
save = True
except Exception as e:
logger.error(f"Error guardando resultados desde dataframe. {e}")
import logging
import time
from typing import List
import re
from enums.DatabaseTypeEnum import DatabaseTypeEnum
logger = logging.getLogger()
@@ -15,36 +18,69 @@ def execute_transformations(commands: List[str], engine):
logger.error(f"Error ejecutando comando de transformación. {e}")
def delete_table(tablename: str, engine) -> bool:
def delete_table(tablename: str, connection) -> bool:
engine = connection.engine
delete = False
try:
command = f'DROP TABLE IF EXISTS {tablename}'
start_time = time.time()
with engine.connect() as conn:
try:
_ = conn.execute(command)
except Exception as e:
logger.error(f"Tabla no encontrada. {e}")
delete = True
logger.debug(f"Duración de borrado: {time.time() - start_time}")
if connection.db_type == DatabaseTypeEnum.ORACLE.value:
command = f'DROP TABLE {tablename}'
start_time = time.time()
with engine.connect() as conn:
try:
check_query = f"SELECT table_name FROM all_tables WHERE table_name = '{tablename.upper()}'"
result = conn.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Tabla no encontrada. {e}")
if exists:
_ = conn.execute(command)
delete = True
logger.debug(f"Duración de borrado: {time.time() - start_time}")
else:
command = f'DROP TABLE IF EXISTS {tablename}'
start_time = time.time()
with engine.connect() as conn:
try:
_ = conn.execute(command)
delete = True
logger.debug(f"Duración de borrado: {time.time() - start_time}")
except Exception as e:
logger.error(f"Tabla no encontrada. {e}")
except Exception as e:
logger.error(f"Error borrando tabla {tablename}. {e}")
finally:
return delete
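Note: Oracle has no DROP TABLE IF EXISTS, so the table is first looked up in the all_tables dictionary view (names are stored upper-case) before dropping. A compact sketch of the same check-then-drop flow, assuming SQLAlchemy 1.x raw-string execution as used elsewhere in this diff; it initialises the exists flag before the lookup so the flag is always defined:

    import time

    def drop_table_oracle(tablename: str, engine) -> bool:
        dropped = False
        start = time.time()
        with engine.connect() as conn:
            exists = False  # defined up front so a failed lookup cannot leave it unbound
            try:
                row = conn.execute(
                    f"SELECT table_name FROM all_tables WHERE table_name = '{tablename.upper()}'"
                ).fetchone()
                exists = row is not None
            except Exception as exc:
                print(f"lookup failed: {exc}")
            if exists:
                conn.execute(f"DROP TABLE {tablename}")
                dropped = True
        print(f"drop duration: {time.time() - start:.3f}s")
        return dropped

Validating or whitelisting the identifier before interpolating it into the statement would also harden this against injection.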
def delete_procedure(procedure: str, engine) -> bool:
def delete_procedure(procedure: str, connection) -> bool:
engine = connection.engine
delete = False
try:
command = f"DROP PROCEDURE IF EXISTS {procedure}"
start_time = time.time()
with engine.connect() as conn:
try:
_ = conn.execute(command)
except Exception as e:
logger.error(f"Procedure no encontrado. {e}")
delete = True
logger.debug(f"Duración de borrado: {time.time() - start_time}")
if connection.db_type == DatabaseTypeEnum.ORACLE.value:
command = f"DROP PROCEDURE {procedure}"
start_time = time.time()
with engine.connect() as conn:
try:
proc = re.match(r'([^(]+)', procedure).group(1)
check_query = f"SELECT object_name FROM all_procedures WHERE object_name = '{proc.upper()}'"
result = conn.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Procedure no encontrado. {e}")
if exists:
_ = conn.execute(command)
delete = True
logger.debug(f"Duración de borrado: {time.time() - start_time}")
else:
command = f'DROP PROCEDURE IF EXISTS {procedure}'
start_time = time.time()
with engine.connect() as conn:
try:
_ = conn.execute(command)
delete = True
logger.debug(f"Duración de borrado: {time.time() - start_time}")
except Exception as e:
logger.error(f"Procedure no encontrada. {e}")
except Exception as e:
logger.error(f"Error borrando procedure {procedure}. {e}")
finally:
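Note: stored-procedure entries may carry an argument list, so only the text before the first "(" is used when checking all_procedures. A quick illustration (the procedure name is hypothetical):

    import re

    stored = "SP_ACTUALIZA_SALDOS(p_fecha IN DATE)"  # hypothetical stored value
    bare_name = re.match(r"([^(]+)", stored).group(1).strip()
    print(bare_name.upper())  # SP_ACTUALIZA_SALDOS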
@@ -121,7 +121,7 @@ class Mysql:
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de tabla. {e}")
logger.info(f"No se encuentra la tabla {table_name} en la BD interna")
finally:
return exists
@@ -130,9 +130,10 @@ class Mysql:
try:
check_query = f"SELECT COUNT(*) FROM {table_name}"
result = connection.execute(check_query)
result = result.fetchone()[0]
if result > 0:
exists = True
except Exception as e:
logger.error(f"Error obteniendo counts de tabla. {e}")
logger.info(f"No se encuentra la tabla {table_name} en la BD interna")
finally:
return exists
\ No newline at end of file
return exists
@@ -2,7 +2,7 @@ from typing import List, Tuple
from sqlalchemy import create_engine
import oracledb
import re
from enums.DatabaseDialectEnum import DatabaseDialectEnum
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Databases.Enums.OracleDataTypeEnum import OracleDataTypeEnum
@@ -91,7 +91,7 @@ class Oracle:
response = ""
try:
command = command.replace(reserved_word, "").replace(";", "")
response = f"begin {command}; end;"
response = f"BEGIN {command}; END;"
logger.debug("COMANDO ORACLE:", response)
except Exception as e:
logger.error(f"Error generando comando sql para procedure Oracle. Comando: {command}. {e}")
@@ -113,7 +113,8 @@ class Oracle:
def check_procedure(self, procedure_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT text FROM all_source WHERE name = '{procedure_name}'"
procedure_name = re.match(r'([^(]+)', procedure_name).group(1)
check_query = f"SELECT object_name FROM all_procedures WHERE object_name = '{procedure_name.upper()}'"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
@@ -124,22 +125,23 @@ class Oracle:
def check_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"DESCRIBE {table_name}"
check_query = f"SELECT table_name FROM all_tables WHERE table_name = '{table_name.upper()}'"
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de tabla. {e}")
logger.info(f"No se encuentra la tabla {table_name} en la BD interna")
finally:
return exists
def verify_table(self, table_name, connection) -> bool:
exists = False
try:
check_query = f"SELECT COUNT(*) FROM {table_name}"
check_query = f"SELECT COUNT(*) FROM {table_name.upper()}"
result = connection.execute(check_query)
result = result.fetchone()[0]
if result > 0:
exists = True
except Exception as e:
logger.error(f"Error obteniendo counts de tabla. {e}")
logger.info(f"No se encuentra la tabla {table_name} en la BD interna.")
finally:
return exists
\ No newline at end of file
return exists
@@ -122,7 +122,7 @@ class Postgres:
result = connection.execute(check_query)
exists = bool(result.fetchone())
except Exception as e:
logger.error(f"Error obteniendo existencia de tabla. {e}")
logger.info(f"No se encuentra la tabla {table_name} en la BD interna")
finally:
return exists
@@ -131,9 +131,10 @@ class Postgres:
try:
check_query = f"SELECT COUNT(*) FROM {table_name}"
result = connection.execute(check_query)
result = result.fetchone()[0]
if result > 0:
exists = True
except Exception as e:
logger.error(f"Error obteniendo counts de tabla. {e}")
logger.info(f"No se encuentra la tabla {table_name} en la BD interna")
finally:
return exists
\ No newline at end of file
return exists
@@ -90,6 +90,8 @@ def on_success_extractor(context) -> None:
ti = context["ti"]
task_name = f"{ti.task_id}_{ti.map_index}"
selects = Variable.get('SELECTS', default_var=[], deserialize_json=True)
if len(selects) == 0:
raise AirflowSkipException(f"No se encontraron operaciones de extraccion")
command = selects[ti.map_index]
tablename = select_multiple(command[1])["tablename"]
status = ProcessStatusEnum.SUCCESS.value
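Note: the success callback now skips cleanly when the SELECTS Variable is empty instead of indexing past the end of the list. The guard in isolation, assuming Airflow 2.x:

    from airflow.exceptions import AirflowSkipException
    from airflow.models import Variable

    def get_select_for_index(map_index: int):
        # deserialize_json returns the stored JSON list; [] is the fallback when the Variable is missing
        selects = Variable.get("SELECTS", default_var=[], deserialize_json=True)
        if len(selects) == 0:
            raise AirflowSkipException("no extraction commands were registered")
        return selects[map_index]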
@@ -142,11 +144,10 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
result = bool(intern_conn.check_table(tablename, connection))
resultado = intern_conn.verify_table(tablename, connection)
if not result or not resultado:
_ = delete_table(tablename, intern_conn)
create = intern_conn.create_table(model)
if create:
logger.info(f"Creado correctamente la tabla {tablename}. Creado?: {create}")
#extract_tables.append(tablename)
#task.xcom_push(key="TABLES_CREATED", value=tablename)
else:
raise AssertionError(f"Error creando tabla {tablename}")
if is_procedure:
@@ -163,13 +164,13 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
data.append(row)
if len(data) == chunksize:
dataframe = pd.DataFrame(data, columns=columns_name)
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
save = save_from_dataframe(dataframe, tablename, intern_conn)
if save:
logger.debug(f"Guardado correctamente dataframe. Procesando más bloques")
data.clear()
if len(data) > 0:
dataframe = pd.DataFrame(data, columns=columns_name)
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
save = save_from_dataframe(dataframe, tablename, intern_conn)
if save:
logger.debug(f"Migrado correctamente todos los datos")
extract_tables.append(tablename)
@@ -183,13 +184,13 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
data.append(row)
if len(data) == chunksize:
dataframe = pd.DataFrame(data, columns=columns_name)
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
save = save_from_dataframe(dataframe, tablename, intern_conn)
if save:
logger.debug(f"Guardado correctamente dataframe. Procesando más bloques")
data.clear()
if len(data) > 0:
dataframe = pd.DataFrame(data, columns=columns_name)
save = save_from_dataframe(dataframe, tablename, intern_conn.engine)
save = save_from_dataframe(dataframe, tablename, intern_conn)
if save:
logger.debug(f"Migrado correctamente todos los datos")
@@ -225,7 +226,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
task.xcom_push(key="TABLES_CREATED", value=tablename)
task.xcom_push(key="END_PROCESS_DATETIME_" + str(task.map_index), value=end_process_datetime)
except Exception as e:
delete = delete_table(tablename, intern_conn.engine)
delete = delete_table(tablename, intern_conn)
if delete:
logger.info(f"Se borró correctamente la tabla {tablename}")
raise AssertionError(f"Error creando la tabla y migrando datos. {type(e)}. {e}")
@@ -240,7 +241,6 @@ def get_select_from_xcom(db_intern_conn, **kwargs):
conf = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="CONTROL-CONFIG")
definitions = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
temp_conf = conf[-1]["objects_created"] if "objects_created" in conf[-1].keys() else []
engine = db_intern_conn.engine
logger.info(f"OBJETOS CREADOS: {temp_conf}")
identificadores = [item["identifier"] for item in definitions]
if temp_conf:
@@ -258,8 +258,8 @@ def get_select_from_xcom(db_intern_conn, **kwargs):
tablas_temp = list(tablas_temp)
if tablas_temp:
for i in tablas_temp:
delete = delete_table(i, engine)
delete2 = delete_procedure(i, engine)
delete = delete_table(i, db_intern_conn)
delete2 = delete_procedure(i, db_intern_conn)
if delete:
logger.info(f"Borrado correctamente la tabla temporal {i}")
if delete2:
@@ -115,7 +115,6 @@ def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, p
else:
hook.load_bytes(buffer.getvalue(), key, bucket, True)
else:
logger.info(f"Llegue aca, para guardar")
logger.info(f"DATA: {data}. BUCKET: {bucket}. KEY: {key}")
# convert_df = pd.read_csv()
if gcp_cloud:
@@ -8,12 +8,12 @@ from airflow.exceptions import AirflowSkipException
from components.DatabaseOperation.DatabaseTransformation import delete_table, delete_procedure
from components.S3Route import get_files_from_prefix, get_file_from_prefix
from components.Xcom import save_commands_to_xcom
from components.Utils import update_sql_commands_2
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_control_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances
from enums.ProcessStatusEnum import ProcessStatusEnum
from enums.OperationTypeEnum import OperationTypeEnum
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Timezone import datetime_by_tzone
import logging
@@ -99,12 +99,16 @@ def transformations(xcom_commands: str, intern_conn, timezone: str, **kwargs):
script_name = xcom_commands[0]
commands = xcom_commands[1]
logger.info(f"Ejecutando transformaciones del script {script_name}")
not_procedure = ["UPDATE","SELECT","CREATE","ALTER","DROP","DELETE","INSERT","GRANT","REVOKE","TRUNCATE","COPY",
"COMMIT","ROLLBACK","USE","BEGIN"]
logger.info(f"XCOM_COMMANDS: {xcom_commands}")
not_procedure = ["UPDATE", "SELECT", "CREATE", "ALTER", "DROP", "DELETE", "INSERT"," GRANT", "REVOKE", "TRUNCATE",
"COPY", "COMMIT", "ROLLBACK", "USE", "BEGIN"]
with engine.connect() as connection:
for command in commands:
logger.info(f"Commando: {command}")
if any(command.startswith(palabra) or command.startswith(palabra.lower()) for palabra in not_procedure):
logger.info(f"Ejecutando comando de transformación: {command}")
logger.info(f"Ejecutando comando de transformación en {intern_conn.db_type} : {command}")
if intern_conn.db_type == DatabaseTypeEnum.ORACLE.value:
command = command[:-1]
_ = connection.execute(command)
else:
logger.info(f"Generando llamada al procedure según bd para: {command}")
@@ -143,14 +147,14 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
index = procedure.find(label_tablename + ":")
label_lenght = len(label_tablename + ":")
tablename = procedure[index + label_lenght:].strip().split("\n")[0]
delete_procedure(tablename, engine)
delete_procedure(tablename, db_intern_conn)
for element in definitions:
if element["identifier"] == tablename and "transformation_store_procedure" in element.keys() and element["transformation_store_procedure"] == True:
logger.info(f"Ejecutando creacion de procedure: {procedure}")
# result = db_intern_conn.check_procedure(tablename, connection)
# if not result:
try:
logger.info("1")
_ = connection.execute(procedure)
logger.info("2")
except Exception as e:
logger.error(f" Error: {e}")
raise AirflowSkipException
@@ -159,7 +163,7 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
else:
logger.debug(f"No se encontró el label en {procedure} por ende no se creará")
save_commands_to_xcom(procedures, kwargs['ti'], procedure_mask, transform_mask, procedure_mask, order_delimiter)
logger.debug(f"Procedures cargados en Xcom: {procedures}")
logger.info(f"Procedures cargados en Xcom: {procedures}")
transforms_per_file = []
conf = task.xcom_pull(task_ids="VALIDATE_EXTRACTION", key="CONTROL-CONFIG")
@@ -166,13 +166,6 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str,
for item in data:
if item.lower().strip() == "end":
final_data[-1] = final_data[-1] + "; end;"
# parts = item.split(CommentsScriptEnum.DASHES.value)
# parts = [part for part in parts if len(part.strip()) > 0]
# print(parts)
# if len(parts) > 1:
# for part in parts:
# if not part.strip().lower().startswith(label_tablename.lower()):
# continue
final_item = item
if item.lower().strip().find(label_tablename.lower().strip() + ":") != -1:
@@ -222,14 +215,6 @@ def select_multiple(command: str) -> Dict[str, Any]:
if command.lower().replace(" ", "").find(no_procedure_init) != -1:
response["is_multiple"] = True
tablename = command[:command.index("|")].strip()
# init_index = command.lower().find("from")
# if init_index == -1:
# raise AssertionError("Query malformed")
# else:
# from_command = command[init_index + 4:]
# tablename_base = from_command.strip().split(" ")
# if len(tablename_base) > 0 and tablename == "":
# tablename = tablename_base[0]
response["tablename"] = tablename
except Exception as e:
raise AssertionError(f"Error validando si es múltiple select y nombre de tabla. {e}")
@@ -5,23 +5,23 @@ app:
database:
sources:
source1:
type: mysql
host: 192.168.1.13
port: 13306
username: root
password: root
database: prueba
service:
schema: sources
type: oracle
host: 192.168.27.22
port: 21521
username: PRUEBABCOM2
password: admin
database:
service: ORCLPDB1
schema:
transformation:
type: mysql
host: 192.168.1.13
port: 13306
username: root
password: root
database: prueba_ca
service:
schema: intern_db
type: oracle
host: 192.168.27.22
port: 21521
username: RLQA_AIR
password: RLQA_AIR99
database:
service: ORCLPDB1
schema:
chunksize: 4000
label_multiple_select: TABLENAME
label_transform_procedure: STORE
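Note: both source1 and transformation now point at Oracle instances addressed by a service name (ORCLPDB1) rather than a database/schema pair. A quick connectivity check with python-oracledb in thin mode, mirroring these settings (credentials below are placeholders, not the real ones):

    import oracledb  # thin mode, no Instant Client required

    conn = oracledb.connect(
        user="RLQA_AIR",
        password="********",       # replace with the real credential
        host="192.168.27.22",
        port=21521,
        service_name="ORCLPDB1",   # service name, since 'database'/'schema' are now empty
    )
    with conn.cursor() as cur:
        cur.execute("SELECT 1 FROM dual")
        print(cur.fetchone())      # (1,)
    conn.close()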
@@ -55,14 +55,14 @@ app:
delimiter: '|'
tmp_path: /tmp
s3_params:
tabla4:
bucket: pruebairflow
tabla1:
bucket: prueba-id
prefix: bcom_results
connection_id: prueba_af
connection_id: conn_script
tabla5:
bucket: pruebairflow
bucket: prueba-id
prefix: bcom_results
connection_id: prueba_af
connection_id: conn_script
report:
s3_params:
bucket: prueba-id
@@ -5,4 +5,4 @@ oracledb==1.3.2
apache-airflow-providers-google
apache-airflow-providers-amazon
apache-airflow-providers-postgres
apache-airflow-providers-oracle
\ No newline at end of file
apache-airflow-providers-oracle