Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
7772e148
Commit
7772e148
authored
Aug 01, 2023
by
Cristian Aguirre
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'developer-SP1' into 'developer'
Fix bugs - V1 See merge request
!3
parents
bbfda096
e89bb096
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
143 additions
and
53 deletions
+143
-53
Database.py
dags/components/Databases/Database.py
+3
-2
Mysql.py
dags/components/Databases/Mysql.py
+26
-5
Oracle.py
dags/components/Databases/Oracle.py
+9
-4
Postgres.py
dags/components/Databases/Postgres.py
+27
-5
Extractor.py
dags/components/Extractor.py
+36
-12
Generation.py
dags/components/Generation.py
+5
-1
InsumoModel.py
dags/components/Model/InsumoModel.py
+8
-0
Utils.py
dags/components/Utils.py
+20
-14
procedure_definition.json
dags/procedure_definition.json
+9
-10
No files found.
dags/components/Databases/Database.py
View file @
7772e148
...
@@ -55,10 +55,11 @@ class Database:
...
@@ -55,10 +55,11 @@ class Database:
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error creando db engine. {e}"
)
logger
.
error
(
f
"Error creando db engine. {e}"
)
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
modelName
:
str
=
"TableModel"
)
->
bool
:
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
db_target
:
str
,
modelName
:
str
=
"TableModel"
)
->
bool
:
create
=
False
create
=
False
try
:
try
:
create
=
self
.
factory
.
create_model
(
tablename
,
fields
,
modelName
)
create
=
self
.
factory
.
create_model
(
tablename
,
fields
,
db_target
,
modelName
)
except
Exception
as
e
:
except
Exception
as
e
:
raise
AssertionError
(
f
"Error creando tabla dinámica con nombre {tablename}. {e}"
)
raise
AssertionError
(
f
"Error creando tabla dinámica con nombre {tablename}. {e}"
)
finally
:
finally
:
...
...
dags/components/Databases/Mysql.py
View file @
7772e148
from
typing
import
List
,
Tuple
from
typing
import
List
,
Tuple
import
pymysql
from
sqlalchemy
import
create_engine
from
sqlalchemy
import
create_engine
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
components.Model.InsumoModel
import
InsumoModel
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.Model.InsumoModel
import
InsumoModel
,
InsumoModelOracle
from
components.Databases.Enums.MysqlDataTypeEnum
import
MysqlDataTypeEnum
from
components.Databases.Enums.MysqlDataTypeEnum
import
MysqlDataTypeEnum
from
components.Databases.Enums.MysqlDataTypeORMEnum
import
MysqlDataTypeORMEnum
from
components.Databases.Enums.MysqlDataTypeORMEnum
import
MysqlDataTypeORMEnum
...
@@ -22,6 +24,17 @@ class Mysql:
...
@@ -22,6 +24,17 @@ class Mysql:
self
.
password
=
password
self
.
password
=
password
self
.
database
=
database
self
.
database
=
database
self
.
engine
=
None
self
.
engine
=
None
self
.
connection
=
None
def
get_basic_connection
(
self
):
try
:
if
isinstance
(
self
.
connection
,
type
(
None
)):
self
.
connection
=
pymysql
.
connect
(
host
=
self
.
host
,
port
=
self
.
port
,
user
=
self
.
user
,
password
=
self
.
password
,
db
=
self
.
database
)
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo conexion básica de Oracle. {e}"
)
finally
:
return
self
.
connection
def
create_engine
(
self
)
->
None
:
def
create_engine
(
self
)
->
None
:
try
:
try
:
...
@@ -31,13 +44,17 @@ class Mysql:
...
@@ -31,13 +44,17 @@ class Mysql:
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error creando engine de Mysql. {e}"
)
logger
.
error
(
f
"Error creando engine de Mysql. {e}"
)
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
modelName
:
str
=
"TableModel"
)
->
bool
:
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
db_target
:
str
,
modelName
:
str
=
"TableModel"
)
:
model
=
None
model
=
None
try
:
try
:
model
=
type
(
modelName
,
(
InsumoModel
,),
{
db_args
=
{
'__tablename__'
:
tablename
,
'__tablename__'
:
tablename
,
'__table_args__'
:
{
'extend_existing'
:
True
}
'__table_args__'
:
{
'extend_existing'
:
True
}
})
}
if
db_target
==
DatabaseTypeEnum
.
ORACLE
.
value
:
model
=
type
(
modelName
,
(
InsumoModelOracle
,),
db_args
)
else
:
model
=
type
(
modelName
,
(
InsumoModel
,),
db_args
)
for
field
in
fields
:
for
field
in
fields
:
logger
.
debug
(
f
"Attribute: {field}"
)
logger
.
debug
(
f
"Attribute: {field}"
)
name
=
field
[
0
]
name
=
field
[
0
]
...
@@ -45,7 +62,11 @@ class Mysql:
...
@@ -45,7 +62,11 @@ class Mysql:
size
=
int
(
field
[
2
]
/
4
)
size
=
int
(
field
[
2
]
/
4
)
try
:
try
:
if
not
isinstance
(
field
[
3
],
type
(
None
))
and
field
[
3
]
>
0
:
if
not
isinstance
(
field
[
3
],
type
(
None
))
and
field
[
3
]
>
0
:
data_type
=
MysqlDataTypeORMEnum
[
MysqlDataTypeEnum
(
field
[
1
])
.
name
]
.
value
(
precision
=
field
[
2
],
scale
=
field
[
3
])
precision
,
scale
=
field
[
2
],
field
[
3
]
if
scale
>
precision
:
precision
=
field
[
3
]
scale
=
field
[
2
]
data_type
=
MysqlDataTypeORMEnum
[
MysqlDataTypeEnum
(
field
[
1
])
.
name
]
.
value
(
precision
=
precision
,
scale
=
scale
)
else
:
else
:
data_type
=
MysqlDataTypeORMEnum
[
MysqlDataTypeEnum
(
field
[
1
])
.
name
]
.
value
(
size
)
data_type
=
MysqlDataTypeORMEnum
[
MysqlDataTypeEnum
(
field
[
1
])
.
name
]
.
value
(
size
)
except
TypeError
:
except
TypeError
:
...
...
dags/components/Databases/Oracle.py
View file @
7772e148
...
@@ -4,9 +4,10 @@ from sqlalchemy import create_engine
...
@@ -4,9 +4,10 @@ from sqlalchemy import create_engine
import
oracledb
import
oracledb
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.Databases.Enums.OracleDataTypeEnum
import
OracleDataTypeEnum
from
components.Databases.Enums.OracleDataTypeEnum
import
OracleDataTypeEnum
from
components.Databases.Enums.OracleDataTypeORMEnum
import
OracleDataTypeORMEnum
from
components.Databases.Enums.OracleDataTypeORMEnum
import
OracleDataTypeORMEnum
from
components.Model.InsumoModel
import
InsumoModel
from
components.Model.InsumoModel
import
InsumoModel
,
InsumoModelOracle
from
sqlalchemy
import
Column
from
sqlalchemy
import
Column
import
logging
import
logging
...
@@ -43,13 +44,17 @@ class Oracle:
...
@@ -43,13 +44,17 @@ class Oracle:
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error creando engine de Oracle. {e}"
)
logger
.
error
(
f
"Error creando engine de Oracle. {e}"
)
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
modelName
:
str
=
"TableModel"
)
->
bool
:
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
db_target
:
str
,
modelName
:
str
=
"TableModel"
)
:
model
=
None
model
=
None
try
:
try
:
model
=
type
(
modelName
,
(
InsumoModel
,),
{
db_args
=
{
'__tablename__'
:
tablename
,
'__tablename__'
:
tablename
,
'__table_args__'
:
{
'extend_existing'
:
True
}
'__table_args__'
:
{
'extend_existing'
:
True
}
})
}
if
db_target
==
DatabaseTypeEnum
.
ORACLE
.
value
:
model
=
type
(
modelName
,
(
InsumoModelOracle
,),
db_args
)
else
:
model
=
type
(
modelName
,
(
InsumoModel
,),
db_args
)
for
field
in
fields
:
for
field
in
fields
:
logger
.
debug
(
f
"Attribute: {field}"
)
logger
.
debug
(
f
"Attribute: {field}"
)
name
=
field
[
0
]
name
=
field
[
0
]
...
...
dags/components/Databases/Postgres.py
View file @
7772e148
from
typing
import
List
,
Tuple
from
typing
import
List
,
Tuple
import
psycopg2
from
sqlalchemy
import
create_engine
from
sqlalchemy
import
create_engine
from
components.Model.InsumoModel
import
InsumoModel
from
components.Model.InsumoModel
import
InsumoModel
,
InsumoModelOracle
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.Databases.Enums.PostgresDataTypeEnum
import
PostgresDataTypeEnum
from
components.Databases.Enums.PostgresDataTypeEnum
import
PostgresDataTypeEnum
from
components.Databases.Enums.PostgresDataTypeORMEnum
import
PostgresDataTypeORMEnum
from
components.Databases.Enums.PostgresDataTypeORMEnum
import
PostgresDataTypeORMEnum
...
@@ -23,6 +25,18 @@ class Postgres:
...
@@ -23,6 +25,18 @@ class Postgres:
self
.
schema
=
schema
self
.
schema
=
schema
self
.
engine
=
None
self
.
engine
=
None
self
.
DEFAULT_VAR_LENGHT
=
100
self
.
DEFAULT_VAR_LENGHT
=
100
self
.
connection
=
None
def
get_basic_connection
(
self
):
try
:
if
isinstance
(
self
.
connection
,
type
(
None
)):
self
.
connection
=
psycopg2
.
connect
(
host
=
self
.
host
,
port
=
self
.
port
,
user
=
self
.
user
,
password
=
self
.
password
,
database
=
self
.
database
,
options
=
"-c search_path="
+
self
.
schema
)
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo conexion básica de Oracle. {e}"
)
finally
:
return
self
.
connection
def
create_engine
(
self
)
->
None
:
def
create_engine
(
self
)
->
None
:
try
:
try
:
...
@@ -33,20 +47,28 @@ class Postgres:
...
@@ -33,20 +47,28 @@ class Postgres:
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error creando engine de Postgres. {e}"
)
logger
.
error
(
f
"Error creando engine de Postgres. {e}"
)
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
modelName
:
str
=
"TableModel"
):
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
db_target
:
str
,
modelName
:
str
=
"TableModel"
):
model
=
None
model
=
None
try
:
try
:
model
=
type
(
modelName
,
(
InsumoModel
,),
{
db_args
=
{
'__tablename__'
:
tablename
,
'__tablename__'
:
tablename
,
'__table_args__'
:
{
'extend_existing'
:
True
}
'__table_args__'
:
{
'extend_existing'
:
True
}
})
}
if
db_target
==
DatabaseTypeEnum
.
ORACLE
.
value
:
model
=
type
(
modelName
,
(
InsumoModelOracle
,),
db_args
)
else
:
model
=
type
(
modelName
,
(
InsumoModel
,),
db_args
)
for
field
in
fields
:
for
field
in
fields
:
logger
.
debug
(
f
"Attribute: {field}"
)
logger
.
debug
(
f
"Attribute: {field}"
)
name
=
field
[
0
]
name
=
field
[
0
]
if
field
[
2
]
!=
-
1
:
if
field
[
2
]
!=
-
1
:
try
:
try
:
if
not
isinstance
(
field
[
3
],
type
(
None
))
and
field
[
3
]
>
0
:
if
not
isinstance
(
field
[
3
],
type
(
None
))
and
field
[
3
]
>
0
:
data_type
=
PostgresDataTypeORMEnum
[
PostgresDataTypeEnum
(
field
[
1
])
.
name
]
.
value
(
precision
=
field
[
2
],
scale
=
field
[
3
])
precision
,
scale
=
field
[
2
],
field
[
3
]
if
scale
>
precision
:
precision
=
field
[
3
]
scale
=
field
[
2
]
data_type
=
PostgresDataTypeORMEnum
[
PostgresDataTypeEnum
(
field
[
1
])
.
name
]
.
value
(
precision
=
precision
,
scale
=
scale
)
else
:
else
:
data_type
=
PostgresDataTypeORMEnum
[
PostgresDataTypeEnum
(
field
[
1
])
.
name
]
.
value
(
field
[
2
])
data_type
=
PostgresDataTypeORMEnum
[
PostgresDataTypeEnum
(
field
[
1
])
.
name
]
.
value
(
field
[
2
])
except
TypeError
:
except
TypeError
:
...
...
dags/components/Extractor.py
View file @
7772e148
from
typing
import
Any
,
Dict
from
typing
import
Any
,
Dict
import
json
import
json
import
numpy
as
np
import
pandas
as
pd
import
pandas
as
pd
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
enums.ProcessStatusEnum
import
ProcessStatusEnum
from
enums.ProcessStatusEnum
import
ProcessStatusEnum
...
@@ -94,9 +95,10 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -94,9 +95,10 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
# Create the model with the procedure descriptor
# Create the model with the procedure descriptor
if
command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|begin"
)
!=
-
1
:
if
command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|begin"
)
!=
-
1
:
tablename
=
command
[:
command
.
find
(
"|"
)]
tablename
=
command
[:
command
.
find
(
"|"
)]
elif
command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|call"
)
!=
-
1
:
tablename
=
command
[:
command
.
find
(
"|"
)]
else
:
else
:
proc_name
=
command
[
len
(
"begin"
):
command
.
rfind
(
"end"
)]
raise
AssertionError
(
"Procedure mal formed"
)
tablename
=
proc_name
.
strip
()
.
replace
(
";"
,
""
)
task
=
kwargs
[
'ti'
]
task
=
kwargs
[
'ti'
]
procedures
=
task
.
xcom_pull
(
task_ids
=
"SCRIPTS-EXTRACTOR"
,
key
=
"PROCEDURE-JSON"
)
procedures
=
task
.
xcom_pull
(
task_ids
=
"SCRIPTS-EXTRACTOR"
,
key
=
"PROCEDURE-JSON"
)
model
=
None
model
=
None
...
@@ -112,6 +114,9 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -112,6 +114,9 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
else
:
else
:
if
source_conn
.
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
and
\
if
source_conn
.
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
and
\
not
extract_type
.
startswith
(
OperationTypeEnum
.
PROCEDURE
.
value
):
not
extract_type
.
startswith
(
OperationTypeEnum
.
PROCEDURE
.
value
):
values
=
command_for_create
.
split
(
"|"
)
if
len
(
values
)
>
1
:
command_for_create
=
values
[
1
]
command_for_create
=
f
"SELECT * FROM ({command_for_create}) OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY"
command_for_create
=
f
"SELECT * FROM ({command_for_create}) OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY"
else
:
else
:
command_words
=
command_for_create
.
split
(
" "
)
command_words
=
command_for_create
.
split
(
" "
)
...
@@ -121,7 +126,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -121,7 +126,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
with
source_engine
.
connect
()
as
connection
:
with
source_engine
.
connect
()
as
connection
:
final_command
=
command_for_create
final_command
=
command_for_create
if
final_command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|select"
)
!=
-
1
:
if
final_command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|select"
)
!=
-
1
:
final_command
=
final_command
[
final_command
.
find
(
"select"
):]
final_command
=
final_command
[
final_command
.
lower
()
.
find
(
"select"
):]
result
=
connection
.
execute
(
final_command
)
result
=
connection
.
execute
(
final_command
)
fields
=
result
.
cursor
.
description
fields
=
result
.
cursor
.
description
for
field
in
fields
:
for
field
in
fields
:
...
@@ -130,7 +135,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -130,7 +135,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
logger
.
debug
(
f
"Columnas procesadas: {columns}"
)
logger
.
debug
(
f
"Columnas procesadas: {columns}"
)
multiple
=
select_multiple
(
command
)
multiple
=
select_multiple
(
command
)
tablename
=
multiple
[
"tablename"
]
tablename
=
multiple
[
"tablename"
]
model
=
source_conn
.
create_model
(
tablename
,
columns
)
model
=
source_conn
.
create_model
(
tablename
,
columns
,
intern_conn
.
db_type
)
try
:
try
:
create
=
intern_conn
.
create_table
(
model
)
create
=
intern_conn
.
create_table
(
model
)
if
create
:
if
create
:
...
@@ -141,11 +146,31 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -141,11 +146,31 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
if
is_tablename
:
if
is_tablename
:
command
=
command
[
len
(
tablename
+
":"
):]
command
=
command
[
len
(
tablename
+
":"
):]
temp_connection
=
source_conn
.
get_basic_connection
()
temp_connection
=
source_conn
.
get_basic_connection
()
cursor
=
temp_connection
.
cursor
()
if
source_conn
.
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
:
cursor
.
execute
(
command
)
cursor
=
temp_connection
.
cursor
()
for
resultSet
in
cursor
.
getimplicitresults
():
cursor
.
execute
(
command
)
for
resultSet
in
cursor
.
getimplicitresults
():
data
=
[]
for
row
in
resultSet
:
data
.
append
(
row
)
if
len
(
data
)
==
chunksize
:
dataframe
=
pd
.
DataFrame
(
data
,
columns
=
columns_name
)
save
=
save_from_dataframe
(
dataframe
,
tablename
,
intern_conn
.
engine
)
if
save
:
logger
.
debug
(
f
"Guardado correctamente dataframe. Procesando más bloques"
)
data
.
clear
()
if
len
(
data
)
>
0
:
dataframe
=
pd
.
DataFrame
(
data
,
columns
=
columns_name
)
save
=
save_from_dataframe
(
dataframe
,
tablename
,
intern_conn
.
engine
)
if
save
:
logger
.
debug
(
f
"Migrado correctamente todos los datos"
)
data
.
clear
()
elif
source_conn
.
db_type
==
DatabaseTypeEnum
.
MYSQL
.
value
or
\
source_conn
.
db_type
==
DatabaseTypeEnum
.
POSTGRES
.
value
:
cursor
=
temp_connection
.
cursor
()
cursor
.
execute
(
command
)
data
=
[]
data
=
[]
for
row
in
resultSet
:
for
row
in
cursor
.
_rows
:
data
.
append
(
row
)
data
.
append
(
row
)
if
len
(
data
)
==
chunksize
:
if
len
(
data
)
==
chunksize
:
dataframe
=
pd
.
DataFrame
(
data
,
columns
=
columns_name
)
dataframe
=
pd
.
DataFrame
(
data
,
columns
=
columns_name
)
...
@@ -162,16 +187,15 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -162,16 +187,15 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
logger
.
info
(
"Guardado correctamente todos los datos"
)
logger
.
info
(
"Guardado correctamente todos los datos"
)
source_conn
.
close_basic_connection
()
source_conn
.
close_basic_connection
()
else
:
else
:
if
command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|select"
):
if
command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|select"
)
!=
-
1
:
command
=
command
[
command
.
find
(
"select"
):]
command
=
command
[
command
.
lower
()
.
find
(
"select"
):]
steps
=
get_steps
(
command
,
chunksize
,
source_engine
)
steps
=
get_steps
(
command
,
chunksize
,
source_engine
)
# Traemos el iterator
# Traemos el iterator
iterator
=
get_iterator
(
command
,
chunksize
,
source_engine
)
iterator
=
get_iterator
(
command
,
chunksize
,
source_engine
)
logger
.
info
(
f
"Número de pasos para migrar datos: {steps}"
)
logger
.
info
(
f
"Número de pasos para migrar datos: {steps}"
)
for
step
in
range
(
steps
):
for
step
in
range
(
steps
):
dataframe
=
next
(
iterator
)
dataframe
=
next
(
iterator
)
dataframe
[
"INTERN_ID_BCOM"
]
=
None
dataframe
=
dataframe
.
fillna
(
value
=
np
.
nan
)
logger
.
debug
(
dataframe
)
save
=
save_from_dataframe
(
dataframe
,
tablename
,
intern_conn
.
engine
)
save
=
save_from_dataframe
(
dataframe
,
tablename
,
intern_conn
.
engine
)
if
save
:
if
save
:
logger
.
info
(
f
"Guardado correctamente dataframe en el paso {step+1}"
)
logger
.
info
(
f
"Guardado correctamente dataframe en el paso {step+1}"
)
...
...
dags/components/Generation.py
View file @
7772e148
...
@@ -8,6 +8,7 @@ from airflow.decorators import task
...
@@ -8,6 +8,7 @@ from airflow.decorators import task
from
airflow.exceptions
import
AirflowSkipException
from
airflow.exceptions
import
AirflowSkipException
from
enums.ProcessStatusEnum
import
ProcessStatusEnum
from
enums.ProcessStatusEnum
import
ProcessStatusEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.S3Route
import
save_df_to_s3
,
load_obj_to_s3
from
components.S3Route
import
save_df_to_s3
,
load_obj_to_s3
from
components.Utils
import
select_multiple
,
create_temp_file
,
delete_temp_dir
from
components.Utils
import
select_multiple
,
create_temp_file
,
delete_temp_dir
from
components.Control
import
get_tasks_from_control
,
update_new_process
from
components.Control
import
get_tasks_from_control
,
update_new_process
...
@@ -90,8 +91,11 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
...
@@ -90,8 +91,11 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
delimiter
=
params
[
"delimiter"
]
delimiter
=
params
[
"delimiter"
]
tmp_path
=
params
[
"tmp_path"
]
tmp_path
=
params
[
"tmp_path"
]
tmp_file
=
create_temp_file
(
tmp_path
,
filename_mask
,
file_type
,
tablename
,
timezone
,
pattern
)
tmp_file
=
create_temp_file
(
tmp_path
,
filename_mask
,
file_type
,
tablename
,
timezone
,
pattern
)
logger
.
info
(
tmp_file
)
logger
.
info
(
f
"Ruta creada: {tmp_file}"
)
logger
.
debug
(
f
"TABLA: {tablename}"
)
steps
=
get_steps
(
tablename
,
chunksize
,
engine
,
True
)
steps
=
get_steps
(
tablename
,
chunksize
,
engine
,
True
)
if
intern_conn
.
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
:
tablename
=
f
"SELECT * FROM {tablename}"
iterator
=
get_iterator
(
tablename
,
chunksize
,
engine
)
iterator
=
get_iterator
(
tablename
,
chunksize
,
engine
)
logger
.
info
(
f
"Total de pasos para generar archivo resultado: {steps}"
)
logger
.
info
(
f
"Total de pasos para generar archivo resultado: {steps}"
)
for
step
in
range
(
steps
):
for
step
in
range
(
steps
):
...
...
dags/components/Model/InsumoModel.py
View file @
7772e148
from
sqlalchemy.ext.declarative
import
declarative_base
from
sqlalchemy.ext.declarative
import
declarative_base
from
sqlalchemy
import
Column
,
BigInteger
from
sqlalchemy
import
Column
,
BigInteger
from
sqlalchemy.schema
import
Identity
Base
=
declarative_base
()
Base
=
declarative_base
()
...
@@ -9,3 +10,10 @@ class InsumoModel(Base):
...
@@ -9,3 +10,10 @@ class InsumoModel(Base):
__abstract__
=
True
__abstract__
=
True
INTERN_ID_BCOM
=
Column
(
BigInteger
,
primary_key
=
True
,
autoincrement
=
True
)
INTERN_ID_BCOM
=
Column
(
BigInteger
,
primary_key
=
True
,
autoincrement
=
True
)
class
InsumoModelOracle
(
Base
):
__abstract__
=
True
INTERN_ID_BCOM
=
Column
(
BigInteger
,
Identity
(
start
=
1
),
primary_key
=
True
)
dags/components/Utils.py
View file @
7772e148
...
@@ -22,7 +22,7 @@ def get_type_file(key: str) -> FileTypeEnum:
...
@@ -22,7 +22,7 @@ def get_type_file(key: str) -> FileTypeEnum:
result
=
FileTypeEnum
.
EXCEL
result
=
FileTypeEnum
.
EXCEL
try
:
try
:
file_type_sufix
=
key
.
rfind
(
"."
)
file_type_sufix
=
key
.
rfind
(
"."
)
file_type
=
key
[
file_type_sufix
+
1
:]
file_type
=
key
[
file_type_sufix
+
1
:]
result
=
FileTypeEnum
(
file_type
)
result
=
FileTypeEnum
(
file_type
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniedo el tipo de archivo de {key}. {e}"
)
logger
.
error
(
f
"Error obteniedo el tipo de archivo de {key}. {e}"
)
...
@@ -81,10 +81,10 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
...
@@ -81,10 +81,10 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
else
:
else
:
catalog_prefix
=
default_prefix
catalog_prefix
=
default_prefix
s3_catalog
=
catalog_prefix
+
catalog
[
"pattern"
]
s3_catalog
=
catalog_prefix
+
catalog
[
"pattern"
]
data_dict
.
update
({
's3_'
+
catalog_name
:
s3_catalog
,
catalog_name
+
'_key'
:
catalog
[
"key_field"
],
data_dict
.
update
({
's3_'
+
catalog_name
:
s3_catalog
,
catalog_name
+
'_key'
:
catalog
[
"key_field"
],
catalog_name
+
'_value'
:
catalog
[
"value_field"
]})
catalog_name
+
'_value'
:
catalog
[
"value_field"
]})
if
"delimiter"
in
catalog
.
keys
():
if
"delimiter"
in
catalog
.
keys
():
data_dict
.
update
({
catalog_name
+
'_delimiter'
:
catalog
[
"delimiter"
]})
data_dict
.
update
({
catalog_name
+
'_delimiter'
:
catalog
[
"delimiter"
]})
except
Exception
as
e
:
except
Exception
as
e
:
raise
AssertionError
(
f
"Error actualizando dict de catalogos. {e}"
)
raise
AssertionError
(
f
"Error actualizando dict de catalogos. {e}"
)
finally
:
finally
:
...
@@ -107,9 +107,9 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
...
@@ -107,9 +107,9 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
if
item
.
lower
()
.
strip
()
==
"end"
:
if
item
.
lower
()
.
strip
()
==
"end"
:
final_data
[
-
1
]
=
final_data
[
-
1
]
+
"; end;"
final_data
[
-
1
]
=
final_data
[
-
1
]
+
"; end;"
final_item
=
item
final_item
=
item
if
item
.
lower
()
.
strip
()
.
find
(
label_tablename
.
lower
()
.
strip
()
+
":"
)
!=
-
1
:
if
item
.
lower
()
.
strip
()
.
find
(
label_tablename
.
lower
()
.
strip
()
+
":"
)
!=
-
1
:
init_index
=
item
.
lower
()
.
strip
()
.
index
(
label_tablename
.
lower
()
.
strip
()
+
":"
)
init_index
=
item
.
lower
()
.
strip
()
.
index
(
label_tablename
.
lower
()
.
strip
()
+
":"
)
table_name
=
item
.
replace
(
" "
,
""
)
.
strip
()[
init_index
+
5
:]
.
strip
()
table_name
=
item
.
replace
(
" "
,
""
)
.
strip
()[
init_index
+
len
(
label_tablename
+
":"
)
:]
.
strip
()
add_next
=
True
add_next
=
True
elif
item
!=
""
:
elif
item
!=
""
:
if
add_next
:
if
add_next
:
...
@@ -117,9 +117,10 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
...
@@ -117,9 +117,10 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
add_next
=
False
add_next
=
False
final_item
=
item
.
strip
()
final_item
=
item
.
strip
()
table_name
=
""
table_name
=
""
if
final_item
.
strip
()[:
2
]
in
comments
and
(
"update "
in
final_item
.
lower
()
or
"delete "
in
final_item
.
lower
()
or
if
final_item
.
strip
()[:
2
]
in
comments
and
(
"alter"
in
final_item
.
lower
()
or
"create"
in
final_item
.
lower
()
or
"update "
in
final_item
.
lower
()
or
"delete "
in
final_item
.
lower
()
or
"drop"
in
final_item
.
lower
()):
"alter"
in
final_item
.
lower
()
or
"create"
in
final_item
.
lower
()
or
"drop"
in
final_item
.
lower
()):
trans_index
=
final_item
.
lower
()
.
find
(
"update"
)
trans_index
=
final_item
.
lower
()
.
find
(
"update"
)
if
trans_index
!=
-
1
:
if
trans_index
!=
-
1
:
final_item
=
final_item
[
trans_index
:]
final_item
=
final_item
[
trans_index
:]
...
@@ -135,6 +136,9 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
...
@@ -135,6 +136,9 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
drop_index
=
final_item
.
lower
()
.
find
(
"drop"
)
drop_index
=
final_item
.
lower
()
.
find
(
"drop"
)
if
drop_index
!=
-
1
:
if
drop_index
!=
-
1
:
final_item
=
final_item
[
drop_index
:]
final_item
=
final_item
[
drop_index
:]
call_index
=
final_item
.
lower
()
.
find
(
"call"
)
if
call_index
!=
-
1
:
final_item
=
final_item
[
call_index
:]
final_item
=
final_item
.
replace
(
"
%
"
,
"
%%
"
)
final_item
=
final_item
.
replace
(
"
%
"
,
"
%%
"
)
final_data
.
append
(
final_item
)
final_data
.
append
(
final_item
)
final_data
=
[
item
.
replace
(
"
\t
"
,
""
)
for
item
in
final_data
if
item
!=
""
and
(
"select"
in
item
.
lower
()
or
final_data
=
[
item
.
replace
(
"
\t
"
,
""
)
for
item
in
final_data
if
item
!=
""
and
(
"select"
in
item
.
lower
()
or
...
@@ -144,7 +148,8 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
...
@@ -144,7 +148,8 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
"alter"
in
item
.
lower
()
or
"alter"
in
item
.
lower
()
or
"create"
in
item
.
lower
()
or
"create"
in
item
.
lower
()
or
"drop"
in
item
.
lower
()
or
"drop"
in
item
.
lower
()
or
"commit"
in
item
.
lower
())]
"commit"
in
item
.
lower
()
or
"call"
in
item
.
lower
())]
result
.
append
((
row
[
0
],
final_data
))
result
.
append
((
row
[
0
],
final_data
))
logger
.
info
(
f
"Lista de comandos: {result}"
)
logger
.
info
(
f
"Lista de comandos: {result}"
)
except
Exception
as
e
:
except
Exception
as
e
:
...
@@ -157,9 +162,10 @@ def select_multiple(command: str) -> Dict[str, Any]:
...
@@ -157,9 +162,10 @@ def select_multiple(command: str) -> Dict[str, Any]:
response
=
{
'is_multiple'
:
False
,
'tablename'
:
''
}
response
=
{
'is_multiple'
:
False
,
'tablename'
:
''
}
tablename
=
""
tablename
=
""
no_procedure_init
=
"|select"
no_procedure_init
=
"|select"
procedure_init
=
[
"|begin"
]
procedure_init
=
[
"|begin"
,
"|call"
]
try
:
try
:
if
command
.
lower
()
.
replace
(
" "
,
""
)
.
find
(
procedure_init
[
0
])
!=
-
1
:
if
command
.
lower
()
.
replace
(
" "
,
""
)
.
find
(
procedure_init
[
0
])
!=
-
1
or
\
command
.
lower
()
.
replace
(
" "
,
""
)
.
find
(
procedure_init
[
1
])
!=
-
1
:
response
[
"is_multiple"
]
=
True
response
[
"is_multiple"
]
=
True
tablename
=
command
[:
command
.
index
(
"|"
)]
.
strip
()
tablename
=
command
[:
command
.
index
(
"|"
)]
.
strip
()
response
[
"tablename"
]
=
tablename
response
[
"tablename"
]
=
tablename
...
@@ -171,7 +177,7 @@ def select_multiple(command: str) -> Dict[str, Any]:
...
@@ -171,7 +177,7 @@ def select_multiple(command: str) -> Dict[str, Any]:
if
init_index
==
-
1
:
if
init_index
==
-
1
:
raise
AssertionError
(
"Query malformed"
)
raise
AssertionError
(
"Query malformed"
)
else
:
else
:
from_command
=
command
[
init_index
+
4
:]
from_command
=
command
[
init_index
+
4
:]
tablename_base
=
from_command
.
strip
()
.
split
(
" "
)
tablename_base
=
from_command
.
strip
()
.
split
(
" "
)
if
len
(
tablename_base
)
>
0
and
tablename
==
""
:
if
len
(
tablename_base
)
>
0
and
tablename
==
""
:
tablename
=
tablename_base
[
0
]
tablename
=
tablename_base
[
0
]
...
...
dags/procedure_definition.json
View file @
7772e148
...
@@ -2,29 +2,28 @@
...
@@ -2,29 +2,28 @@
"procedure_identifier"
:
"PROCEDURE_1"
,
"procedure_identifier"
:
"PROCEDURE_1"
,
"fields"
:
[
"fields"
:
[
{
{
"identifier"
:
"CD_
EMPRESA
"
,
"identifier"
:
"CD_
FOLIO
"
,
"datatype"
:
"
NUMBER
"
,
"datatype"
:
"
TEXT
"
,
"decimal_precision"
:
0
,
"decimal_precision"
:
0
,
"maxLength"
:
null
"maxLength"
:
100
},
},
{
{
"identifier"
:
"CD_
FOLIO
"
,
"identifier"
:
"CD_
CUENTA
"
,
"datatype"
:
"TEXT"
,
"datatype"
:
"TEXT"
,
"decimal_precision"
:
null
,
"decimal_precision"
:
null
,
"maxLength"
:
100
"maxLength"
:
100
},
},
{
{
"identifier"
:
"CD_
CLIEN
TE"
,
"identifier"
:
"CD_
PAQUE
TE"
,
"datatype"
:
"TEXT"
,
"datatype"
:
"TEXT"
,
"decimal_precision"
:
null
,
"decimal_precision"
:
null
,
"maxLength"
:
5
0
"maxLength"
:
10
0
},
},
{
{
"identifier"
:
"FH_CONTRATACION"
,
"identifier"
:
"NB_PAQUETE"
,
"datatype"
:
"DATE"
,
"datatype"
:
"TEXT"
,
"pattern"
:
"%d-%m-%y"
,
"decimal_precision"
:
null
,
"decimal_precision"
:
null
,
"maxLength"
:
null
"maxLength"
:
200
}
}
]
]
},
},
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment