Proyectos-Innovacion-2024 / CSS-Engine-Python-Cusca · Commits

Commit c2d602b8, authored May 08, 2024 by Cristian Aguirre
Update action-exclude-records-v1-dask

parent 7086b4fa
Showing 6 changed files with 229 additions and 481 deletions (+229, -481)

    app/main/engine/action/ActionInterface.py              +1    -1
    app/main/engine/service/Process.py                      +6    -6
    deploy/Dockerfile                                       +0    -2
    jars/mysql-connector-java-8.0.30.jar                    +0    -0
    scripts/emr_match-and-exclude-records-actions_v1.py     +0    -366
    scripts/match-and-exclude-records-actions_v1.py         +222  -106
app/main/engine/action/ActionInterface.py

@@ -14,7 +14,7 @@ class ActionInterface(ABC):
        raise NotImplementedError

    @abstractmethod
-   def process(self, source_obj, script_name, timezone, pattern):
+   def process(self, source_obj):
        """Method that runs the script's logic"""
        raise NotImplementedError
...
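
The signature change above narrows the contract every action implements: Process now hands the action only the source object, and the script name, timezone and date pattern are no longer threaded through. A minimal sketch of an action written against the new contract (the stand-in base class and EchoAction are illustrative, not part of the repository):

    from abc import ABC, abstractmethod

    class ActionInterface(ABC):                      # stand-in for app.main.engine.action.ActionInterface
        def __init__(self, app) -> None:
            self.app = app

        @abstractmethod
        def process(self, source_obj):
            """Method that runs the script's logic"""
            raise NotImplementedError

    class EchoAction(ActionInterface):               # hypothetical action, for illustration only
        def process(self, source_obj):
            self.output = source_obj                 # a real action builds its result here
            return self.output

    print(EchoAction(app=None).process({"rows": 3}))  # -> {'rows': 3}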
app/main/engine/service/Process.py

@@ -46,16 +46,16 @@ class Process:
                # Starting the process
                self.app.logger.info(f"Iniciando procesamiento de script")
-               obj_script.process(source, script_name, cfg.timezone, cfg.time_pattern)
+               obj_script.process(source)
                print("1")
                # Saving the result
                self.app.logger.info(f"Generado y guardando resultado")
-               # _ = obj_script.response()
+               response = obj_script.response()
                # response.show()
-               # result = self.utils.create_result(response, self.descriptor)
-               # save = self.utils.save_result(result, self.descriptor, db_session)
-               # if save["status"] == StatusEnum.ERROR.name:
-               #     raise InterruptedError(save["message"])
+               result = self.utils.create_result(response, self.descriptor)
+               save = self.utils.save_result(result, self.descriptor, db_session)
+               if save["status"] == StatusEnum.ERROR.name:
+                   raise InterruptedError(save["message"])
            except TimeoutError as e:
                self.app.logger.error(f"Error de Timeout. Error: {e}")
                status, status_description = CodeResponseEnum.TIMEOUT, str(e)
...
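
Together with the interface change, the uncommented lines restore the save path inside Process itself: run the action, pull its response, build the result and persist it, failing loudly if the save reports an error. A condensed sketch of that flow (the function below is illustrative; obj_script, utils and db_session stand for the objects the real class already holds):

    def run_and_save(obj_script, source, utils, descriptor, db_session, logger):
        logger.info("Iniciando procesamiento de script")
        obj_script.process(source)                         # new single-argument call
        response = obj_script.response()                   # whatever the action produced
        result = utils.create_result(response, descriptor)
        save = utils.save_result(result, descriptor, db_session)
        if save["status"] == "ERROR":                      # StatusEnum.ERROR.name in the real code
            raise InterruptedError(save["message"])
        return result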
deploy/Dockerfile

@@ -9,8 +9,6 @@ tar xzf Python-3.10.0.tgz && cd Python-3.10.0 && \
    ./configure --enable-optimizations && \
    make altinstall
-COPY subset_sum_linux /tmp/
-COPY requirements.txt /
RUN python3 -m pip install numpy pandas py4j python-dateutil pytz six tzdata
...
jars/mysql-connector-java-8.0.30.jar
deleted 100644 → 0
File deleted
scripts/emr_match-and-exclude-records-actions_v1.py
deleted 100644 → 0

import os
import uuid
from typing import Dict, Any, List
import sys
import subprocess
import pandas as pd
import time
from datetime import datetime
from dateutil.parser import parse
import json
import pytz
import logging
from enum import Enum

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, collect_list, round, when, col, lit, size, udf, array_except, array
from pyspark.sql.types import ArrayType, IntegerType, LongType


class FixedFieldsEnum(Enum):
    INTER_PIVOT_ID = "INTER_PIVOT_ID"
    INTER_CTP_ID = "INTER_CTP_ID"
    LISTA_DIFF = "LISTA_DIFF"
    DIFF = "DIFF"
    MATCH_RECORDS = "match-records"


class StatusEnum(Enum):
    OK = 200
    ERROR = 609
    TIMEOUT = 610


logger = logging.getLogger(__name__)

# EXCLUDE VALIDATION FIELD
EXCLUDE_ROWS_FIELD = "EXCLUDE_VALID"

# DECIMAL ROUNDING
ROUND_DECIMAL = 2

# RESULT TABLE COLUMNS
RESULT_TABLENAME = "CSS_RESULT_BY_ACTION"
RESULT_TABLE_FIELDS = ["ACTION_ID", "ID_PROCESS", "CREATE_DATE", "KEY", "RESULT_JSON"]
def process() -> Dict[str, Any]:
    response = {"status": StatusEnum.ERROR.name}
    start_time = time.time()
    params = sys.argv
    descriptor = params[1]
    jdbc_url = params[2]
    timezone = params[3]
    pattern = params[4]
    descriptor = json.loads(descriptor)
    session = createSession()

    configs = descriptor["config-params"]
    exclude_pivot = configs["exclude-entity-pivot"]
    max_combinations = configs["max-records-per-combinations"]
    params_input = descriptor["params-input"]
    pivot_params, ctp_params = params_input["pivot-config"], params_input["counterpart-config"]
    pivot_table, ctp_table = pivot_params["tablename"], ctp_params["tablename"]

    jdbc_properties = {"driver": "com.mysql.jdbc.Driver"}
    pivot_df = session.read.jdbc(url=jdbc_url, table=pivot_table, properties=jdbc_properties)
    ctp_df = session.read.jdbc(url=jdbc_url, table=ctp_table, properties=jdbc_properties)

    # Add a prefix to every column of both pivot and counterpart, and update the input fields
    # pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
    for column in pivot_df.columns:
        if column == EXCLUDE_ROWS_FIELD:
            continue
        pivot_df = pivot_df.withColumnRenamed(column, "PIVOT_" + column)

    for column in ctp_df.columns:
        if column == EXCLUDE_ROWS_FIELD:
            continue
        ctp_df = ctp_df.withColumnRenamed(column, "COUNTERPART_" + column)

    for key_p, key_c in zip(pivot_params.keys(), ctp_params.keys()):
        if isinstance(pivot_params[key_p], str):
            pivot_params[key_p] = "PIVOT_" + pivot_params[key_p]
            ctp_params[key_c] = "COUNTERPART_" + ctp_params[key_c]
        else:
            pivot_params[key_p] = ["PIVOT_" + column for column in pivot_params[key_p]]
            ctp_params[key_c] = ["COUNTERPART_" + column for column in ctp_params[key_c]]

    pivot_cols = pivot_params["columns-transaction"].copy()
    if pivot_params["amount-column"] in pivot_cols:
        pivot_cols.remove(pivot_params["amount-column"])

    ctp_cols = ctp_params["columns-transaction"].copy()
    if ctp_params["amount-column"] in ctp_cols:
        ctp_cols.remove(ctp_params["amount-column"])

    # Run the record-exclusion logic
    if len(pivot_params["columns-group"]) == 0 and len(ctp_params["columns-group"]) == 0:
        raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")

    # Case: 1 - Many
    elif len(pivot_params["columns-group"]) == 0 and len(ctp_params["columns-group"]) > 0:
        ctp_df2 = ctp_df.groupby(ctp_params["columns-group"]). \
            agg(round(sum(ctp_params["amount-column"]), ROUND_DECIMAL).alias(ctp_params["amount-column"]),
                collect_list(ctp_params["id-column"]).alias(ctp_params["id-column"]))
        pivot_df2 = pivot_df

    # Case: Many - 1
    elif len(pivot_params["columns-group"]) > 0 and len(ctp_params["columns-group"]) == 0:
        pivot_df2 = pivot_df.groupby(pivot_params["columns-group"]).agg(
            round(sum(pivot_params["amount-column"]), ROUND_DECIMAL).alias(pivot_params["amount-column"]),
            collect_list(pivot_params["id-column"]).alias(pivot_params["id-column"]))
        ctp_df2 = ctp_df

    # Case: Many - Many
    elif len(pivot_params["columns-group"]) > 0 and len(ctp_params["columns-group"]) > 0:
        pivot_df2 = pivot_df.groupby(pivot_params["columns-group"]).agg(
            round(sum(pivot_params["amount-column"]), ROUND_DECIMAL).alias(pivot_params["amount-column"]),
            collect_list(pivot_params["id-column"]).alias(pivot_params["id-column"]))
        ctp_df2 = ctp_df.groupby(ctp_params["columns-group"]).agg(
            round(sum(ctp_params["amount-column"]), ROUND_DECIMAL).alias(ctp_params["amount-column"]),
            collect_list(ctp_params["id-column"]).alias(ctp_params["id-column"]))

    condition = [pivot_df2[col1] == ctp_df2[col2] for col1, col2 in
                 zip(pivot_params["columns-transaction"], ctp_params["columns-transaction"])]
    total_merged = pivot_df2.join(ctp_df2, condition, 'left')
    total_merged = total_merged.withColumn("DIFF", when(col(ctp_params["columns-transaction"][0]).isNotNull(),
                                                        lit(0)).otherwise(lit(None)))
    total_merged = total_merged.select(*pivot_df2.columns, "DIFF")

    condition = [total_merged[col1] == ctp_df2[col2] for col1, col2 in zip(pivot_cols, ctp_cols)]
    merged = total_merged.join(ctp_df2, condition)
    merged = merged.withColumn("DIFF", when(col("DIFF").isNull(),
                                            total_merged[pivot_params["amount-column"]] - ctp_df2[ctp_params["amount-column"]])
                               .otherwise(col("DIFF")))

    if len(pivot_params["columns-group"]) == 0 and len(ctp_params["columns-group"]) > 0:
        merged = merged.sort(pivot_params["id-column"])
        merged = merged.dropDuplicates([pivot_cols])
    elif len(pivot_params["columns-group"]) > 0 and len(ctp_params["columns-group"]) == 0:
        merged = merged.sort(ctp_params["id-column"])
        merged = merged.dropDuplicates([ctp_cols])

    merged_df = merged.withColumn("DIFF", round(merged["DIFF"], ROUND_DECIMAL))

    if exclude_pivot:
        df = pivot_df
        group_cols = pivot_params["columns-group"]
        amount_col = pivot_params["amount-column"]
        id_col = pivot_params["id-column"]
    else:
        df = ctp_df
        group_cols = ctp_params["columns-group"]
        amount_col = ctp_params["amount-column"]
        id_col = ctp_params["id-column"]

    total_tmp_cols = group_cols + ["DIFF"]
    df3 = df.join(merged_df.select(*total_tmp_cols), group_cols)
    columns = [col(column) for column in group_cols]
    custom = udf(custom_func_udf, ArrayType(IntegerType()))

    # Filter only the rows with 'S' in the exclusion field - matches would not be taken
    # df3 = df3.filter(col(EXCLUDE_ROWS_FIELD) == 'S')
    resultado = df3.groupby(*columns).agg(custom(collect_list(amount_col), collect_list(id_col),
                                                 collect_list(EXCLUDE_ROWS_FIELD), collect_list("DIFF"),
                                                 lit(max_combinations)).alias("LISTA_DIFF"))
    meged2 = resultado.join(merged_df, group_cols, 'left')
    handle_array_udf = udf(handle_array, ArrayType(IntegerType()))
    meged2 = meged2.withColumn("LISTA_DIFF", handle_array_udf("LISTA_DIFF"))
    meged2 = meged2.filter((col("DIFF") == 0) | ((col("DIFF") != 0) & (size(col("LISTA_DIFF")) > 0)))

    if exclude_pivot:
        meged2 = meged2.withColumn("INTER_PIVOT_ID", array_except(meged2[pivot_params["id-column"]], meged2["LISTA_DIFF"]))
        meged2 = meged2.withColumnRenamed(ctp_params["id-column"], "INTER_CTP_ID")
        if meged2.schema["INTER_CTP_ID"].dataType == LongType():
            meged2 = meged2.withColumn("INTER_CTP_ID", array(col("INTER_CTP_ID")).cast(ArrayType(LongType())))
    else:
        meged2 = meged2.withColumn("INTER_CTP_ID", array_except(meged2[ctp_params["id-column"]], meged2["LISTA_DIFF"]))
        meged2 = meged2.withColumnRenamed(pivot_params["id-column"], "INTER_PIVOT_ID")
        if meged2.schema["INTER_PIVOT_ID"].dataType == LongType():
            meged2 = meged2.withColumn("INTER_PIVOT_ID", array(col("INTER_PIVOT_ID")).cast(ArrayType(LongType())))

    meged2.show()
    print("SHOW:", time.time() - start_time)
    meged2 = meged2.toPandas()
    print("SOLO ALGORITMO:", time.time() - start_time)

    # Save to the DB
    print("creando result")
    result = create_result(meged2, descriptor)
    print("emepce a guardar")
    if result["status"] == StatusEnum.ERROR:
        raise InterruptedError(f"Error generando el json resultado. {result['message']}")
    save = save_result(result, session, jdbc_url, descriptor, timezone, pattern)
    if save["status"] == StatusEnum.ERROR:
        raise InterruptedError(f"Error guardando registro resultado en la BD. {result['message']}")

    response["status"] = StatusEnum.OK.name
    return response
def createSession(name: str = "app_engine_spark"):
    try:
        session = SparkSession.builder \
            .appName(name) \
            .getOrCreate()
        return session
    except Exception as e:
        raise Exception(f"Error creando sesion Spark. {e}")


def handle_array(x):
    if isinstance(x, List):
        return x
    else:
        return []


def custom_func_udf(amount_values, id_values, excludes, diffs, max_combinations):
    diff = diffs[0]
    if pd.isna(diff) or diff == 0:
        return None
    diff = int(diff * (10 ** ROUND_DECIMAL))
    amount_values = [int(value * (10 ** ROUND_DECIMAL)) for value, exclude in zip(amount_values, excludes) if exclude == 'S']
    dir_name = str(uuid.uuid4())
    prefix = "/tmp/" + dir_name + "_"
    tmp_file_arr1, tmp_file_arr2 = "values.txt", "target.txt"
    full_path_arr1, full_path_arr2 = prefix + tmp_file_arr1, prefix + tmp_file_arr2
    with open(full_path_arr1, 'w') as archivo:
        archivo.writelines([f'{entero}\n' for entero in amount_values])
    with open(full_path_arr2, 'w') as archivo:
        archivo.write(str(diff))
    executable_path = '/tmp/subset_sum_linux'
    indices = []
    for comb in range(1, max_combinations + 1):
        argumentos = [full_path_arr1, full_path_arr2, str(comb), '1', '1', 'false', 'false']
        result = subprocess.run([executable_path] + argumentos, check=True, capture_output=True, text=True)
        result = str(result)
        if "keys:[" in result:
            match = result[result.index("keys:[") + 5: result.index("keys remainder") - 20]
            match = match.replace("targets:", "").replace("+", ",")
            match = match.split("==")[0].replace(" ", "")
            match = json.loads(match)
            for idx, val in zip(id_values, amount_values):
                if val in match:
                    indices.append(idx)
                    match.remove(val)
            break
    os.remove(full_path_arr1), os.remove(full_path_arr2)
    return indices
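
custom_func_udf does not search for the matching records itself: it scales the excludable amounts and the group's DIFF to integers and hands them to the external /tmp/subset_sum_linux binary, which looks for a combination of at most max_combinations amounts that sums to the difference. A brute-force sketch of that same search in pure Python, useful for following the logic on small inputs (illustrative only; the binary is what the deleted script actually shipped with):

    from itertools import combinations

    def find_exclusion_ids(amounts, ids, diff, max_combinations, round_decimal=2):
        target = int(diff * 10 ** round_decimal)                 # same integer scaling as custom_func_udf
        scaled = [int(a * 10 ** round_decimal) for a in amounts]
        for size in range(1, max_combinations + 1):
            for combo in combinations(range(len(scaled)), size):
                if sum(scaled[i] for i in combo) == target:
                    return [ids[i] for i in combo]               # ids whose amounts explain the difference
        return []

    print(find_exclusion_ids([10.00, 4.50, 5.50, 3.25], [101, 102, 103, 104], 10.0, 2))  # -> [101]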
def create_result(data, descriptor):
    result = []
    response = {"detail": result}
    try:
        exclude_pivot = descriptor["config-params"]["exclude-entity-pivot"]
        pivot_params = descriptor["params-input"]["pivot-config"]
        ctp_params = descriptor["params-input"]["counterpart-config"]
        group_pivot_match = pivot_params["columns-group"]
        transaction_pivot_match = pivot_params["columns-transaction"]
        group_counterpart_match = ctp_params["columns-group"]
        transaction_counterpart_match = ctp_params["columns-transaction"]
        used_list = transaction_counterpart_match if exclude_pivot else transaction_pivot_match

        if data is None or data.empty:
            logger.info(f"El dataframe resultado esta vacio")
        else:
            for idx, i in data.iterrows():
                input_data = {}
                key_transaction = None
                key_group_pivot = None
                key_group_counterpart = None
                for element in used_list:
                    if key_transaction is None:
                        key_transaction = str(i[element])
                    else:
                        key_transaction = key_transaction + "-" + str(i[element])
                for element_g in group_pivot_match:
                    if key_group_pivot is None:
                        key_group_pivot = str(i[element_g])
                    else:
                        key_group_pivot = key_group_pivot + "-" + str(i[element_g])
                for element_c in group_counterpart_match:
                    if key_group_counterpart is None:
                        key_group_counterpart = str(i[element_c])
                    else:
                        key_group_counterpart = key_group_counterpart + "-" + str(i[element_c])
                input_data["key-transaction"] = str(key_transaction)
                input_data["key-group-pivot"] = "" if key_group_pivot is None else str(key_group_pivot)
                input_data["key-group-counterpart"] = "" if key_group_counterpart is None else str(key_group_counterpart)
                input_data["list-ids-pivot"] = str(i[FixedFieldsEnum.INTER_PIVOT_ID.value])
                input_data["list-ids-counterpart"] = str(i[FixedFieldsEnum.INTER_CTP_ID.value])
                input_data["exclude-ids"] = str(i[FixedFieldsEnum.LISTA_DIFF.value])
                input_data["difference-amount"] = str(i[FixedFieldsEnum.DIFF.value])
                result.append(input_data)
        response['status'] = StatusEnum.OK
        response["detail"] = result
    except Exception as e:
        logger.error(f"Error al crear el diccionario de resultados. {e}")
        response["status"] = StatusEnum.ERROR
        response["message"] = str(e)
    finally:
        return response


def save_result(result, session, jdbc_url, descriptor, timezone, pattern):
    response = {}
    try:
        d1 = datetime_by_tzone(timezone, pattern)
        result_json = json.dumps(result["detail"])
        data = [descriptor["idScript"], descriptor["idProcess"], d1, FixedFieldsEnum.MATCH_RECORDS.value, result_json]
        df = pd.DataFrame([data], columns=RESULT_TABLE_FIELDS)
        df = session.createDataFrame(df)
        df.write.format("jdbc").option("url", jdbc_url).option("dbtable", RESULT_TABLENAME). \
            option("driver", "com.mysql.cj.jdbc.Driver").mode("append").save()
        response['status'] = StatusEnum.OK
    except Exception as e:
        response["status"] = StatusEnum.ERROR
        response["message"] = str(e)
        logger.error(f"Error al guardar registro en la base de datos {e}")
    finally:
        return response


def datetime_by_tzone(timezone, pattern):
    tzone = timezone
    offset = None
    # Some cases where the timezone has the form 4:30 and is not found among the pytz timezones (GMT)
    if ":" in tzone:
        offset = tzone.split(":")[1]
        tzone = tzone.split(":")[0]
    if "+" in tzone:
        tzone = tzone.replace(tzone[-1], str(int(tzone[-1]) + 1))
    timezones_list = pytz.all_timezones
    tzones = [x if tzone in x else None for x in timezones_list]
    tzones = list(filter(None, tzones))
    server_timezone = pytz.timezone(tzones[0])
    logger.debug("Zona Horaria : {}".format(server_timezone))
    server_time = server_timezone.localize(datetime.utcnow())
    current_time = parse(server_time.strftime('%Y-%m-%d %H:%M:%S.%f %Z'))
    if offset:
        offset = pytz.FixedOffset((current_time.utcoffset().total_seconds() / 60 + float(offset)) * -1)
        offset = offset.utcoffset(datetime.utcnow())
        current_time = datetime.utcnow() + offset
    else:
        current_time = current_time.replace(tzinfo=None) - current_time.utcoffset()
    current_time = parse(current_time.strftime(pattern))
    logger.debug("Hora actual: {}".format(current_time))
    return current_time


# Process execution
process()
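
The deleted script ran as a self-contained Spark job: everything it needed arrived as four positional arguments, which is exactly the list the old launcher built (json.dumps(descriptor), jdbc_url, timezone, pattern). A sketch of that calling convention with placeholder values (illustrative; the real descriptor carries the full pivot/counterpart configuration):

    import json

    descriptor = {"config-params": {"exclude-entity-pivot": True, "max-records-per-combinations": 10},
                  "params-input": {"pivot-config": {"tablename": "PIVOT_TBL"},        # placeholder config
                                   "counterpart-config": {"tablename": "CTP_TBL"}},
                  "idScript": 1, "idProcess": 99}

    argv = ["emr_match-and-exclude-records-actions_v1.py",
            json.dumps(descriptor),            # params[1] -> descriptor
            "jdbc:mysql://host:3306/cusca",    # params[2] -> jdbc_url
            "GMT-6",                           # params[3] -> timezone
            "%d-%m-%Y %H:%M:%S"]               # params[4] -> pattern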
scripts/match-and-exclude-records-actions_v1.py

-from typing import Any, Dict
-import os
-import sys
-import subprocess
-import uuid
+from typing import Any, Dict, List
+import numpy as np
+import pandas as pd
 import json
+from dpss import find_subset
+from dask import dataframe as dd
+from numba import jit, types, typed
 from wrapt_timeout_decorator import timeout
-from app.main.engine.util.EMRServerless import EMRServerless
+import multiprocessing as mp
 from app.main.engine.action.ActionInterface import ActionInterface
...
@@ -21,7 +28,6 @@ class MatchAndExcludeRecordsAction(ActionInterface):

     def __init__(self, app) -> None:
         super().__init__(app)
-        self.descriptor = None
         self.max_combinations = None
         self.timeout = None
         self.exclude_pivot = None
...
@@ -55,65 +61,176 @@ class MatchAndExcludeRecordsAction(ActionInterface):
             if param not in pivot_params.keys() or param not in ctp_params.keys():
                 raise ReferenceError(f"Parámetro *{param}* no encontrado en pivot o contraparte")

-        self.descriptor = descriptor
         self.max_combinations = configs["max-records-per-combinations"]
         self.timeout = configs["max-timeout-per-combinations"]
         self.exclude_pivot = configs["exclude-entity-pivot"]
         self.pivot_params = pivot_params
         self.ctp_params = ctp_params

-    def process(self, source_obs, script_name, timezone, pattern):
+    def process(self, source_obs):
         try:
             @timeout(self.timeout)
             def __process(source_obj):
-                descriptor = DESCRIPTOR
-                serverless_job_role_arn = descriptor["job_role_arn"]
-                s3_bucket_name = descriptor["bucket_base"]
-                search_emr = descriptor["emr_started"]
-                end_app = descriptor["terminated_app"]
-                emr_serverless = EMRServerless(search_app=search_emr)
-                self.app.logger.info("Validando si exite una aplicación ya en curso")
-                exists_app = emr_serverless.valid_application()
-                if not exists_app["exists"]:
-                    self.app.logger.info("Creando e inicializando la aplicación EMR Serverless")
-                    emr_serverless.create_application(descriptor["application_name"], descriptor["emr_version"],
-                                                      descriptor["app_args"])
-                else:
-                    emr_serverless.application_id = exists_app["app"]
-                emr_serverless.start_application()
-                self.app.logger.info(emr_serverless)
-                job = descriptor["job"]
-                script_location = job["script_location"] + "emr_" + script_name
-                jdbc_conn = source_obj.create_spark_connection()
-                # jdbc_url = jdbc_conn["url"]
-                jdbc_url = "jdbc:mysql://admin:awsadmin@database-2.cgcfmoce13qq.us-east-1.rds.amazonaws.com:3306/cusca"
-                # jdbc_properties = jdbc_conn["properties"]
-                arguments = [json.dumps(self.descriptor), jdbc_url, timezone, pattern]
-                self.app.logger.info("Lanzando nuevo job Spark")
-                self.app.logger.info(script_location)
-                self.app.logger.info(serverless_job_role_arn)
-                self.app.logger.info(arguments)
-                self.app.logger.info(job["sparkArgs"])
-                self.app.logger.info(s3_bucket_name)
-                job_run_id = emr_serverless.run_spark_job(
-                    script_location=script_location,
-                    job_role_arn=serverless_job_role_arn,
-                    arguments=arguments,
-                    sparkArguments=job["sparkArgs"],
-                    s3_bucket_name=s3_bucket_name,
-                )
-                job_status = emr_serverless.get_job_run(job_run_id)
-                self.app.logger.info(f"Job terminado: {job_run_id}, Estado: {job_status.get('state')}")
-                # Fetch and print the logs
-                spark_driver_logs = emr_serverless.fetch_driver_log(s3_bucket_name, job_run_id)
-                self.app.logger.info("Archivo de salida:\n----\n", spark_driver_logs, "\n----")
-                if end_app:
-                    # Now stop and delete your application
-                    self.app.logger.info("Deteniendo y borrando aplicación Spark")
-                    emr_serverless.stop_application()
-                    emr_serverless.delete_application()
-                self.app.logger.info("Hecho! 👋")
+                # Fetch the data from the DB, both pivot and counterpart
+                pivot_table, ctp_table = self.pivot_params["tablename"], self.ctp_params["tablename"]
+                dialect = source_obj.get_dialect()
+                pivot_df = dd.read_sql_table(pivot_table, dialect, index_col=self.pivot_params["id-column"],
+                                             npartitions=mp.cpu_count())
+                pivot_df = pivot_df.reset_index()
+                ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"],
+                                           npartitions=mp.cpu_count())
+                ctp_df = ctp_df.reset_index()
+
+                # Add a prefix to every column of both pivot and counterpart, and update the input fields
+                # pivot: 'PIVOT_', counterpart: 'COUNTERPART_'
+                # Iterate over the DataFrame columns
+                for column in pivot_df.columns:
+                    if column == EXCLUDE_ROWS_FIELD:
+                        continue
+                    new_column_name = "PIVOT_" + column
+                    pivot_df = pivot_df.rename(columns={column: new_column_name})
+
+                for column in ctp_df.columns:
+                    if column == EXCLUDE_ROWS_FIELD:
+                        continue
+                    new_column_name = "COUNTERPART_" + column
+                    ctp_df = ctp_df.rename(columns={column: new_column_name})
+
+                for key_p, key_c in zip(self.pivot_params.keys(), self.ctp_params.keys()):
+                    if isinstance(self.pivot_params[key_p], str):
+                        self.pivot_params[key_p] = "PIVOT_" + self.pivot_params[key_p]
+                        self.ctp_params[key_c] = "COUNTERPART_" + self.ctp_params[key_c]
+                    else:
+                        self.pivot_params[key_p] = ["PIVOT_" + column for column in self.pivot_params[key_p]]
+                        self.ctp_params[key_c] = ["COUNTERPART_" + column for column in self.ctp_params[key_c]]
+
+                pivot_cols = self.pivot_params["columns-transaction"].copy()
+                if self.pivot_params["amount-column"] in pivot_cols:
+                    pivot_cols.remove(self.pivot_params["amount-column"])
+
+                ctp_cols = self.ctp_params["columns-transaction"].copy()
+                if self.ctp_params["amount-column"] in ctp_cols:
+                    ctp_cols.remove(self.ctp_params["amount-column"])
+
+                max_combinations = self.max_combinations
+
+                # Run the record-exclusion logic
+                if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) == 0:
+                    raise RuntimeError(f"Debe haber al menos pivot o contraparte agrupado")
+
+                # Case: 1 - Many
+                elif len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
+                    ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
+                        self.ctp_params["amount-column"]: 'sum',  # Sum the amount column
+                        self.ctp_params["id-column"]: list})
+                    ctp_df2 = ctp_df2.reset_index()
+                    pivot_df2 = pivot_df
+
+                # Case: Many - 1
+                elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
+                    pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
+                        self.pivot_params["amount-column"]: 'sum',
+                        self.pivot_params["id-column"]: list})
+                    pivot_df2 = pivot_df2.reset_index()
+                    ctp_df2 = ctp_df
+
+                # Case: Many - Many
+                elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) > 0:
+                    pivot_df2 = pivot_df.groupby(self.pivot_params["columns-group"]).agg({
+                        self.pivot_params["amount-column"]: 'sum',
+                        self.pivot_params["id-column"]: list})
+                    pivot_df2 = pivot_df2.reset_index()
+                    ctp_df2 = ctp_df.groupby(self.ctp_params["columns-group"]).agg({
+                        self.ctp_params["amount-column"]: 'sum',  # Sum the amount column
+                        self.ctp_params["id-column"]: list})
+                    ctp_df2 = ctp_df2.reset_index()
+
+                pivot_df2[self.pivot_params["amount-column"]] = pivot_df2[self.pivot_params["amount-column"]].round(ROUND_DECIMAL)
+                ctp_df2[self.ctp_params["amount-column"]] = ctp_df2[self.ctp_params["amount-column"]].round(ROUND_DECIMAL)
+
+                total_merged = pivot_df2.merge(ctp_df2, 'left', left_on=self.pivot_params["columns-transaction"],
+                                               right_on=self.ctp_params["columns-transaction"])
+                total_merged = total_merged.map_partitions(self.add_diff_column)
+                selected_columns = list(pivot_df2.columns) + ['DIFF']
+                total_merged = total_merged[selected_columns]
+
+                merged = total_merged.merge(ctp_df2, 'inner', left_on=pivot_cols, right_on=ctp_cols)
+                merged['DIFF'] = merged['DIFF'].where(merged['DIFF'].notnull(),
+                                                      merged[self.pivot_params["amount-column"]] - merged[self.ctp_params["amount-column"]])
+
+                if len(self.pivot_params["columns-group"]) == 0 and len(self.ctp_params["columns-group"]) > 0:
+                    merged = merged.set_index(self.pivot_params["id-column"])
+                    merged = merged.map_partitions(lambda df_: df_.sort_values([self.pivot_params["id-column"]]))
+                    merged = merged.drop_duplicates(subset=pivot_cols)
+                elif len(self.pivot_params["columns-group"]) > 0 and len(self.ctp_params["columns-group"]) == 0:
+                    merged = merged.set_index(self.ctp_params["id-column"])
+                    merged = merged.map_partitions(lambda df_: df_.sort_values([self.ctp_params["id-column"]]))
+                    merged = merged.drop_duplicates(subset=ctp_cols)
+                merged = merged.reset_index()
+
+                merged_df = merged.assign(DIFF=lambda partition: partition["DIFF"].round(ROUND_DECIMAL))
+
+                if self.exclude_pivot:
+                    df = pivot_df
+                    group_cols = self.pivot_params["columns-group"]
+                    amount_col = self.pivot_params["amount-column"]
+                    id_col = self.pivot_params["id-column"]
+                else:
+                    df = ctp_df
+                    group_cols = self.ctp_params["columns-group"]
+                    amount_col = self.ctp_params["amount-column"]
+                    id_col = self.ctp_params["id-column"]
+
+                total_tmp_cols = group_cols + ["DIFF"]
+                df3 = df.merge(merged_df[total_tmp_cols], 'inner', on=group_cols)
+                df3 = df3.compute()
+
+                total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
+                resultado = df3.groupby(group_cols)[total_cols].apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
+                resultado = resultado.reset_index()
+                if len(resultado.columns) == 1:
+                    resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
+                else:
+                    resultado.columns = group_cols + ["LISTA_DIFF"]
+                resultado = dd.from_pandas(resultado, npartitions=mp.cpu_count())
+
+                meged2 = resultado.merge(merged_df, 'left', group_cols)
+                meged2 = meged2.map_partitions(lambda partition: partition.assign(
+                    LISTA_DIFF=partition['LISTA_DIFF'].apply(lambda x: [] if pd.isna(x) else x)),
+                    meta=meged2.dtypes.to_dict())
+                meged2 = meged2[
+                    (meged2['DIFF'] == 0) |
+                    ((meged2['DIFF'] != 0) & meged2['LISTA_DIFF'].apply(
+                        lambda x: True if not pd.isna(x) and ((isinstance(x, List) and len(x) > 0) or
+                                                              (isinstance(x, str) and len(x) > 2)) else False))
+                    ]
+                meged2 = meged2.compute()
+
+                if meged2.empty:
+                    pass
+                elif self.exclude_pivot:
+                    meged2['INTER_PIVOT_ID'] = meged2.apply(lambda row: self.array_except(row[self.pivot_params["id-column"]], row['LISTA_DIFF']), axis=1)
+                    meged2 = meged2.rename(columns={self.ctp_params["id-column"]: "INTER_CTP_ID"})
+                    if meged2['INTER_CTP_ID'].dtype == 'int64':
+                        meged2['INTER_CTP_ID'] = meged2['INTER_CTP_ID'].apply(lambda x: [x]).astype('object')
+                else:
+                    meged2['INTER_CTP_ID'] = meged2.apply(lambda row: self.array_except(row[self.ctp_params["id-column"]], row['LISTA_DIFF']), axis=1)
+                    meged2 = meged2.rename(columns={self.pivot_params["id-column"]: "INTER_PIVOT_ID"})
+                    if meged2['INTER_PIVOT_ID'].dtype == 'int64':
+                        meged2['INTER_PIVOT_ID'] = meged2['INTER_PIVOT_ID'].apply(lambda x: [x]).astype('object')
+
+                return meged2
         except TimeoutError as e:
             raise TimeoutError(f"Tiempo límite superado. {e}")
...
@@ -123,57 +240,56 @@ class MatchAndExcludeRecordsAction(ActionInterface):
     def response(self):
         return self.output

+    def add_diff_column(self, partition):
+        partition['DIFF'] = np.where(partition[self.ctp_params["columns-transaction"][0]].notnull(), 0, np.nan)
+        return partition
+
+    def handle_array(self, x):
+        if isinstance(x, np.ndarray):
+            return x
+        else:
+            return []
+    def array_except(self, arr1, arr2):
+        if arr2 is None:
+            return arr1
+        elif not isinstance(arr2, List):
+            cadena_sin_corchetes = arr2.replace(" ", "").strip('[]')
+            partes = cadena_sin_corchetes.split(",")
+            # print(partes)
+            arr2 = [int(numero) for numero in partes]
+            arr1 = json.loads(arr1.replace(" ", ""))
+        return [item for item in arr1 if item not in arr2]
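
By the time array_except runs, the id columns may hold real Python lists or their string representations (which is what can survive the pandas groupby/apply round trip), so the method parses strings before taking the set difference. A quick illustration using a standalone copy of the same logic (the free function below is only for demonstration; in the class it is a method taking self):

    import json
    from typing import List

    def array_except(arr1, arr2):                          # standalone copy of the method above
        if arr2 is None:
            return arr1
        elif not isinstance(arr2, List):
            arr2 = [int(n) for n in arr2.replace(" ", "").strip('[]').split(",")]
            arr1 = json.loads(arr1.replace(" ", ""))
        return [item for item in arr1 if item not in arr2]

    print(array_except([7, 8, 9], None))                   # -> [7, 8, 9]  (nothing to exclude)
    print(array_except("[7, 8, 9]", "[8, 9]"))             # -> [7]        (string-encoded inputs)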
+def custom_func(group, amount_field, id_field, max_combinations):
+    diff_value = group["DIFF"].values[0]
+    if np.isnan(diff_value):
+        return None
+    diff = int(diff_value * (10 ** ROUND_DECIMAL))
+    if pd.isna(diff) or diff == 0:
+        return None
+    group = group[group[EXCLUDE_ROWS_FIELD] == 'S']
+    group[amount_field] = group[amount_field].astype(float)
+    group = group.reset_index(drop=True)
+    values = group[amount_field].values
+    values *= (10 ** ROUND_DECIMAL)
+    values = values.astype(np.int64)
+    ids = group[id_field].values
+    tam = len(values)
+    tam = tam if tam <= max_combinations else max_combinations
+    result = find_subset(values, diff, tam)
+    indices = []
+    if len(result) > 0:
+        result = result[0]
+        for idx, val in zip(ids, values):
+            if val in result:
+                indices.append(idx)
+                result.remove(val)
+    else:
+        return None
+    return indices
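
Instead of shelling out to the subset_sum_linux binary, custom_func now calls find_subset from the dpss package with the integer-scaled amounts, the scaled group difference and a maximum subset length. The snippet below mirrors that call; the return shape assumed here (a possibly empty list of matching subsets, each a list of values) is how the code above consumes it:

    import numpy as np
    from dpss import find_subset               # same import used at the top of this file

    values = np.array([1000, 450, 550, 325], dtype=np.int64)   # 10.00, 4.50, 5.50, 3.25 scaled by 10**ROUND_DECIMAL
    diff = 1000                                                 # group difference, scaled the same way
    subsets = find_subset(values, diff, len(values))            # subsets of at most len(values) elements summing to diff
    print(subsets[0] if len(subsets) > 0 else None)             # e.g. [1000] -> that record alone explains the gap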
-DESCRIPTOR = {
-    "application_name": "css_cuscatlan",
-    "emr_version": "emr-7.0.0",
-    "emr_started": True,
-    "terminated_app": False,
-    "job_role_arn": "arn:aws:iam::000026703603:role/emr-serverless-job-role",
-    "bucket_base": "bucket-emr-example",
-    "app_args": {
-        "initial_capacity": {
-            "DRIVER": {
-                "workerCount": 1,
-                "workerConfiguration": {"cpu": "16vCPU", "memory": "32GB"}
-            },
-            "EXECUTOR": {
-                "workerCount": 12,
-                "workerConfiguration": {"cpu": "16vCPU", "memory": "32GB"}
-            }
-        },
-        "maximun_capacity": {"cpu": "208vCPU", "memory": "416GB", "disk": "1000GB"},
-        "imageConfiguration": {
-            "imageUri": "000026703603.dkr.ecr.us-east-1.amazonaws.com/css_spark_custom:0.0.5"
-        },
-        "networkConfiguration": {
-            "subnetIds": ["subnet-0f86499848ec99861", "subnet-078fe716da8b53818", "subnet-0a7d0a8bc3b623474"],
-            "securityGroupIds": ["sg-02154713a3639f7ce"]
-        }
-    },
-    "job": {
-        "script_location": "s3://bucket-emr-example/css_cusca/endpoint/",
-        "sparkArgs": {
-            "driver-cores": 16,
-            "driver-memory": "14g",
-            "executor-cores": 16,
-            "executor-memory": "14g",
-            "executor-instances": 12,
-            "others": "--jars s3://bucket-emr-example/bcom_spark/jars/mysql-connector-java-8.0.30.jar"
-        }
-    }
-}