Commit cb3ae104 authored by Erly Villaroel

Cambios para manejar dataframe vacío

parent ad60e8b9
from app import MainApplication
import warnings
from sqlalchemy.orm import sessionmaker, scoped_session
from pyspark.sql import SparkSession
from decimal import Decimal
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, ArrayType
warnings.filterwarnings("ignore")
from sqlalchemy import create_engine
from app.main.engine.util.Utils import Utils
# Application bootstrap: `base` owns the application factory and `app` is the
# application object that the rest of this script hands to `Utils`.
base = MainApplication()
app = base.create_app()

# The server itself is started by the `__main__` guard at the end of this file.

# Obtain the Spark session used to build the example DataFrame below.
_spark_builder = SparkSession.builder.appName("Crear DataFrame en PySpark")
spark = _spark_builder.getOrCreate()
# Column layout of the example DataFrame, as (column name, Spark type) pairs.
# Each entry gets its own type instance, exactly as in a hand-written schema.
_SCHEMA_COLUMNS = [
    ("PIVOT_Fecha", StringType()),
    ("COUNTERPART_Fecha", StringType()),
    ("Cuenta", StringType()),
    ("Account", StringType()),
    ("DIFF", DecimalType(10, 2)),
    ("LISTA_DIFF", ArrayType(StringType())),
    ("INTER_PIVOT_ID", StringType()),
    ("INTER_CTP_ID", StringType()),
    ("PIVOT_Valor", DecimalType(10, 2)),
    ("COUNTERPART_Valor", DecimalType(10, 2)),
]

# Every column is declared nullable (third StructField argument is True).
schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in _SCHEMA_COLUMNS
    ]
)

# No rows on purpose: the DataFrame is created empty with the schema above.
data = []
df = spark.createDataFrame(data, schema)

# Print the (empty) DataFrame to stdout.
df.show()
# Process descriptor passed to `Utils.create_result` / `Utils.save_result`.
# It is assembled from smaller named pieces; key order matches the original
# single-literal form.

# Settings stored under "config-params".
_config_params = {
    "max-records-per-combination": 10,
    "max-timeout-per-combination": 1000,
    "exclude-entity-pivot": True,
}

# Pivot side of the comparison.
_pivot_config = {
    "tablename": "PIVOT_TEMPORAL",
    "id-column": "ID",
    "amount-columns": "Valor",
    "columns-group": ["Fecha", "Cuenta"],
    "columns-transaction": ["Fecha", "Cuenta", "Valor"],
}

# Counterpart side of the comparison.
# NOTE(review): "tablename" is the same as the pivot's ("PIVOT_TEMPORAL");
# confirm this is intentional and not a copy-paste leftover.
_counterpart_config = {
    "tablename": "PIVOT_TEMPORAL",
    "id-column": "ID",
    "amount-columns": "Valor",
    "columns-group": ["Fecha", "Account"],
    "columns-transaction": ["Fecha", "Account", "Valor"],
}

descriptor = {
    "idProcess": 500240,
    "idScript": "match-and-exclude",
    "config-params": _config_params,
    "params-input": {
        "pivot-config": _pivot_config,
        "counterpart-config": _counterpart_config,
    },
}
# Build the result structure from the example DataFrame and the descriptor,
# then print it for inspection.
a = Utils(app).create_result(df, descriptor)
print("diccion2rio", a)

# Database connection used to persist the result.
# NOTE(review): the connection URL (including user and password) is hard-coded
# here; consider moving it to configuration or an environment variable.
engine = create_engine("mysql+pymysql://root:root@192.168.0.11:3301/cusca")
session_factory = sessionmaker(bind=engine)
session = session_factory()
try:
    # Persist the "detail" entry of the result through the open session.
    b = Utils(app).save_result(a["detail"], descriptor, session)
    print(b)
finally:
    # Release the session and the engine's pooled connections even when
    # saving fails, so nothing stays open while the server below is running.
    session.close()
    engine.dispose()

if __name__ == "__main__":
    # Start the application server only when this file is executed directly.
    base.run(port=8000)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment