Commit 48029b5d authored by Cristian Aguirre

Merge branch 'developer-SP2' into 'developer'

Fix bugs from QA

See merge request !5
parents 0e639513 26bf8a24
File added
......@@ -51,7 +51,6 @@ NOTA: *Detalle de cada variable de entorno usada por los POD y Airflow:*
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO # Nivel de log de Airflow (webserver, scheduler y workers)
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: America/Lima # Timezone de la Web de Airflow
AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima # Timezone del Scheduler de Airflow
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}' # Tiempo de espera de respuesta de Kubernetes API
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom # Ruta de imagen a usar para crear el worker POD
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.1" # Tag a usar para crear el worker POD
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc # Nombre del volumen usado para almacenar los logs
......
......@@ -40,7 +40,6 @@ def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[s
def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str, Any],
timezone: str, task, delete_last: bool = False, frequency: str = "montly") -> List[Dict[str, Any]]:
try:
print("PREV:", conf)
format_date = "%Y-%m" if frequency == "montly" else "%Y-%W"
current_period = str(datetime_by_tzone(timezone, format_date))[:7]
processed_period = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="PROCESSED_PERIOD")
......@@ -54,7 +53,6 @@ def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str,
conf.append(new_process)
else:
conf = [new_process]
print("LAS:", conf)
except Exception as e:
logger.error(f"Error actualizando archivo de control. {e}")
finally:
......
......@@ -33,7 +33,7 @@ class Mysql:
self.connection = pymysql.connect(host=self.host, port=self.port, user=self.user,
password=self.password, db=self.database)
except Exception as e:
logger.error(f"Error obteniendo conexion básica de Oracle. {e}")
logger.error(f"Error obteniendo conexion básica de Mysql. {e}")
finally:
return self.connection
......
......@@ -34,7 +34,7 @@ class Postgres:
password=self.password, database=self.database,
options="-c search_path="+self.schema)
except Exception as e:
logger.error(f"Error obteniendo conexion básica de Oracle. {e}")
logger.error(f"Error obteniendo conexion básica de Postgres. {e}")
finally:
return self.connection
......
......@@ -107,7 +107,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwa
indexes = []
if "indexes" in procedure.keys():
indexes = procedure["indexes"]
model = generateModel(tablename, procedure["fields"], indexes)
model = generateModel(tablename, procedure["fields"], indexes, intern_conn.db_type)
columns_name = [field["name"] for field in procedure["fields"]]
if isinstance(model, type(None)):
raise AssertionError(f"Definición del extracción para {tablename} en el json-descriptor no encontraddo")
......@@ -120,6 +120,8 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwa
if is_procedure:
command = command[len(tablename+"|"):]
temp_connection = source_conn.get_basic_connection()
command = source_conn.generate_sql_procedure(command)
logger.debug(f"FINAL COMMAND: {command}")
if source_conn.db_type == DatabaseTypeEnum.ORACLE.value:
cursor = temp_connection.cursor()
cursor.execute(command)
......
......@@ -9,7 +9,8 @@ from enums.CatalogConfigurationEnum import CatalogConfigurationEnum
from enums.FileTypeEnum import FileTypeEnum
from enums.DataTypeEnum import DataTypeEnum
from enums.DataTypeOrmEnum import DataTypeOrmEnum
from components.Model.InsumoModel import InsumoModel
from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Model.InsumoModel import InsumoModel, InsumoModelOracle
from enums.CommentsScriptEnum import CommentsScriptEnum
from components.Timezone import datetime_by_tzone
......@@ -139,10 +140,8 @@ def select_multiple(command: str) -> Dict[str, Any]:
response = {'is_multiple': False, 'tablename': ''}
tablename = ""
no_procedure_init = "|select"
procedure_init = ["|begin", "|call"]
try:
if command.lower().replace(" ", "").find(procedure_init[0]) != -1 or \
command.lower().replace(" ", "").find(procedure_init[1]) != -1:
if command.lower().replace(" ", "").find(no_procedure_init) == -1:
response["is_multiple"] = True
tablename = command[:command.index("|")].strip()
response["tablename"] = tablename
......@@ -150,14 +149,14 @@ def select_multiple(command: str) -> Dict[str, Any]:
if command.lower().replace(" ", "").find(no_procedure_init) != -1:
response["is_multiple"] = True
tablename = command[:command.index("|")].strip()
init_index = command.lower().find("from")
if init_index == -1:
raise AssertionError("Query malformed")
else:
from_command = command[init_index + 4:]
tablename_base = from_command.strip().split(" ")
if len(tablename_base) > 0 and tablename == "":
tablename = tablename_base[0]
# init_index = command.lower().find("from")
# if init_index == -1:
# raise AssertionError("Query malformed")
# else:
# from_command = command[init_index + 4:]
# tablename_base = from_command.strip().split(" ")
# if len(tablename_base) > 0 and tablename == "":
# tablename = tablename_base[0]
response["tablename"] = tablename
except Exception as e:
raise AssertionError(f"Error validando si es múltiple select y nombre de tabla. {e}")
......@@ -200,12 +199,17 @@ def delete_temp_dir(module_name: str) -> bool:
return drop
def generateModel(tablename: str, attributes: List[Dict[str, Any]], indexes: List[str], modelName: str = "TableModel"):
def generateModel(tablename: str, attributes: List[Dict[str, Any]], indexes: List[str], db_target: str,
modelName: str = "TableModel"):
default_precision = 8
model = type(modelName, (InsumoModel,), {
model_args = {
'__tablename__': tablename,
'__table_args__': {'extend_existing': True}
})
}
if db_target == DatabaseTypeEnum.ORACLE.value:
model = type(modelName, (InsumoModelOracle,), model_args)
else:
model = type(modelName, (InsumoModel,), model_args)
try:
for attribute in attributes:
index = False
......
......@@ -15,23 +15,23 @@ app:
schema: sources
transformation:
type: mysql
host: database-11.cluster-ro-cvsz4ey9eiec.us-east-1.rds.amazonaws.com
port: 3306
username: admin
password: adminadmin
database: prueba_ca_2
host: 192.168.1.2
port: 13306
username: root
password: root
database: prueba_bcom2
service:
schema: intern_db
chunksize: 8000
chunksize: 4000
label_multiple_select: TABLE
source_mask: select # Sufijo (S)
procedure_mask: procedure # S
transformation_mask: transform # S
source_mask: selectDA # Sufijo (S)
procedure_mask: procedureDA # S
transformation_mask: transformDA # S
prefix_order_delimiter: .
cloud_provider: aws
scripts:
s3_params:
bucket: prueba-airflow13
bucket: prueba1234568
prefix: bcom_scripts
connection_id: conn_script
control:
......@@ -48,7 +48,7 @@ app:
delimiter: '|'
tmp_path: /tmp
s3_params:
bucket: prueba-airflow13
bucket: prueba1234568
prefix: bcom_results
connection_id: conn_script
report:
......
......@@ -20,7 +20,7 @@ logger = logging.getLogger()
DAG_NAME = "INFORM_PROCESS"
# Change this path depending on whether it is deployed in prod or dev
MAIN_PATH = "/root/airflow/dags/"
MAIN_PATH = "/opt/airflow/dags/"
DEFAULT_ARGS = {
'owner': 'BCOM',
......@@ -33,10 +33,8 @@ DEFAULT_ARGS = {
}
def upload_report(report_params: Dict[str, Any], provider: str, timezone: str, **kwargs) -> None:
def upload_report(report_path: str, report_params: Dict[str, Any], provider: str, timezone: str) -> None:
try:
task = kwargs["ti"]
report_path = task.xcom_pull(task_ids="CREATE-REPORT", key="REPORT_PATH")
pattern = report_params["datetime_pattern"]
conn = report_params["s3_params"]["connection_id"]
bucket = report_params["s3_params"]["bucket"]
......@@ -54,7 +52,7 @@ def upload_report(report_params: Dict[str, Any], provider: str, timezone: str, *
logger.error(f"Error subiendo reporte a . {e}")
def create_report(tmp_path: str, **kwargs) -> None:
def create_and_upload_report(tmp_path: str, report_params: Dict[str, Any], provider: str, timezone: str, **kwargs) -> None:
try:
task = kwargs["ti"]
data = task.xcom_pull(task_ids="GET-DATA-REPORT", key="REPORT-DATA")
......@@ -109,7 +107,8 @@ def create_report(tmp_path: str, **kwargs) -> None:
worksheet.merge_range('C'+str(index)+':G'+str(index), f"ARCHIVO GENERADO DESDE LA TABLA: {data[key]['DESCRIPTION']}", row_format)
worksheet.merge_range('H'+str(index)+':I'+str(index), f"ESTADO: {data[key]['STATUS']}", row_format)
worksheet.merge_range('J'+str(index)+':N'+str(index), data[key]['MESSAGE'], row_format)
task.xcom_push(key="REPORT_PATH", value=excel_tmp_path)
# Upload report
upload_report(excel_tmp_path, report_params, provider, timezone)
except Exception as e:
logger.error(f"Error creando reporte. {e}")
......@@ -193,27 +192,22 @@ def set_dag():
trigger_rule="all_success"
)
create = PythonOperator(
get_data = PythonOperator(
task_id="GET-DATA-REPORT",
python_callable=get_data_report,
op_kwargs={},
trigger_rule="all_success"
)
tmp_dir = conf["outputs"]["tmp_path"]
report = PythonOperator(
task_id="CREATE-REPORT",
python_callable=create_report,
op_kwargs={'tmp_path': tmp_dir},
trigger_rule="all_success"
)
report_params = conf["report"]
upload = PythonOperator(
task_id="UPLOAD-REPORT",
python_callable=upload_report,
op_kwargs={'report_params': report_params, 'provider': conf["cloud_provider"], 'timezone': timezone},
create_and_upload = PythonOperator(
task_id="CREATE-AND-UPLOAD-REPORT",
python_callable=create_and_upload_report,
op_kwargs={'tmp_path': tmp_dir, 'report_params': report_params, 'provider': conf["cloud_provider"],
'timezone': timezone},
trigger_rule="all_success"
)
control_extractor >> create >> report >> upload
control_extractor >> get_data >> create_and_upload
return dag
......
[
{
{
"identifier": "TABLA1",
"fields": [
{
......@@ -49,5 +49,281 @@
"datatype": "DECIMAL"
}
]
},
{
"identifier": "TABLA_PRUEBA",
"fields": [
{
"name": "CD_CUENTA",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "DPP_FLAG",
"datatype": "NUMBER"
},
{
"name": "FECHA_FACTURA",
"datatype": "DATE"
},
{
"name": "FECHA_INICIAL_DPP",
"datatype": "DATE"
},
{
"name": "FECHA_LIMITE_DPP",
"datatype": "DATE"
},
{
"name": "FH_CARGA",
"datatype": "DATE"
},
{
"name": "ID_FACTURA",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "MONTO_AJUSTADO",
"datatype": "DECIMAL"
},
{
"name": "MONTO_FACTURA",
"datatype": "DECIMAL"
},
{
"name": "MONTO_PAGO",
"datatype": "DECIMAL"
},
{
"name": "SALDO_FACTURA",
"datatype": "DECIMAL"
}
]
},
{
"identifier": "TABLA_DE_PRUEBA_DA",
"fields": [
{
"name": "CAMPO_1",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CAMPO_2",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CAMPO_3",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CAMPO_4",
"datatype": "TEXT",
"maxLength": 100
}
]
},
{
"identifier": "TACOMVENTAS",
"fields": [
{
"name": "CD_EMPRESA",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "CD_FOLIO",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_CUENTA",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "NU_VENDEDOR",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "NU_ADDON",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NB_PAQUETE",
"datatype": "TEXT",
"maxLength": 200
},
{
"name": "CD_CLIENTE",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "NB_CLIENTE",
"datatype": "TEXT",
"maxLength": 200
},
{
"name": "FH_CONTRATACION",
"datatype": "DATE"
},
{
"name": "FH_ACTIVACION",
"datatype": "DATE"
},
{
"name": "FH_OPERACION",
"datatype": "DATE"
},
{
"name": "TP_SERVICIO",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "ST_CLIENTE",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "TP_PAGO",
"datatype": "TEXT",
"maxLength": 10
},
{
"name": "NB_USUACARGA",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "FH_CARGA",
"datatype": "DATE"
},
{
"name": "NU_ANIO",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NU_MES",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NU_SEMANA",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "NU_COMISION",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "TP_PAGOANT",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "REGLA_APLICADA",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "AUX",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "PROMOCION",
"datatype": "TEXT",
"maxLength": 80
},
{
"name": "EMPLEADOEMBAJADOR__C",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CANAL_DE_VENTA__C",
"datatype": "TEXT",
"maxLength": 50
},
{
"name": "CANALREFERIDO__C",
"datatype": "TEXT",
"maxLength": 50
}
],
"indexes": [
"CD_PAQUETE", "NU_ADDON", "CD_CLIENTE"
]
},
{
"identifier": "CATALOGO_PROMOCIONES",
"fields": [
{
"name": "NOMBRE_PRODUCTO",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"identifier": "PROCEDURE_DA",
"fields": [
{
"name": "NOMBRE_PRODUCTO",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "CD_PAQUETE",
"datatype": "TEXT",
"maxLength": 50
}
]
},
{
"identifier": "JOIN_1",
"fields": [
{
"name": "PRODUCTO",
"datatype": "TEXT",
"maxLength": 100
},
{
"name": "ANIO",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "SERVICIO",
"datatype": "NUMBER",
"decimal_precision": 0
},
{
"name": "TOTAL_CARGA",
"datatype": "NUMBER",
"decimal_precision": 3
}
]
}
]
\ No newline at end of file
......@@ -79,7 +79,6 @@ data:
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: America/Lima
AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}'
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.6"
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc
......
......@@ -24,8 +24,11 @@ spec:
image: cristianfernando/airflow_custom:0.0.4
resources:
requests:
cpu: "1000m"
memory: "4Gi"
cpu: "500m"
memory: "2Gi"
limits:
cpu: "2500m"
memory: "5Gi"
args: ["scheduler"]
envFrom:
- configMapRef:
......
......@@ -21,11 +21,14 @@ spec:
spec:
containers:
- name: airflow-webserver
image: apache/airflow:2.5.3
image: cristianfernando/airflow_custom:0.0.4
resources:
requests:
cpu: "500m"
cpu: "250m"
memory: "500Mi"
limits:
cpu: "500m"
memory: "1000Mi"
args: ["webserver"]
envFrom:
- configMapRef:
......
......@@ -20,6 +20,9 @@ spec:
- name: postgres
image: postgres:12
resources:
requests:
memory: "1Gi"
cpu: "250m"
limits:
memory: "2Gi"
cpu: "500m"
......
......@@ -28,6 +28,9 @@ spec:
name: sync-dags-gcloud
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:alpine
resources:
requests:
cpu: "200m"
memory: "500Mi"
limits:
cpu: "250m"
memory: "1Gi"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment