Commit ad60e8b9 authored by Erly Villaroel

Methods for the database

parent 3cd6f64d
@@ -92,8 +92,10 @@ class Utils:
     def create_result(self, data, descriptor):
         result = []
-        response = {}
+        response = {"detail": result}
         try:
+            if data.count() == 0:
+                raise ValueError("The DataFrame is empty")
             exclude_pivot = descriptor["config-params"]["exclude-entity-pivot"]
             group_pivot = descriptor["params-input"]["pivot-config"]["columns-group"]
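
The new guard fails fast when the incoming DataFrame has no rows, rather than producing an empty result downstream. A minimal sketch of the behaviour, assuming a local SparkSession (the names here are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("guard-check").getOrCreate()
schema = StructType([StructField("Fecha", StringType(), True)])
empty_df = spark.createDataFrame([], schema)

# count() == 0 is exactly what create_result now checks before doing any work
if empty_df.count() == 0:
    raise ValueError("The DataFrame is empty")

Note that count() triggers a full Spark job; on recent PySpark versions, DataFrame.isEmpty() or head(1) answers the same question more cheaply.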
@@ -123,7 +125,7 @@ class Utils:
             key_group_counterpart = None
             for element in used_list:
                 if key_transaction is None:
-                    key_transaction = i[element]
+                    key_transaction = str(i[element])
                 else:
                     key_transaction = key_transaction + "-" + str(i[element])
             for element_g in group_pivot_match:
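
The str() cast matters because transaction columns can hold non-string values (dates, Decimal amounts). The else branch already casts, so without the fix the first iteration could leave key_transaction as a raw value, and the next concatenation would raise TypeError. A small sketch with a hypothetical row:

from decimal import Decimal

row = {"Fecha": "2024-01-01", "Valor": Decimal("10.50")}
used_list = ["Valor", "Fecha"]  # the first element is a Decimal column

key_transaction = None
for element in used_list:
    if key_transaction is None:
        key_transaction = str(row[element])  # without str(): Decimal('10.50')
    else:
        key_transaction = key_transaction + "-" + str(row[element])

print(key_transaction)  # 10.50-2024-01-01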
@@ -156,6 +158,8 @@ class Utils:
     def save_result(self, result, descriptor, session):
         response = {}
         try:
+            if len(result) == 0:
+                raise ValueError("There are no records to save")
             d1 = self.timezone.datetime_by_tzone()
             result_json = json.dumps(result)
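
save_result gets the symmetric guard: an empty result list is rejected before the timestamp lookup and the JSON serialization run. A minimal sketch of the control flow, with hypothetical inputs:

import json

result = []
try:
    if len(result) == 0:
        raise ValueError("There are no records to save")
    result_json = json.dumps(result)  # only reached for non-empty results
except ValueError as error:
    print("nothing saved:", error)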
The commit also adds a standalone harness that drives create_result and save_result end to end:
import warnings

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, ArrayType
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

from app import MainApplication
from app.main.engine.util.Utils import Utils

warnings.filterwarnings("ignore")

base = MainApplication()
app = base.create_app()
# The dev server stays disabled in this harness: base.run() blocks, and
# the smoke test below has to execute first.
# if __name__ == "__main__":
#     base.run(port=8000)
spark = SparkSession.builder \
    .appName("Create DataFrame in PySpark") \
    .getOrCreate()

# Define the DataFrame schema
schema = StructType([
StructField("PIVOT_Fecha", StringType(), True),
StructField("COUNTERPART_Fecha", StringType(), True),
StructField("Cuenta", StringType(), True),
StructField("Account", StringType(), True),
StructField("DIFF", DecimalType(10, 2), True),
StructField("LISTA_DIFF", ArrayType(StringType()), True),
StructField("INTER_PIVOT_ID", StringType(), True),
StructField("INTER_CTP_ID", StringType(), True),
StructField("PIVOT_Valor", DecimalType(10, 2), True),
StructField("COUNTERPART_Valor", DecimalType(10, 2), True)
])
# Create the DataFrame with sample data (left empty here, which exercises
# the new empty-input guard in create_result)
data = [
]
df = spark.createDataFrame(data, schema)
# Show the DataFrame
df.show()
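
As committed, data is empty, so the run below exercises the error path. To drive the happy path instead, rows matching the ten-field schema can be supplied; the values here are purely illustrative:

from decimal import Decimal

data = [
    ("2024-01-01", "2024-01-01", "C-001", "A-001", Decimal("0.00"),
     ["0.00"], "1", "2", Decimal("100.00"), Decimal("100.00")),
]
df = spark.createDataFrame(data, schema)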
descriptor = {
    "idProcess": 500240,
    "idScript": "match-and-exclude",
    "config-params": {
        "max-records-per-combination": 10,
        "max-timeout-per-combination": 1000,
        "exclude-entity-pivot": True
    },
    "params-input": {
        "pivot-config": {
            "tablename": "PIVOT_TEMPORAL",
            "id-column": "ID",
            "amount-columns": "Valor",
            "columns-group": ["Fecha", "Cuenta"],
            "columns-transaction": ["Fecha", "Cuenta", "Valor"]
        },
        "counterpart-config": {
            "tablename": "PIVOT_TEMPORAL",
            "id-column": "ID",
            "amount-columns": "Valor",
            "columns-group": ["Fecha", "Account"],
            "columns-transaction": ["Fecha", "Account", "Valor"]
        }
    }
}
a = Utils(app).create_result(df, descriptor)
print("result dictionary:", a)
engine = create_engine("mysql+pymysql://root:root@192.168.0.11:3301/cusca")
session_factory = sessionmaker(bind=engine)
session = session_factory()
b = Utils(app).save_result(a["detail"], descriptor, session)
print(b)
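
The harness leaves the Spark session, the SQLAlchemy session, and the engine open; a short cleanup at the end keeps repeated runs tidy:

# Release the resources created above
session.close()
engine.dispose()
spark.stop()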