Commit 85fe0d9e authored by Cristian Aguirre's avatar Cristian Aguirre

Update action-exclude-records-v1

parent 93a4a97c
...@@ -52,6 +52,15 @@ class Database: ...@@ -52,6 +52,15 @@ class Database:
except Exception as e: except Exception as e:
self.app.logger.error(f"Error creando db engine. {e}") self.app.logger.error(f"Error creando db engine. {e}")
def get_session(self):
session = None
try:
session = self.factory.get_session()
except Exception as e:
self.app.logger.error(f"Error obteniendo sesion de conexion. {e}")
finally:
return session
def verify_table(self, table_name, connection) -> bool: def verify_table(self, table_name, connection) -> bool:
result = False result = False
try: try:
...@@ -61,12 +70,14 @@ class Database: ...@@ -61,12 +70,14 @@ class Database:
finally: finally:
return result return result
def get_action_by_identifier(self, identifier) -> bool: def get_action_by_identifier(self, identifier, Session) -> str:
script_name = None script_name = None
session = Session()
try: try:
self.create_engine() self.create_engine()
script_name = self.factory.get_action_by_identifier(identifier) script_name = self.factory.get_action_by_identifier(identifier, session)
except Exception as e: except Exception as e:
self.app.logger.error(f"Error obteniendo el nombre de la tabla según identificador. {e}") self.app.logger.error(f"Error obteniendo el nombre de la tabla según identificador. {e}")
finally: finally:
session.close()
return script_name return script_name
...@@ -54,6 +54,18 @@ class Mysql: ...@@ -54,6 +54,18 @@ class Mysql:
except Exception as e: except Exception as e:
self.app.logger.error(f"Error creando engine de Mysql. {e}") self.app.logger.error(f"Error creando engine de Mysql. {e}")
def get_session(self):
session = None
try:
if isinstance(self.engine, type(None)):
self.create_engine()
session_factory = sessionmaker(bind=self.engine)
session = scoped_session(session_factory)
except Exception as e:
self.app.logger.error(f"Error creando sesion de conexión para Mysql. {e}")
finally:
return session
def verify_table(self, table_name, connection) -> bool: def verify_table(self, table_name, connection) -> bool:
exists = False exists = False
try: try:
...@@ -67,17 +79,14 @@ class Mysql: ...@@ -67,17 +79,14 @@ class Mysql:
finally: finally:
return exists return exists
def get_action_by_identifier(self, identifier): def get_action_by_identifier(self, identifier, session):
session_factory = sessionmaker(bind=self.engine)
script_name = None script_name = None
with session_factory() as session: try:
try: exists = session.query(Model).filter(Model.identifier == identifier).first()
exists = session.query(Model).filter(Model.identifier == identifier).first() if exists is not None:
if exists is not None: script_name = exists.script_name
script_name = exists.script_name except Exception as e:
except Exception as e: session.rollback()
session.rollback() self.app.logger.info(f"No se encuentra el identificador {identifier} en la BD interna. {e}")
self.app.logger.info(f"No se encuentra el identificador {identifier} en la BD interna. {e}") finally:
finally: return script_name
session.close()
return script_name
...@@ -8,7 +8,7 @@ class DescResponseEnum(Enum): ...@@ -8,7 +8,7 @@ class DescResponseEnum(Enum):
SCRIPT_ERROR = "Error obteniendo script de bucket" SCRIPT_ERROR = "Error obteniendo script de bucket"
SCRIPT_EXECUTION_ERROR = "Error en la ejecución del script" SCRIPT_EXECUTION_ERROR = "Error en la ejecución del script"
PARAMETERS_ERROR = "Parámetros no definidos en el descriptor" PARAMETERS_ERROR = "Parámetros no definidos en el descriptor"
OUTPUT_ERROR = "Error guardando los resultados en bucket" OUTPUT_ERROR = "Error guardando los resultados en la bd"
EMPTY_DATASET = "No hay registros en las entidades" EMPTY_DATASET = "No hay registros en las entidades"
TIMEOUT = "Timeout. Ejecución excedido de tiempo" TIMEOUT = "Timeout. Ejecución excedido de tiempo"
ERROR = "Error General de Servicio" ERROR = "Error General de Servicio"
...@@ -6,7 +6,7 @@ metadata = Base.metadata ...@@ -6,7 +6,7 @@ metadata = Base.metadata
class ResultModel(Base): class ResultModel(Base):
__tablename__ = 'RESULT' __tablename__ = 'CSS_RESULT_BY_ACTION'
ID = Column(BigInteger, primary_key = True) ID = Column(BigInteger, primary_key = True)
ACTION_ID = Column(String(50)) ACTION_ID = Column(String(50))
......
...@@ -25,8 +25,12 @@ class Process: ...@@ -25,8 +25,12 @@ class Process:
# Obteniendo la conexión a la BD # Obteniendo la conexión a la BD
db_params = cfg.db_params db_params = cfg.db_params
source = Database(self.app, db_params) source = Database(self.app, db_params)
# Esta variable llega después de consultar en la BD el nombre del script de acuerdo al parámetro enviado db_session = source.get_session()
script_name = "match-and-exclude-records-actions_v1.py"
# Obteniendo el nombre del script
script_name = source.get_action_by_identifier(self.descriptor["idScript"], db_session)
if isinstance(script_name, type(None)):
raise ModuleNotFoundError(f"Error al descargar script desde volumen")
path_script = "scripts/"+script_name path_script = "scripts/"+script_name
module = path_script[:-3].replace("/", ".") module = path_script[:-3].replace("/", ".")
...@@ -41,15 +45,19 @@ class Process: ...@@ -41,15 +45,19 @@ class Process:
obj_script.parser(self.descriptor) obj_script.parser(self.descriptor)
process = obj_script.process(source) obj_script.process(source)
response = obj_script.response() response = obj_script.response()
result = self.utils.create_result(response, self.descriptor)
save = self.utils.save_result(result, self.descriptor, db_session)
if save["status"] == StatusEnum.ERROR.name:
raise InterruptedError(save["message"])
except IndexError as e: except IndexError as e:
self.app.logger.error(f"Error extrayendo insumos. Vacío. Error: {e}") self.app.logger.error(f"Error extrayendo insumos. Vacío. Error: {e}")
status, status_description = CodeResponseEnum.EMPTY_DATASET, str(e) status, status_description = CodeResponseEnum.EMPTY_DATASET, str(e)
except InterruptedError as e: except InterruptedError as e:
self.app.logger.error(f"Error guardando resultados en bucket. Error: {e}") self.app.logger.error(f"Error guardando resultados en la Base de Datos. Error: {e}")
status, status_description = CodeResponseEnum.OUTPUT_ERROR, str(e) status, status_description = CodeResponseEnum.OUTPUT_ERROR, str(e)
except BufferError as e: except BufferError as e:
self.app.logger.error(f"Error obteniendo la data. Error: {e}") self.app.logger.error(f"Error obteniendo la data. Error: {e}")
......
...@@ -19,28 +19,6 @@ class Utils: ...@@ -19,28 +19,6 @@ class Utils:
def __init__(self, app) -> None: def __init__(self, app) -> None:
self.app = app self.app = app
self.timezone = Timezone(app) self.timezone = Timezone(app)
# def createSession(self, name: str = "app_engine_spark") -> SparkSession:
# session = None
# try:
# spark_master = "local["+str(cfg.spark_cores)+"]"
# total_mem = cfg.spark_mem
# driver_mem = str(int(total_mem / 2)) + 'g'
# executor_mem = str(int(total_mem / 2)) + 'g'
# jars = list(cfg.spark_jars.values())
# jars = ",".join(jars)
# session = SparkSession.builder.master(spark_master) \
# .appName(name) \
# .config("spark.jars", jars) \
# .config("spark.executor.extraClassPath", jars) \
# .config("spark.driver.extraClassPath", jars) \
# .config("spark.driver.memory", driver_mem) \
# .config("spark.executor.memory", executor_mem) \
# .getOrCreate()
#
# except Exception as e:
# self.app.logger.error(f"Error creando sesion Spark. {e}")
# finally:
# return session
def create_temp_file(self) -> str: def create_temp_file(self) -> str:
fullpath = None fullpath = None
...@@ -79,100 +57,83 @@ class Utils: ...@@ -79,100 +57,83 @@ class Utils:
'detail': detail}) 'detail': detail})
return response return response
def get_script_module(self, action_module: str) -> Dict[str, Any]:
response = {}
try:
pass
except Exception as e:
self.app.logger.error(f"Error obteniendo el script del proveedor desde S3. {e}")
response["status"] = StatusEnum.ERROR
response["message"] = str(e)
finally:
return response
def create_result(self, data, descriptor): def create_result(self, data, descriptor):
result = [] result = []
response = {} response = {"detail": result}
try: try:
exclude_pivot = descriptor["config-params"]["exclude-entity-pivot"] exclude_pivot = descriptor["config-params"]["exclude-entity-pivot"]
group_pivot = descriptor["params-input"]["pivot-config"]["columns-group"] pivot_params = descriptor["params-input"]["pivot-config"]
ctp_params = descriptor["params-input"]["counterpart-config"]
transaction_pivot = descriptor["params-input"]["pivot-config"]["columns-transaction"]
group_counterpart = descriptor["params-input"]["counterpart-config"]["columns-group"]
transaction_counterpart = descriptor["params-input"]["counterpart-config"]["columns-transaction"] group_pivot_match = pivot_params["columns-group"]
sufijo_PVT = SufixEnum.PIVOT.value transaction_pivot_match = pivot_params["columns-transaction"]
sufijo_CTP = SufixEnum.COUNTERPART.value
transaction_pivot_match = [sufijo_PVT + item if item == transaction_counterpart[index] else item for group_counterpart_match = ctp_params["columns-group"]
index, item in transaction_counterpart_match = ctp_params["columns-transaction"]
enumerate(transaction_pivot)]
transaction_counterpart_match = [sufijo_CTP + item if item == transaction_pivot[index] else item for
index, item in
enumerate(transaction_counterpart)]
group_pivot_match = [sufijo_PVT + item if item == group_counterpart[index] else item for index, item in
enumerate(group_pivot)]
group_counterpart_match = [sufijo_CTP + item if item == group_pivot[index] else item for index, item in
enumerate(group_counterpart)]
used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match
for i in data.collect():
input_data = {} if data.count() == 0:
key_transaction = None self.app.logger.info(f"El dataframe resultado esta vacio")
key_group_pivot = None else:
key_group_counterpart = None for i in data.collect():
for element in used_list: input_data = {}
if key_transaction is None: key_transaction = None
key_transaction = i[element] key_group_pivot = None
else: key_group_counterpart = None
key_transaction = key_transaction + "-" + str(i[element]) for element in used_list:
for element_g in group_pivot_match: if key_transaction is None:
if key_group_pivot is None: key_transaction = str(i[element])
key_group_pivot = str(i[element_g]) else:
else: key_transaction = key_transaction + "-" + str(i[element])
key_group_pivot = key_group_pivot + "-" + str(i[element_g]) for element_g in group_pivot_match:
for element_c in group_counterpart_match: if key_group_pivot is None:
if key_group_counterpart is None: key_group_pivot = str(i[element_g])
key_group_counterpart = str(i[element_c]) else:
else: key_group_pivot = key_group_pivot + "-" + str(i[element_g])
key_group_counterpart = key_group_counterpart + "-" + str(i[element_c]) for element_c in group_counterpart_match:
input_data["key-transaction"] = str(key_transaction) if key_group_counterpart is None:
input_data["key-group-pivot"] = str(key_group_pivot) key_group_counterpart = str(i[element_c])
input_data["key-group-counterpart"] = str(key_group_counterpart) else:
input_data["list-ids-pivot"] = str(i[FixedFieldsEnum.INTER_PIVOT_ID.value]) key_group_counterpart = key_group_counterpart + "-" + str(i[element_c])
input_data["list-ids-counterpart"] = str(i[FixedFieldsEnum.INTER_CTP_ID.value]) input_data["key-transaction"] = str(key_transaction)
input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value]) input_data["key-group-pivot"] = "" if key_group_pivot is None else str(key_group_pivot)
input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value]) input_data["key-group-counterpart"] = "" if key_group_counterpart is None else str(key_group_counterpart)
result.append(input_data) input_data["list-ids-pivot"] = str(i[FixedFieldsEnum.INTER_PIVOT_ID.value])
input_data["list-ids-counterpart"] = str(i[FixedFieldsEnum.INTER_CTP_ID.value])
input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value])
input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value])
result.append(input_data)
response['status'] = StatusEnum.SUCCESS.value response['status'] = StatusEnum.SUCCESS.value
response["detail"] = result response["detail"] = result
except Exception as e: except Exception as e:
self.app.logger.error(f"Error obteniendo al crear el diccionario de resultados. {e}") self.app.logger.error(f"Error al crear el diccionario de resultados. {e}")
response["status"] = StatusEnum.ERROR response["status"] = StatusEnum.ERROR
response["message"] = str(e) response["message"] = str(e)
finally: finally:
return response return response
def save_result(self, result, descriptor, session): def save_result(self, result, descriptor, Session):
response = {} response = {}
session = Session()
try: try:
d1 = self.timezone.datetime_by_tzone() d1 = self.timezone.datetime_by_tzone()
result_json = json.dumps(result) result_json = json.dumps(result["detail"])
result_obj = ResultModel( result_obj = ResultModel(
ID= None, ID=None,
ACTION_ID=descriptor["idScript"], ACTION_ID=descriptor["idScript"],
ID_PROCESS=descriptor["idProcess"], ID_PROCESS=descriptor["idProcess"],
CREATE_DATE= d1, CREATE_DATE=d1,
KEY=FixedFieldsEnum.MATCH_RECORDS.value, KEY=FixedFieldsEnum.MATCH_RECORDS.value,
RESULT_JSON=result_json RESULT_JSON=result_json
) )
session.add(result_obj) session.add(result_obj)
session.commit() session.commit()
response['status'] = StatusEnum.SUCCESS.value response['status'] = StatusEnum.SUCCESS.name
except Exception as e: except Exception as e:
session.rollback() session.rollback()
response["status"] = StatusEnum.ERROR response["status"] = StatusEnum.ERROR.name
response["message"] = str(e) response["message"] = str(e)
self.app.logger.error(f"Error al guardar registro en la base de datos {e}") self.app.logger.error(f"Error al guardar registro en la base de datos {e}")
finally: finally:
......
...@@ -3,8 +3,8 @@ app: ...@@ -3,8 +3,8 @@ app:
db_parameters: db_parameters:
# BD Credentials # BD Credentials
type: 'mysql' type: 'mysql'
host: '192.168.0.11' host: '192.168.1.37'
port: 3301 port: 13306
user: root user: root
password: root password: root
db: css_cuscatlan db: css_cuscatlan
...@@ -21,7 +21,7 @@ app: ...@@ -21,7 +21,7 @@ app:
# Timezone, Loggin level and limit threading number # Timezone, Loggin level and limit threading number
timezone: 'GMT-5' timezone: 'GMT-5'
time_pattern: '%Y-%m-%d %H:%M:%S.%f' time_pattern: '%Y-%m-%d %H:%M:%S'
logging: 'INFO' logging: 'INFO'
max_engine_threads: 2 # threads (maximum) max_engine_threads: 2 # threads (maximum)
......
...@@ -191,7 +191,6 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -191,7 +191,6 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.output = merged_df self.output = merged_df
def response(self): def response(self):
self.output.show()
return self.output return self.output
def createSession(self, name: str = "app_engine_spark"): def createSession(self, name: str = "app_engine_spark"):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment