Commit 016c0749 authored by Erly Villaroel's avatar Erly Villaroel

Timeout en el script

parent 18e7ce35
......@@ -6,7 +6,7 @@ from flask import request, jsonify
from config import Config as cfg
from app.main.engine.enum.StatusEnum import StatusEnum
from app.main.engine.service.EngineService import EngineService
from wrapt_timeout_decorator import *
class MainController:
......@@ -33,25 +33,9 @@ class MainController:
try:
self.executions += 1
descriptor = request.get_json()
tiempo_limite = descriptor["config-params"]["max-timeout-per-combinations"]
@timeout(tiempo_limite)
def procesamiento():
response = {}
try:
response = EngineService(self.app, descriptor, self.executions, self.max_executions).run()
except Exception as e:
response["statusCode"] = StatusEnum.ERROR.name
response["error"] = {"message": "", "detail": str(e)}
raise TimeoutError(f"Tiempo límite de ejecución superado{e}")
finally:
return response
response = procesamiento()
service = EngineService(self.app, descriptor, self.executions, self.max_executions)
response = service.run()
self.app.logger.info(response)
except TimeoutError as e:
self.app.logger.error(f"Error al superar timeout. {e}")
response["statusCode"] = StatusEnum.ERROR.name
response["error"] = {"message": "", "detail": str(e)}
except Exception as e:
self.app.logger.error(f"Error en el envío del parser al servicio. {e}")
response["statusCode"] = StatusEnum.ERROR.name
......
......@@ -9,13 +9,7 @@ from app.main.engine.util.Utils import Utils
from app.main.engine.enum.StatusEnum import StatusEnum
from app.main.engine.enum.CodeResponseEnum import CodeResponseEnum
from app.main.engine.database.Database import Database
def timeout_function():
raise TimeoutError("Timeout occurred")
from wrapt_timeout_decorator import *
class Process:
def __init__(self, app, descriptor: Dict[str, Any]) -> None:
self.app = app
......@@ -26,9 +20,6 @@ class Process:
def run(self) -> Dict[str, Any]:
status, status_description = StatusEnum.OK, ""
input_time = self.descriptor["config-params"]["max-timeout-per-combinations"]
t = Timer(input_time, timeout_function)
t.start()
try:
# Obteniendo la conexión a la BD
......@@ -53,19 +44,39 @@ class Process:
obj_script = globals()[relation](self.app)
obj_script.parser(self.descriptor)
tiempo_limite = obj_script.timeout
if tiempo_limite is not None:
@timeout(tiempo_limite)
def procesamiento():
try:
self.app.logger.info(f"Iniciando procesamiento de script")
obj_script.process(source)
# Guardando resultado
self.app.logger.info(f"Generado y guardando resultado")
response = obj_script.response()
# response.show()
result = self.utils.create_result(response, self.descriptor)
save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name:
raise InterruptedError(save["message"])
# Iniciando process
self.app.logger.info(f"Iniciando procesamiento de script")
obj_script.process(source)
except Exception as e:
raise TimeoutError(f"Tiempo límite de ejecución superado{e}")
procesamiento()
# Guardando resultado
self.app.logger.info(f"Generado y guardando resultado")
response = obj_script.response()
# response.show()
result = self.utils.create_result(response, self.descriptor)
save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name:
raise InterruptedError(save["message"])
else:
# Iniciando process
self.app.logger.info(f"Iniciando procesamiento de script")
obj_script.process(source)
# Guardando resultado
self.app.logger.info(f"Generado y guardando resultado")
response = obj_script.response()
# response.show()
result = self.utils.create_result(response, self.descriptor)
save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name:
raise InterruptedError(save["message"])
except IndexError as e:
self.app.logger.error(f"Error extrayendo insumos. Vacío. Error: {e}")
status, status_description = CodeResponseEnum.EMPTY_DATASET, str(e)
......@@ -92,5 +103,4 @@ class Process:
self.app.logger.error(f"Error procesando engine. {e}")
status, status_description = StatusEnum.ERROR, str(e)
finally:
t.cancel()
return self.utils.create_response(status, status_description)
......@@ -35,12 +35,13 @@ class MatchAndExcludeRecordsAction(ActionInterface):
def __init__(self, app) -> None:
super().__init__(app)
self.max_combinations = None
self.timeout = None
self.comb_per_group = None
self.exclude_pivot = None
self.pivot_params = None
self.ctp_params = None
self.output = None
self.config_params = ["max-records-per-combinations", "max-combinations-per-group", "exclude-entity-pivot"]
self.config_params = ["max-records-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"]
def parser(self, descriptor: Dict[str, Any]):
# Validar si pyspark y su versión está instalada
......@@ -76,7 +77,8 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte")
self.max_combinations = configs["max-records-per-combinations"]
self.comb_per_group = configs["max-combinations-per-group"]
if "max-timeout-per-combinations" in configs:
self.timeout = configs["max-timeout-per-combinations"]
self.exclude_pivot = configs["exclude-entity-pivot"]
self.pivot_params = pivot_params
self.ctp_params = ctp_params
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment