Commit bee8601f authored by Erly Villaroel's avatar Erly Villaroel

Metodos para la base de datos

parent 6977026b
...@@ -60,3 +60,13 @@ class Database: ...@@ -60,3 +60,13 @@ class Database:
self.app.logger.error(f"Error obteniendo numero de registros de la tabla. {e}") self.app.logger.error(f"Error obteniendo numero de registros de la tabla. {e}")
finally: finally:
return result return result
def get_action_by_identifier(self, identifier) -> bool:
script_name = None
try:
self.create_engine()
script_name = self.factory.get_action_by_identifier(identifier)
except Exception as e:
self.app.logger.error(f"Error obteniendo el nombre de la tabla según identificador. {e}")
finally:
return script_name
...@@ -2,8 +2,9 @@ from typing import Any, Dict ...@@ -2,8 +2,9 @@ from typing import Any, Dict
import pymysql import pymysql
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from app.main.engine.enum.DatabaseDialectEnum import DatabaseDialectEnum from app.main.engine.enum.DatabaseDialectEnum import DatabaseDialectEnum
from app.main.engine.models.Model import Model
import logging import logging
logger = logging.getLogger() logger = logging.getLogger()
...@@ -65,3 +66,18 @@ class Mysql: ...@@ -65,3 +66,18 @@ class Mysql:
self.app.logger.info(f"No se encuentra la tabla {table_name} en la BD interna. {e}") self.app.logger.info(f"No se encuentra la tabla {table_name} en la BD interna. {e}")
finally: finally:
return exists return exists
def get_action_by_identifier(self, identifier):
session_factory = sessionmaker(bind=self.engine)
script_name = None
with session_factory() as session:
try:
exists = session.query(Model).filter(Model.identifier == identifier).first()
if exists is not None:
script_name = exists.script_name
except Exception as e:
session.rollback()
self.app.logger.info(f"No se encuentra el identificador {identifier} en la BD interna. {e}")
finally:
session.close()
return script_name
from enum import Enum
class FixedFieldsEnum(Enum):
INTER_PIVOT_ID = "INTER_PIVOT_ID"
INTER_CTP_ID = "INTER_CTP_ID"
LISTA_DIFF = "LISTA_DIFF"
DIFF = "DIFF"
MATCH_RECORDS = "match-records"
from enum import Enum
class SufixEnum(Enum):
PIVOT = "PIVOT_"
COUNTERPART = "COUNTERPART_"
from sqlalchemy import BigInteger, Column, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
metadata = Base.metadata
class Model(Base):
__tablename__ = 'CSS_SCRIPT_BY_ACTION'
ID = Column(BigInteger, primary_key = True)
identifier = Column(String(50))
script_name = Column(String(150))
from sqlalchemy import BigInteger, Column, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
metadata = Base.metadata
class ResultModel(Base):
__tablename__ = 'RESULT'
ID = Column(BigInteger, primary_key = True)
ACTION_ID = Column(String(50))
ID_PROCESS = Column(String(150))
CREATE_DATE = Column(String(150))
KEY = Column(String(150))
RESULT_JSON = Column(String(1500))
...@@ -4,17 +4,21 @@ import os ...@@ -4,17 +4,21 @@ import os
import shutil import shutil
from enum import Enum from enum import Enum
# from pyspark.sql import SparkSession # from pyspark.sql import SparkSession
import json
from app.main.engine.util.Timezone import Timezone
# from config import Config as cfg # from config import Config as cfg
from app.main.engine.enum.StatusEnum import StatusEnum from app.main.engine.enum.StatusEnum import StatusEnum
from app.main.engine.enum.SufixEnum import SufixEnum
from app.main.engine.enum.FixedFieldsEnum import FixedFieldsEnum
from app.main.engine.enum.DescResponseEnum import DescResponseEnum from app.main.engine.enum.DescResponseEnum import DescResponseEnum
from app.main.engine.models.ResultMode import ResultModel
class Utils: class Utils:
def __init__(self, app) -> None: def __init__(self, app) -> None:
self.app = app self.app = app
self.timezone = Timezone(app)
# def createSession(self, name: str = "app_engine_spark") -> SparkSession: # def createSession(self, name: str = "app_engine_spark") -> SparkSession:
# session = None # session = None
# try: # try:
...@@ -86,3 +90,91 @@ class Utils: ...@@ -86,3 +90,91 @@ class Utils:
finally: finally:
return response return response
def create_result(self, data, descriptor):
result = []
response = {}
try:
exclude_pivot = descriptor["config-params"]["exclude-entity-pivot"]
group_pivot = descriptor["params-input"]["pivot-config"]["columns-group"]
transaction_pivot = descriptor["params-input"]["pivot-config"]["columns-transaction"]
group_counterpart = descriptor["params-input"]["counterpart-config"]["columns-group"]
transaction_counterpart = descriptor["params-input"]["counterpart-config"]["columns-transaction"]
sufijo_PVT = SufixEnum.PIVOT.value
sufijo_CTP = SufixEnum.COUNTERPART.value
transaction_pivot_match = [sufijo_PVT + item if item == transaction_counterpart[index] else item for
index, item in
enumerate(transaction_pivot)]
transaction_counterpart_match = [sufijo_CTP + item if item == transaction_pivot[index] else item for
index, item in
enumerate(transaction_counterpart)]
group_pivot_match = [sufijo_PVT + item if item == group_counterpart[index] else item for index, item in
enumerate(group_pivot)]
group_counterpart_match = [sufijo_CTP + item if item == group_pivot[index] else item for index, item in
enumerate(group_counterpart)]
used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match
for i in data.collect():
input_data = {}
key_transaction = None
key_group_pivot = None
key_group_counterpart = None
for element in used_list:
if key_transaction is None:
key_transaction = i[element]
else:
key_transaction = key_transaction + "-" + str(i[element])
for element_g in group_pivot_match:
if key_group_pivot is None:
key_group_pivot = str(i[element_g])
else:
key_group_pivot = key_group_pivot + "-" + str(i[element_g])
for element_c in group_counterpart_match:
if key_group_counterpart is None:
key_group_counterpart = str(i[element_c])
else:
key_group_counterpart = key_group_counterpart + "-" + str(i[element_c])
input_data["key-transaction"] = str(key_transaction)
input_data["key-group-pivot"] = str(key_group_pivot)
input_data["key-group-counterpart"] = str(key_group_counterpart)
input_data["list-ids-pivot"] = str(i[FixedFieldsEnum.INTER_PIVOT_ID.value])
input_data["list-ids-counterpart"] = str(i[FixedFieldsEnum.INTER_CTP_ID.value])
input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value])
input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value])
result.append(input_data)
response['status'] = StatusEnum.SUCCESS.value
response["detail"] = result
except Exception as e:
self.app.logger.error(f"Error obteniendo al crear el diccionario de resultados. {e}")
response["status"] = StatusEnum.ERROR
response["message"] = str(e)
finally:
return response
def save_result(self, result, descriptor, session):
response = {}
try:
d1 = self.timezone.datetime_by_tzone()
result_json = json.dumps(result)
result_obj = ResultModel(
ID= None,
ACTION_ID=descriptor["idScript"],
ID_PROCESS=descriptor["idProcess"],
CREATE_DATE= d1,
KEY=FixedFieldsEnum.MATCH_RECORDS.value,
RESULT_JSON=result_json
)
session.add(result_obj)
session.commit()
response['status'] = StatusEnum.SUCCESS.value
except Exception as e:
session.rollback()
response["status"] = StatusEnum.ERROR
response["message"] = str(e)
self.app.logger.error(f"Error al guardar registro en la base de datos {e}")
finally:
session.close()
return response
...@@ -3,11 +3,11 @@ app: ...@@ -3,11 +3,11 @@ app:
db_parameters: db_parameters:
# BD Credentials # BD Credentials
type: 'mysql' type: 'mysql'
host: '192.168.1.37' host: '192.168.0.11'
port: 13306 port: 3301
user: root user: root
password: root password: root
db: css_engine db: cusca
dialect: 'mysql+pymysql' dialect: 'mysql+pymysql'
# BD conexion configurations # BD conexion configurations
# https://docs.sqlalchemy.org/en/14/core/pooling.html # https://docs.sqlalchemy.org/en/14/core/pooling.html
......
from app import MainApplication from app import MainApplication
import warnings import warnings
from sqlalchemy.orm import sessionmaker, scoped_session
from pyspark.sql import SparkSession
from decimal import Decimal
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, ArrayType
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
from sqlalchemy import create_engine
from app.main.engine.util.Utils import Utils
base = MainApplication() base = MainApplication()
app = base.create_app() app = base.create_app()
#
# if __name__ == "__main__":
# base.run(port=8000)
spark = SparkSession.builder \
.appName("Crear DataFrame en PySpark") \
.getOrCreate()
# Especificar el esquema del DataFrame
schema = StructType([
StructField("PIVOT_Fecha", StringType(), True),
StructField("COUNTERPART_Fecha", StringType(), True),
StructField("Cuenta", StringType(), True),
StructField("Account", StringType(), True),
StructField("DIFF", DecimalType(10, 2), True),
StructField("LISTA_DIFF", ArrayType(StringType()), True),
StructField("INTER_PIVOT_ID", StringType(), True),
StructField("INTER_CTP_ID", StringType(), True),
StructField("PIVOT_Valor", DecimalType(10, 2), True),
StructField("COUNTERPART_Valor", DecimalType(10, 2), True)
])
# Crear el DataFrame con datos de ejemplo
data = [
("2024-04-01", "2024-04-01", "Cuenta1", "Account1", Decimal('10.50'), ['1', '2', '3'], "ID1", "ID2", Decimal('100.00'), Decimal('95.00')),
("2024-04-02", "2024-04-02", "Cuenta2", "Account2", Decimal('15.75'), ['4', '5', '6'], "ID3", "ID4", Decimal('200.00'), Decimal('190.00')),
("2024-04-03", "2024-04-03", "Cuenta3", "Account3", Decimal('20.25'), ['7', '8', '9'], "ID5", "ID6", Decimal('300.00'), Decimal('280.00'))
]
df = spark.createDataFrame(data, schema)
if __name__ == "__main__": # Mostrar el DataFrame
base.run(port=8000) df.show()
descriptor = {
"idProcess" : 500240,
"idScript": "match-and-exclude",
"config-params":{
"max-records-per-combination": 10,
"max-timeout-per-combination": 1000,
"exclude-entity-pivot": True
},
"params-input": {
"pivot-config": {
"tablename" : "PIVOT_TEMPORAL",
"id-column" : "ID",
"amount-columns" : "Valor",
"columns-group" : ["Fecha", "Cuenta"],
"columns-transaction" : ["Fecha", "Cuenta", "Valor"]
},
"counterpart-config": {
"tablename" : "PIVOT_TEMPORAL",
"id-column" : "ID",
"amount-columns" : "Valor",
"columns-group" : ["Fecha", "Account"],
"columns-transaction" : ["Fecha", "Account", "Valor"]
}
}
}
a = Utils(app).create_result(df, descriptor)
print(a)
engine = create_engine("mysql+pymysql://root:root@192.168.0.11:3301/cusca")
session_factory = sessionmaker(bind=engine)
session = session_factory()
b = Utils(app).save_result(a["detail"],descriptor, session)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment