Commit 547dd646 authored by Erly Villaroel's avatar Erly Villaroel

Merge remote-tracking branch 'origin/developer' into developer_ev

# Conflicts:
#	app/main/engine/util/Utils.py
parents f3b336e2 6e759b58
......@@ -52,6 +52,15 @@ class Database:
except Exception as e:
self.app.logger.error(f"Error creando db engine. {e}")
def get_session(self):
session = None
try:
session = self.factory.get_session()
except Exception as e:
self.app.logger.error(f"Error obteniendo sesion de conexion. {e}")
finally:
return session
def verify_table(self, table_name, connection) -> bool:
result = False
try:
......@@ -61,12 +70,14 @@ class Database:
finally:
return result
def get_action_by_identifier(self, identifier) -> bool:
def get_action_by_identifier(self, identifier, Session) -> str:
script_name = None
session = Session()
try:
self.create_engine()
script_name = self.factory.get_action_by_identifier(identifier)
script_name = self.factory.get_action_by_identifier(identifier, session)
except Exception as e:
self.app.logger.error(f"Error obteniendo el nombre de la tabla según identificador. {e}")
finally:
session.close()
return script_name
......@@ -54,6 +54,18 @@ class Mysql:
except Exception as e:
self.app.logger.error(f"Error creando engine de Mysql. {e}")
def get_session(self):
session = None
try:
if isinstance(self.engine, type(None)):
self.create_engine()
session_factory = sessionmaker(bind=self.engine)
session = scoped_session(session_factory)
except Exception as e:
self.app.logger.error(f"Error creando sesion de conexión para Mysql. {e}")
finally:
return session
def verify_table(self, table_name, connection) -> bool:
exists = False
try:
......@@ -67,17 +79,14 @@ class Mysql:
finally:
return exists
def get_action_by_identifier(self, identifier):
session_factory = sessionmaker(bind=self.engine)
def get_action_by_identifier(self, identifier, session):
script_name = None
with session_factory() as session:
try:
exists = session.query(Model).filter(Model.identifier == identifier).first()
if exists is not None:
script_name = exists.script_name
except Exception as e:
session.rollback()
self.app.logger.info(f"No se encuentra el identificador {identifier} en la BD interna. {e}")
finally:
session.close()
return script_name
try:
exists = session.query(Model).filter(Model.identifier == identifier).first()
if exists is not None:
script_name = exists.script_name
except Exception as e:
session.rollback()
self.app.logger.info(f"No se encuentra el identificador {identifier} en la BD interna. {e}")
finally:
return script_name
......@@ -8,7 +8,7 @@ class DescResponseEnum(Enum):
SCRIPT_ERROR = "Error obteniendo script de bucket"
SCRIPT_EXECUTION_ERROR = "Error en la ejecución del script"
PARAMETERS_ERROR = "Parámetros no definidos en el descriptor"
OUTPUT_ERROR = "Error guardando los resultados en bucket"
OUTPUT_ERROR = "Error guardando los resultados en la bd"
EMPTY_DATASET = "No hay registros en las entidades"
TIMEOUT = "Timeout. Ejecución excedido de tiempo"
ERROR = "Error General de Servicio"
......@@ -6,7 +6,7 @@ metadata = Base.metadata
class ResultModel(Base):
__tablename__ = 'RESULT'
__tablename__ = 'CSS_RESULT_BY_ACTION'
ID = Column(BigInteger, primary_key = True)
ACTION_ID = Column(String(50))
......
......@@ -25,8 +25,12 @@ class Process:
# Obteniendo la conexión a la BD
db_params = cfg.db_params
source = Database(self.app, db_params)
# Esta variable llega después de consultar en la BD el nombre del script de acuerdo al parámetro enviado
script_name = "match-and-exclude-records-actions_v1.py"
db_session = source.get_session()
# Obteniendo el nombre del script
script_name = source.get_action_by_identifier(self.descriptor["idScript"], db_session)
if isinstance(script_name, type(None)):
raise ModuleNotFoundError(f"Error al descargar script desde volumen")
path_script = "scripts/"+script_name
module = path_script[:-3].replace("/", ".")
......@@ -41,15 +45,19 @@ class Process:
obj_script.parser(self.descriptor)
process = obj_script.process(source)
obj_script.process(source)
response = obj_script.response()
result = self.utils.create_result(response, self.descriptor)
save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name:
raise InterruptedError(save["message"])
except IndexError as e:
self.app.logger.error(f"Error extrayendo insumos. Vacío. Error: {e}")
status, status_description = CodeResponseEnum.EMPTY_DATASET, str(e)
except InterruptedError as e:
self.app.logger.error(f"Error guardando resultados en bucket. Error: {e}")
self.app.logger.error(f"Error guardando resultados en la Base de Datos. Error: {e}")
status, status_description = CodeResponseEnum.OUTPUT_ERROR, str(e)
except BufferError as e:
self.app.logger.error(f"Error obteniendo la data. Error: {e}")
......
......@@ -19,28 +19,6 @@ class Utils:
def __init__(self, app) -> None:
self.app = app
self.timezone = Timezone(app)
# def createSession(self, name: str = "app_engine_spark") -> SparkSession:
# session = None
# try:
# spark_master = "local["+str(cfg.spark_cores)+"]"
# total_mem = cfg.spark_mem
# driver_mem = str(int(total_mem / 2)) + 'g'
# executor_mem = str(int(total_mem / 2)) + 'g'
# jars = list(cfg.spark_jars.values())
# jars = ",".join(jars)
# session = SparkSession.builder.master(spark_master) \
# .appName(name) \
# .config("spark.jars", jars) \
# .config("spark.executor.extraClassPath", jars) \
# .config("spark.driver.extraClassPath", jars) \
# .config("spark.driver.memory", driver_mem) \
# .config("spark.executor.memory", executor_mem) \
# .getOrCreate()
#
# except Exception as e:
# self.app.logger.error(f"Error creando sesion Spark. {e}")
# finally:
# return session
def create_temp_file(self) -> str:
fullpath = None
......@@ -79,101 +57,83 @@ class Utils:
'detail': detail})
return response
def get_script_module(self, action_module: str) -> Dict[str, Any]:
response = {}
try:
pass
except Exception as e:
self.app.logger.error(f"Error obteniendo el script del proveedor desde S3. {e}")
response["status"] = StatusEnum.ERROR
response["message"] = str(e)
finally:
return response
def create_result(self, data, descriptor):
result = []
response = {"detail":result}
response = {"detail": result}
try:
if data.count() == 0 :
pass
exclude_pivot = descriptor["config-params"]["exclude-entity-pivot"]
group_pivot = descriptor["params-input"]["pivot-config"]["columns-group"]
transaction_pivot = descriptor["params-input"]["pivot-config"]["columns-transaction"]
group_counterpart = descriptor["params-input"]["counterpart-config"]["columns-group"]
pivot_params = descriptor["params-input"]["pivot-config"]
ctp_params = descriptor["params-input"]["counterpart-config"]
transaction_counterpart = descriptor["params-input"]["counterpart-config"]["columns-transaction"]
sufijo_PVT = SufixEnum.PIVOT.value
sufijo_CTP = SufixEnum.COUNTERPART.value
group_pivot_match = pivot_params["columns-group"]
transaction_pivot_match = pivot_params["columns-transaction"]
transaction_pivot_match = [sufijo_PVT + item if item == transaction_counterpart[index] else item for
index, item in
enumerate(transaction_pivot)]
transaction_counterpart_match = [sufijo_CTP + item if item == transaction_pivot[index] else item for
index, item in
enumerate(transaction_counterpart)]
group_counterpart_match = ctp_params["columns-group"]
transaction_counterpart_match = ctp_params["columns-transaction"]
group_pivot_match = [sufijo_PVT + item if item == group_counterpart[index] else item for index, item in
enumerate(group_pivot)]
group_counterpart_match = [sufijo_CTP + item if item == group_pivot[index] else item for index, item in
enumerate(group_counterpart)]
used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match
for i in data.collect():
input_data = {}
key_transaction = None
key_group_pivot = None
key_group_counterpart = None
for element in used_list:
if key_transaction is None:
key_transaction = str(i[element])
else:
key_transaction = key_transaction + "-" + str(i[element])
for element_g in group_pivot_match:
if key_group_pivot is None:
key_group_pivot = str(i[element_g])
else:
key_group_pivot = key_group_pivot + "-" + str(i[element_g])
for element_c in group_counterpart_match:
if key_group_counterpart is None:
key_group_counterpart = str(i[element_c])
else:
key_group_counterpart = key_group_counterpart + "-" + str(i[element_c])
input_data["key-transaction"] = str(key_transaction)
input_data["key-group-pivot"] = str(key_group_pivot)
input_data["key-group-counterpart"] = str(key_group_counterpart)
input_data["list-ids-pivot"] = str(i[FixedFieldsEnum.INTER_PIVOT_ID.value])
input_data["list-ids-counterpart"] = str(i[FixedFieldsEnum.INTER_CTP_ID.value])
input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value])
input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value])
result.append(input_data)
if data.count() == 0:
self.app.logger.info(f"El dataframe resultado esta vacio")
else:
for i in data.collect():
input_data = {}
key_transaction = None
key_group_pivot = None
key_group_counterpart = None
for element in used_list:
if key_transaction is None:
key_transaction = str(i[element])
else:
key_transaction = key_transaction + "-" + str(i[element])
for element_g in group_pivot_match:
if key_group_pivot is None:
key_group_pivot = str(i[element_g])
else:
key_group_pivot = key_group_pivot + "-" + str(i[element_g])
for element_c in group_counterpart_match:
if key_group_counterpart is None:
key_group_counterpart = str(i[element_c])
else:
key_group_counterpart = key_group_counterpart + "-" + str(i[element_c])
input_data["key-transaction"] = str(key_transaction)
input_data["key-group-pivot"] = "" if key_group_pivot is None else str(key_group_pivot)
input_data["key-group-counterpart"] = "" if key_group_counterpart is None else str(key_group_counterpart)
input_data["list-ids-pivot"] = str(i[FixedFieldsEnum.INTER_PIVOT_ID.value])
input_data["list-ids-counterpart"] = str(i[FixedFieldsEnum.INTER_CTP_ID.value])
input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value])
input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value])
result.append(input_data)
response['status'] = StatusEnum.SUCCESS.value
response["detail"] = result
except Exception as e:
self.app.logger.error(f"Error obteniendo al crear el diccionario de resultados. {e}")
self.app.logger.error(f"Error al crear el diccionario de resultados. {e}")
response["status"] = StatusEnum.ERROR
response["message"] = str(e)
finally:
return response
def save_result(self, result, descriptor, session):
def save_result(self, result, descriptor, Session):
response = {}
session = Session()
try:
d1 = self.timezone.datetime_by_tzone()
result_json = json.dumps(result)
result_json = json.dumps(result["detail"])
result_obj = ResultModel(
ID= None,
ID=None,
ACTION_ID=descriptor["idScript"],
ID_PROCESS=descriptor["idProcess"],
CREATE_DATE= d1,
CREATE_DATE=d1,
KEY=FixedFieldsEnum.MATCH_RECORDS.value,
RESULT_JSON=result_json
)
session.add(result_obj)
session.commit()
response['status'] = StatusEnum.SUCCESS.value
response['status'] = StatusEnum.SUCCESS.name
except Exception as e:
session.rollback()
response["status"] = StatusEnum.ERROR
response["status"] = StatusEnum.ERROR.name
response["message"] = str(e)
self.app.logger.error(f"Error al guardar registro en la base de datos {e}")
finally:
......
......@@ -3,8 +3,8 @@ app:
db_parameters:
# BD Credentials
type: 'mysql'
host: '192.168.0.11'
port: 3301
host: '192.168.1.37'
port: 13306
user: root
password: root
db: css_cuscatlan
......@@ -21,7 +21,7 @@ app:
# Timezone, Loggin level and limit threading number
timezone: 'GMT-5'
time_pattern: '%Y-%m-%d %H:%M:%S.%f'
time_pattern: '%Y-%m-%d %H:%M:%S'
logging: 'INFO'
max_engine_threads: 2 # threads (maximum)
......
......@@ -191,7 +191,6 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.output = merged_df
def response(self):
self.output.show()
return self.output
def createSession(self, name: str = "app_engine_spark"):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment