Commit 008be1de authored by Cristian Aguirre's avatar Cristian Aguirre

Merge branch 'developer_ca' into 'developer'

Add match-records-exclude script

See merge request !2
parents 490f3ea9 efc2cb1b
from flask_app import FlaskApplication
class MainApplication(FlaskApplication):
    """Runnable wrapper around ``FlaskApplication`` that starts the Flask server."""

    def __init__(self) -> None:
        super().__init__()

    def run(self, port):
        """Build the Flask application and serve it on every interface.

        Args:
            port: TCP port handed to ``Flask.run``.

        A ``KeyError`` raised while creating or starting the app ends the
        process with an explanatory message.
        """
        try:
            # Build the app from the configuration chosen by the parent class.
            self.app = self.create_app()
            debug_enabled = bool(self.debug)
            if debug_enabled:
                log = self.app.logger
                log.info(f"DEBUG = {self.debug}")
                log.info(f"Environment = {self.config_mode}")
            self.app.run(host="0.0.0.0", port=port, debug=debug_enabled)
        except KeyError:
            exit('Error: Invalid <config_mode>. Expected values [Debug, Production] ')
from abc import ABC, abstractmethod
from typing import Any, Dict
class ActionInterface(ABC):
    """Interface that every CSS-Cuscatlan action script must implement."""

    def __init__(self, app) -> None:
        # Application object supplied by the caller; stored for subclasses.
        self.app = app

    @abstractmethod
    def parser(self, descriptor: Dict[str, Any]):
        """Parse the input parameters carried by *descriptor*."""
        raise NotImplementedError

    @abstractmethod
    def process(self, source_obj):
        """Run the script logic against *source_obj*."""
        raise NotImplementedError

    @abstractmethod
    def response(self):
        """Return the response of the executed script."""
        raise NotImplementedError
import flask
from flask import Blueprint
from flask import request, jsonify
from config import Config as cfg
from app.main.engine.enum.StatusEnum import StatusEnum
from app.main.engine.service.EngineService import EngineService
class MainController:
    """Registers the engine REST endpoint and tracks how many requests are running."""

    def __init__(self, app: flask.Flask) -> None:
        # Local import so this block does not depend on the file header.
        import threading

        self.blueprint = Blueprint('css_engine_blueprint', __name__, url_prefix='')
        self.app = app
        self.max_executions = cfg.max_engine_threads
        # Number of requests currently inside the endpoint (per controller
        # instance). Guarded by a lock because ``+=`` / ``-=`` are not atomic
        # when several threads serve requests at once.
        self.executions = 0
        self._executions_lock = threading.Lock()

    def get_blueprint(self):
        """Return the blueprint that holds the registered routes."""
        return self.blueprint

    def blueprints(self):
        """Attach the ``/api/engine/action`` route to the blueprint."""

        @self.blueprint.route('/api/engine/action', methods=['POST'])
        def parser():
            """Apply an action through script execution and return its result.

            Request body:
                descriptor (dict): attributes and rules consumed by the
                    engine service.

            Returns:
                JSON response with the status produced by ``EngineService``,
                or an error payload if dispatching failed.
            """
            response = {}
            # Take a consistent snapshot of the counter for this request.
            with self._executions_lock:
                self.executions += 1
                running = self.executions
            try:
                descriptor = request.get_json()
                service = EngineService(self.app, descriptor, running, self.max_executions)
                response = service.run()
                self.app.logger.info(response)
            except Exception as e:
                self.app.logger.error(f"Error en el envío del parser al servicio. {e}")
                # NOTE(review): this payload uses the enum *name* for
                # "statusCode"; confirm clients expect that before changing it.
                response["statusCode"] = StatusEnum.ERROR.name
                response["error"] = {"message": "", "detail": str(e)}
            finally:
                with self._executions_lock:
                    self.executions -= 1
            return jsonify(response)
from typing import Dict, Any
from app.main.engine.database.Mysql import Mysql
from app.main.engine.enum.DatabaseTypeEnum import DatabaseTypeEnum
import logging
logger = logging.getLogger()
class Database:
    """Facade that delegates database operations to a type-specific factory.

    Every method logs failures through ``app.logger`` and returns a neutral
    value (``None`` / ``False``) instead of raising.
    """

    def __init__(self, app, db_params: Dict[str, Any]) -> None:
        """Select the factory that matches ``db_params["type"]``.

        Args:
            app: object exposing ``logger``.
            db_params: connection settings; for MySQL the keys ``host``,
                ``port``, ``user``, ``password``, ``db`` and ``extra_params``
                are read.
        """
        self.app = app
        self.db_type = db_params["type"]
        # Always define the attribute so an unsupported type fails with a
        # logged error in the methods below rather than a missing attribute.
        self.factory = None
        if self.db_type == DatabaseTypeEnum.MYSQL.value:
            host, port, user, password, database, params = (db_params["host"], db_params["port"], db_params["user"],
                                                            db_params["password"], db_params["db"], db_params["extra_params"])
            self.factory = Mysql(app, host, port, user, password, database, params)
        else:
            self.app.logger.error(f"Tipo de base de datos no soportado: {self.db_type}")
        self.engine = None

    def create_spark_connection(self):
        """Return the factory's Spark connection description, or ``None`` on error."""
        jdbc_conn = None
        try:
            jdbc_conn = self.factory.create_spark_connection()
        except Exception as e:
            self.app.logger.error(f"Error creando spark conexión. {e}")
        return jdbc_conn

    def get_basic_connection(self):
        """Return the factory's basic connection, or ``None`` on error."""
        connection = None
        try:
            connection = self.factory.get_basic_connection()
        except Exception as e:
            self.app.logger.error(f"Error trayendo básica conexión. {e}")
        return connection

    def close_basic_connection(self) -> None:
        """Close the factory's connection and reset it to ``None``."""
        try:
            self.factory.connection.close()
            self.factory.connection = None
        except Exception as e:
            self.app.logger.error(f"Error cerrando básica conexión. {e}")

    def create_engine(self) -> None:
        """Create the factory engine once and cache it on ``self.engine``."""
        try:
            if self.engine is None:
                self.factory.create_engine()
                self.engine = self.factory.engine
        except Exception as e:
            self.app.logger.error(f"Error creando db engine. {e}")

    def verify_table(self, table_name, connection) -> bool:
        """Return the factory's verdict for *table_name*, or ``False`` on error."""
        result = False
        try:
            result = self.factory.verify_table(table_name, connection)
        except Exception as e:
            self.app.logger.error(f"Error obteniendo numero de registros de la tabla. {e}")
        return result
from typing import Any, Dict
import pymysql
from sqlalchemy import create_engine
from app.main.engine.enum.DatabaseDialectEnum import DatabaseDialectEnum
import logging
logger = logging.getLogger()
class Mysql:
    """MySQL connection factory: JDBC settings, a PyMySQL connection and a SQLAlchemy engine."""

    def __init__(self, app, host: str, port: int, user: str, password: str, database: str,
                 params: Dict[str, Any]) -> None:
        """Store the connection settings; nothing is opened here.

        Args:
            app: object exposing ``logger``.
            host, port, user, password, database: connection coordinates.
            params: extra keyword arguments forwarded to ``create_engine``.
        """
        self.app = app
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.params = params
        self.engine = None
        self.connection = None

    def create_spark_connection(self):
        """Return ``{"url": ..., "properties": ...}`` for a JDBC read, or ``{}`` on error."""
        params = {}
        try:
            url = "jdbc:mysql://"+self.host+":"+str(self.port)+"/"+self.database
            properties = {"user": self.user, "password": self.password, "driver": "com.mysql.cj.jdbc.Driver"}
            params["url"] = url
            params["properties"] = properties
        except Exception as e:
            self.app.logger.error(f"Error creando conexión Mysql para pyspark. {e}")
        return params

    def get_basic_connection(self):
        """Return the cached PyMySQL connection, opening it on first use.

        Returns ``None`` if the connection could not be opened.
        """
        try:
            if self.connection is None:
                self.connection = pymysql.connect(host=self.host, port=self.port, user=self.user,
                                                  password=self.password, db=self.database)
        except Exception as e:
            self.app.logger.error(f"Error obteniendo conexion básica de Mysql. {e}")
        return self.connection

    def create_engine(self) -> None:
        """Build the SQLAlchemy engine and store it on ``self.engine``."""
        # Local import so this block does not depend on the file header.
        from urllib.parse import quote_plus

        try:
            dialect = DatabaseDialectEnum.MYSQL.value
            # Credentials are URL-escaped so characters such as "@", ":" or
            # "/" in the user or password cannot corrupt the connection URL.
            user = quote_plus(str(self.user))
            password = quote_plus(str(self.password))
            url = f"{dialect}://{user}:{password}@{self.host}:{str(self.port)}/{self.database}?charset=utf8mb4"
            self.engine = create_engine(url, pool_recycle=3600, pool_pre_ping=True, **self.params)
        except Exception as e:
            self.app.logger.error(f"Error creando engine de Mysql. {e}")

    def verify_table(self, table_name, connection) -> bool:
        """Return ``True`` when *table_name* can be queried and has at least one row.

        Any failure (including a missing table) is logged and yields ``False``.
        """
        exists = False
        try:
            # NOTE(review): the table name is interpolated into the SQL text;
            # callers must only pass trusted identifiers.
            check_query = f"SELECT COUNT(*) FROM {table_name}"
            row_count = connection.execute(check_query).fetchone()[0]
            if row_count > 0:
                exists = True
        except Exception as e:
            self.app.logger.info(f"No se encuentra la tabla {table_name} en la BD interna. {e}")
        return exists
from enum import Enum
class CodeResponseEnum(Enum):
    """Numeric status codes returned by the engine."""

    # Successful execution.
    SUCCESS = 200

    # Failure codes; member names match the entries of DescResponseEnum.
    MAX_EXECUTION_ERROR = 601
    BD_INPUT_ERROR = 602
    SCRIPT_ERROR = 603
    SCRIPT_EXECUTION_ERROR = 604
    PARAMETERS_ERROR = 605
    OUTPUT_ERROR = 606
    EMPTY_DATASET = 607
    ERROR = 609
from enum import Enum
class DatabaseDialectEnum(Enum):
    """SQLAlchemy dialect prefixes, one per supported database family."""

    MYSQL = "mysql+pymysql"
    # Same value as MYSQL, so Enum treats MARIADB as an alias of MYSQL.
    MARIADB = "mysql+pymysql"
    ORACLE = "oracle"
    POSTGRES = "postgresql+psycopg2"
from enum import Enum
class DatabaseTypeEnum(Enum):
    """Database type identifiers, compared against the ``type`` connection setting."""

    MYSQL = "mysql"
    MARIADB = "mariadb"
    ORACLE = "oracle"
    POSTGRES = "postgres"
from enum import Enum
class DescResponseEnum(Enum):
    """Human-readable descriptions, looked up by the name of a status member."""

    # Success carries no description.
    SUCCESS = ""

    # Failure descriptions (runtime text, intentionally kept in Spanish).
    MAX_EXECUTION_ERROR = "Máxima cantidad de ejecuciones en paralelo alcanzada"
    BD_INPUT_ERROR = "Error obteniendo los datos de la BD"
    SCRIPT_ERROR = "Error obteniendo script de bucket"
    SCRIPT_EXECUTION_ERROR = "Error en la ejecución del script"
    PARAMETERS_ERROR = "Parámetros no definidos en el descriptor"
    OUTPUT_ERROR = "Error guardando los resultados en bucket"
    EMPTY_DATASET = "No hay registros en las entidades"
    TIMEOUT = "Timeout. Ejecución excedido de tiempo"
    ERROR = "Error General de Servicio"
from enum import Enum
class StatusEnum(Enum):
    """Coarse outcome of a request: success or generic error."""

    SUCCESS = 200
    ERROR = 609
from typing import Dict, Any
from app.main.engine.service.Process import Process
from app.main.engine.enum.StatusEnum import StatusEnum
from app.main.engine.enum.DescResponseEnum import DescResponseEnum
class EngineService:
    """Gatekeeper that enforces the parallel-execution limit before running a process."""

    def __init__(self, app, descriptor: Dict[str, Any], executions: int, max_executions: int) -> None:
        """
        Args:
            app: object exposing ``logger``.
            descriptor: request payload forwarded to ``Process``.
            executions: number of executions running, including this one.
            max_executions: maximum number of executions allowed at once.
        """
        self.app = app
        self.descriptor = descriptor
        self.executions = executions
        self.max_executions = max_executions

    def run(self):
        """Run the process unless the execution limit is exceeded.

        Returns:
            The dictionary produced by ``Process.run``, a "max executions"
            error payload, or a generic error payload when dispatching
            itself fails (previously an empty dict was returned).
        """
        response = {}
        try:
            self.app.logger.info(f"Ejecuciones simultáneas actualmente: {self.executions}")
            if self.executions > self.max_executions:
                self.app.logger.info(f"Máxima de ejecuciones en paralelo alcanzado. Máximo: {self.max_executions}")
                response = {'status': StatusEnum.ERROR.name.lower(),
                            'message': DescResponseEnum.MAX_EXECUTION_ERROR.value}
            else:
                process = Process(self.app, self.descriptor)
                response = process.run()
        except Exception as e:
            self.app.logger.error(f"Error mandando el proceso a engine. {e}")
            # Give the caller an explicit error instead of an empty body.
            response = {'statusCode': StatusEnum.ERROR.value,
                        'status': StatusEnum.ERROR.name.lower(),
                        'message': DescResponseEnum.ERROR.value,
                        'detail': str(e)}
        # Returning after the try (not inside ``finally``) lets
        # KeyboardInterrupt / SystemExit propagate.
        return response
from typing import Dict, Any
import time
import traceback as traceback_lib
import importlib
from config import Config as cfg
from app.main.engine.util.Timezone import Timezone
from app.main.engine.util.Utils import Utils
from app.main.engine.enum.StatusEnum import StatusEnum
from app.main.engine.enum.CodeResponseEnum import CodeResponseEnum
from app.main.engine.database.Database import Database
class Process:
    """Loads the action script, runs it and maps failures to response codes."""

    def __init__(self, app, descriptor: Dict[str, Any]) -> None:
        """
        Args:
            app: object exposing ``logger``.
            descriptor: request payload; ``descriptor["idScript"]`` selects
                the class to run inside the script module.
        """
        self.app = app
        self.descriptor = descriptor
        self.timezone = Timezone(app)
        self.utils = Utils(app)

    def run(self) -> Dict[str, Any]:
        """Execute the script selected by the descriptor.

        Returns:
            The dictionary built by ``Utils.create_response`` for the final
            status. Exceptions raised by the script are translated into a
            status code by type (see the handlers below).
        """
        status, status_description = StatusEnum.SUCCESS, ""
        # Hard-coded for now; the original comment says this name will later
        # come from a database lookup based on the request parameter.
        script_name = "match-and-exclude-records-actions_v1.py"
        try:
            # Database access object handed to the script.
            db_params = cfg.db_params
            source = Database(self.app, db_params)

            path_script = "scripts/"+script_name
            module_name = path_script[:-3].replace("/", ".")
            # Import and reload so the current version of the script is used.
            script_module = importlib.reload(importlib.import_module(module_name))

            # Look names up on the script module itself rather than copying
            # its namespace into this module's globals, which could overwrite
            # names this module relies on.
            relation = script_module.relation_classname_identifier[self.descriptor["idScript"]]
            obj_script = getattr(script_module, relation)(self.app)

            obj_script.parser(self.descriptor)
            obj_script.process(source)
            # NOTE(review): the value returned by response() is not used in
            # the final payload; confirm whether it should be.
            obj_script.response()
        # In every handler the status is assigned before logging, and the
        # log text uses only names that are always defined, so a handler
        # cannot fail and leave the SUCCESS status in place.
        except IndexError as e:
            status, status_description = CodeResponseEnum.EMPTY_DATASET, str(e)
            self.app.logger.error(f"Error extrayendo insumos. Vacío. Error: {e}")
        except InterruptedError as e:
            status, status_description = CodeResponseEnum.OUTPUT_ERROR, str(e)
            self.app.logger.error(f"Error guardando resultados en bucket. Error: {e}")
        except BufferError as e:
            status, status_description = CodeResponseEnum.BD_INPUT_ERROR, str(e)
            self.app.logger.error(f"Error obteniendo la data. Error: {e}")
        except RuntimeError as e:
            status, status_description = CodeResponseEnum.SCRIPT_EXECUTION_ERROR, str(e)
            self.app.logger.error(f"Error en la ejecución del script {script_name}. Error: {e}")
        except ModuleNotFoundError as e:
            status, status_description = CodeResponseEnum.SCRIPT_ERROR, str(e)
            self.app.logger.error(f"Error obteniendo script {script_name}. Error: {e}")
        except ReferenceError as e:
            status, status_description = CodeResponseEnum.PARAMETERS_ERROR, str(e)
            self.app.logger.error(f"Error validando parámetros del descriptor. {e}")
        except Exception as e:
            status, status_description = StatusEnum.ERROR, str(e)
            traceback_lib.print_exc()
            self.app.logger.error(f"Error procesando engine. {e}")
        # Returning here (not inside ``finally``) lets KeyboardInterrupt /
        # SystemExit propagate.
        return self.utils.create_response(status, status_description)
from config import Config as cfg
import pytz
from datetime import datetime
from dateutil.parser import parse
class Timezone:
    """Computes the current time for the timezone configured in ``cfg.timezone``."""

    def __init__(self, app) -> None:
        self.app = app

    def datetime_by_tzone(self):
        """Return the current naive datetime for the configured timezone.

        ``cfg.timezone`` is matched by substring against
        ``pytz.all_timezones`` and the first match is used. A value with a
        minutes part after ":" is handled separately because such a name is
        not looked up directly in pytz.

        Raises:
            IndexError: if no pytz timezone name contains the configured value.
        """
        tzone = cfg.timezone
        offset = None
        # Some timezones come as "<name>:<minutes>" (e.g. 4:30) and are not
        # present in the pytz (GMT) names: keep the minutes aside.
        if ":" in tzone:
            tzone, offset = tzone.split(":")[:2]
            # NOTE(review): the nesting of this adjustment is reconstructed.
            # It is applied only with a minutes part, because that is the only
            # path where the arithmetic below needs the hour bumped by one.
            if "+" in tzone:
                # Bump the numeric hour itself. Replacing the last character
                # everywhere in the string corrupted values such as "GMT+11".
                prefix, hours = tzone.rsplit("+", 1)
                tzone = f"{prefix}+{int(hours) + 1}"
        matches = [name for name in pytz.all_timezones if tzone in name]
        server_timezone = pytz.timezone(matches[0])
        self.app.logger.debug("Zona Horaria : {}".format(server_timezone))
        # The UTC clock reading is labelled with the matched zone, then
        # re-parsed so the zone's offset can be read back from the text.
        server_time = server_timezone.localize(datetime.utcnow())
        current_time = parse(server_time.strftime('%Y-%m-%d %H:%M:%S.%f %Z'))
        if offset:
            # Combine the zone offset (minutes) with the extra minutes, flip
            # the sign and add the result to the current UTC time.
            fixed = pytz.FixedOffset((current_time.utcoffset().total_seconds() / 60 + float(offset)) * -1)
            current_time = datetime.utcnow() + fixed.utcoffset(datetime.utcnow())
        else:
            current_time = current_time.replace(tzinfo=None) - current_time.utcoffset()
        # Round-trip through the configured pattern to normalise precision.
        current_time = parse(current_time.strftime(cfg.time_pattern))
        self.app.logger.debug("Hora actual: {}".format(current_time))
        return current_time
import uuid
from typing import Dict, Any
import os
import shutil
from enum import Enum
# from pyspark.sql import SparkSession
# from config import Config as cfg
from app.main.engine.enum.StatusEnum import StatusEnum
from app.main.engine.enum.DescResponseEnum import DescResponseEnum
class Utils:
    """Helpers for temporary files and for building response dictionaries."""

    def __init__(self, app) -> None:
        self.app = app

    # NOTE: the Spark session factory (createSession) that used to live here
    # as commented-out code was removed; restore it from version control
    # together with its pyspark/config imports if Spark support returns.

    def create_temp_file(self) -> str:
        """Create an empty ``.csv`` file inside a new ``/tmp/<uuid>`` directory.

        Returns:
            Full path of the created file, or ``None`` if creation failed
            (the error is logged).
        """
        fullpath = None
        try:
            path_dir = "/tmp/" + str(uuid.uuid4())
            os.mkdir(path_dir)
            candidate = path_dir + "/" + str(uuid.uuid4()) + ".csv"
            # Opening in append mode creates the file without truncating.
            with open(candidate, mode='a'):
                pass
            fullpath = candidate
        except Exception as e:
            self.app.logger.error(f"Error creando directorio temporal.{e}")
        return fullpath

    def delete_temp_dir(self, module_name: str) -> bool:
        """Remove the directory that contains *module_name*.

        Args:
            module_name: path of a file inside the directory to delete.

        Returns:
            ``True`` if the directory existed and is gone afterwards,
            ``False`` otherwise. Errors are logged, never raised (the
            original ``raise`` was always swallowed by ``finally: return``).
        """
        drop = False
        try:
            if os.path.exists(module_name):
                directory = os.path.dirname(module_name)
                shutil.rmtree(directory, ignore_errors=True)
                # rmtree ignores errors, so verify the outcome explicitly.
                drop = bool(directory) and not os.path.exists(directory)
                self.app.logger.debug(f"directorio borrado: {module_name}")
        except Exception as e:
            self.app.logger.error(f"Error borrando modulo temporal. {e}")
        return drop

    def create_response(self, codeEnum: Enum, detail: str = "") -> Dict[str, Any]:
        """Build the response dictionary for a status enum member.

        Args:
            codeEnum: status member; its ``value`` becomes ``statusCode`` and
                its ``name`` selects the description in ``DescResponseEnum``.
            detail: stored under ``path`` on success, ``detail`` on error.
        """
        response = {"statusCode": codeEnum.value}
        if codeEnum.value == StatusEnum.SUCCESS.value:
            response.update({'status': StatusEnum.SUCCESS.name.lower(), 'path': detail})
        else:
            description = DescResponseEnum[codeEnum.name].value
            response.update({'status': StatusEnum.ERROR.name.lower(), 'message': description,
                             'detail': detail})
        return response

    def get_script_module(self, action_module: str) -> Dict[str, Any]:
        """Placeholder: script retrieval is not implemented; returns an empty dict."""
        return {}
app:
db_parameters:
# BD Credentials
type: 'mysql'
host: '192.168.1.37'
port: 13306
user: root
password: root
db: css_cuscatlan
dialect: 'mysql+pymysql'
# DB connection pool configuration
# https://docs.sqlalchemy.org/en/14/core/pooling.html
extra_params:
pool_size: 50
max_overflow: 30
# Volume where script is located
# Timezone, logging level and maximum number of concurrent engine executions
timezone: 'GMT-5'
time_pattern: '%Y-%m-%d %H:%M:%S.%f'
logging: 'INFO'
max_engine_threads: 2 # threads (maximum)
# Settings for running the service in production
# Manage connections to the REST Service published. Allow workers to receive the connections.
# https://docs.gunicorn.org/en/stable/
gunicorn:
bind: '0.0.0.0:7500'
worker_class: 'gthread'
threads: 8
worker_connections: 50
loglevel: 'debug'
accesslog: '-'
capture_output: True
import os
from decouple import config
import yaml
# Load the service settings once at import time. The context manager closes
# the file handle, which the bare ``open()`` call left open.
with open('conf.yml', encoding='utf-8') as conf_file:
    conf = yaml.safe_load(conf_file)
# Everything the application reads lives under the top-level "app" key.
conf = conf["app"]
class Config:
    """Base settings shared by every configuration mode."""

    # Directory that contains this module.
    basedir = os.path.abspath(os.path.dirname(__file__))

    # Flask secret key, read from the environment.
    # NOTE(review): the fallback is a hard-coded secret; confirm it is never
    # relied on outside local development.
    SECRET_KEY = config('SECRET_KEY', default='S#perS3crEt_007')

    # Database credentials and connection options.
    db_params = conf["db_parameters"]

    # Timezone name and the datetime pattern used with it.
    timezone = conf["timezone"]
    time_pattern = conf["time_pattern"]

    # Logging level.
    logging_level = conf["logging"]

    # Maximum number of engine executions allowed at the same time.
    max_engine_threads = conf["max_engine_threads"]
class ProductionConfig(Config):
    """Settings applied when the service runs in production mode."""

    DEBUG = False

    # Cookie hardening.
    SESSION_COOKIE_HTTPONLY = True
    REMEMBER_COOKIE_HTTPONLY = True
    REMEMBER_COOKIE_DURATION = 3600
class DebugConfig(Config):
    """Settings applied when the service runs in debug mode."""

    DEBUG = True
# Load all possible configurations
# Maps each configuration mode name to its settings class.
config_dict = {
    'Production': ProductionConfig,
    'Debug': DebugConfig,
}
from flask import Flask
from decouple import config
import config as cfg
class FlaskApplication:
    """Builds the Flask app for the configuration mode selected by ``DEBUG``."""

    # WARNING: Don't run with debug turned on in production!
    def __init__(self) -> None:
        self.app = None
        self.debug = config('DEBUG', default=False, cast=bool)

        # Pick the configuration class that matches the debug flag.
        if self.debug:
            self.config_mode = 'Debug'
        else:
            self.config_mode = 'Production'
        self.config = cfg.config_dict[self.config_mode.capitalize()]

    def register_blueprints(self, app):
        """Create the main controller for *app* and return its blueprint."""
        # Imported here, as in the original, rather than at module level.
        from app.main.engine.blueprint.main import MainController

        controller = MainController(app)
        controller.blueprints()
        return controller.get_blueprint()

    def create_app(self):
        """Create, configure and return the Flask application.

        The app is also stored on ``self.app``.
        """
        flask_app = Flask(__name__)
        self.app = flask_app
        flask_app.logger.setLevel(cfg.conf["logging"])
        flask_app.config.from_object(self.config)
        flask_app.register_blueprint(self.register_blueprints(flask_app))
        return flask_app
from app import MainApplication
import warnings
# Suppress every Python warning for the whole process.
warnings.filterwarnings("ignore")

# Build the application at import time so ``app`` exists as a module-level
# name (presumably the WSGI entry point for the server — confirm against the
# deployment command).
base = MainApplication()
app = base.create_app()

# Direct execution starts Flask's own server on port 8000.
if __name__ == "__main__":
    base.run(port=8000)
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment