Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
82505e1b
Commit
82505e1b
authored
Jul 26, 2023
by
Cristian Aguirre
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Update 25-07-23. Solve issues 351, 352
parent
bbfda096
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
84 additions
and
33 deletions
+84
-33
Mysql.py
dags/components/Databases/Mysql.py
+12
-0
Postgres.py
dags/components/Databases/Postgres.py
+13
-0
Extractor.py
dags/components/Extractor.py
+30
-9
Utils.py
dags/components/Utils.py
+20
-14
procedure_definition.json
dags/procedure_definition.json
+9
-10
No files found.
dags/components/Databases/Mysql.py
View file @
82505e1b
from
typing
import
List
,
Tuple
from
typing
import
List
,
Tuple
import
pymysql
from
sqlalchemy
import
create_engine
from
sqlalchemy
import
create_engine
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
components.Model.InsumoModel
import
InsumoModel
from
components.Model.InsumoModel
import
InsumoModel
...
@@ -22,6 +23,17 @@ class Mysql:
...
@@ -22,6 +23,17 @@ class Mysql:
self
.
password
=
password
self
.
password
=
password
self
.
database
=
database
self
.
database
=
database
self
.
engine
=
None
self
.
engine
=
None
self
.
connection
=
None
def
get_basic_connection
(
self
):
try
:
if
isinstance
(
self
.
connection
,
type
(
None
)):
self
.
connection
=
pymysql
.
connect
(
host
=
self
.
host
,
port
=
self
.
port
,
user
=
self
.
user
,
password
=
self
.
password
,
db
=
self
.
database
)
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo conexion básica de Oracle. {e}"
)
finally
:
return
self
.
connection
def
create_engine
(
self
)
->
None
:
def
create_engine
(
self
)
->
None
:
try
:
try
:
...
...
dags/components/Databases/Postgres.py
View file @
82505e1b
from
typing
import
List
,
Tuple
from
typing
import
List
,
Tuple
import
psycopg2
from
sqlalchemy
import
create_engine
from
sqlalchemy
import
create_engine
from
components.Model.InsumoModel
import
InsumoModel
from
components.Model.InsumoModel
import
InsumoModel
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
...
@@ -23,6 +24,18 @@ class Postgres:
...
@@ -23,6 +24,18 @@ class Postgres:
self
.
schema
=
schema
self
.
schema
=
schema
self
.
engine
=
None
self
.
engine
=
None
self
.
DEFAULT_VAR_LENGHT
=
100
self
.
DEFAULT_VAR_LENGHT
=
100
self
.
connection
=
None
def
get_basic_connection
(
self
):
try
:
if
isinstance
(
self
.
connection
,
type
(
None
)):
self
.
connection
=
psycopg2
.
connect
(
host
=
self
.
host
,
port
=
self
.
port
,
user
=
self
.
user
,
password
=
self
.
password
,
database
=
self
.
database
,
options
=
"-c search_path="
+
self
.
schema
)
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo conexion básica de Oracle. {e}"
)
finally
:
return
self
.
connection
def
create_engine
(
self
)
->
None
:
def
create_engine
(
self
)
->
None
:
try
:
try
:
...
...
dags/components/Extractor.py
View file @
82505e1b
...
@@ -94,9 +94,10 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -94,9 +94,10 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
# Create the model with the procedure descriptor
# Create the model with the procedure descriptor
if
command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|begin"
)
!=
-
1
:
if
command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|begin"
)
!=
-
1
:
tablename
=
command
[:
command
.
find
(
"|"
)]
tablename
=
command
[:
command
.
find
(
"|"
)]
elif
command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|call"
)
!=
-
1
:
tablename
=
command
[:
command
.
find
(
"|"
)]
else
:
else
:
proc_name
=
command
[
len
(
"begin"
):
command
.
rfind
(
"end"
)]
raise
AssertionError
(
"Procedure mal formed"
)
tablename
=
proc_name
.
strip
()
.
replace
(
";"
,
""
)
task
=
kwargs
[
'ti'
]
task
=
kwargs
[
'ti'
]
procedures
=
task
.
xcom_pull
(
task_ids
=
"SCRIPTS-EXTRACTOR"
,
key
=
"PROCEDURE-JSON"
)
procedures
=
task
.
xcom_pull
(
task_ids
=
"SCRIPTS-EXTRACTOR"
,
key
=
"PROCEDURE-JSON"
)
model
=
None
model
=
None
...
@@ -121,7 +122,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -121,7 +122,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
with
source_engine
.
connect
()
as
connection
:
with
source_engine
.
connect
()
as
connection
:
final_command
=
command_for_create
final_command
=
command_for_create
if
final_command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|select"
)
!=
-
1
:
if
final_command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|select"
)
!=
-
1
:
final_command
=
final_command
[
final_command
.
find
(
"select"
):]
final_command
=
final_command
[
final_command
.
lower
()
.
find
(
"select"
):]
result
=
connection
.
execute
(
final_command
)
result
=
connection
.
execute
(
final_command
)
fields
=
result
.
cursor
.
description
fields
=
result
.
cursor
.
description
for
field
in
fields
:
for
field
in
fields
:
...
@@ -141,11 +142,31 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -141,11 +142,31 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
if
is_tablename
:
if
is_tablename
:
command
=
command
[
len
(
tablename
+
":"
):]
command
=
command
[
len
(
tablename
+
":"
):]
temp_connection
=
source_conn
.
get_basic_connection
()
temp_connection
=
source_conn
.
get_basic_connection
()
cursor
=
temp_connection
.
cursor
()
if
source_conn
.
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
:
cursor
.
execute
(
command
)
cursor
=
temp_connection
.
cursor
()
for
resultSet
in
cursor
.
getimplicitresults
():
cursor
.
execute
(
command
)
for
resultSet
in
cursor
.
getimplicitresults
():
data
=
[]
for
row
in
resultSet
:
data
.
append
(
row
)
if
len
(
data
)
==
chunksize
:
dataframe
=
pd
.
DataFrame
(
data
,
columns
=
columns_name
)
save
=
save_from_dataframe
(
dataframe
,
tablename
,
intern_conn
.
engine
)
if
save
:
logger
.
debug
(
f
"Guardado correctamente dataframe. Procesando más bloques"
)
data
.
clear
()
if
len
(
data
)
>
0
:
dataframe
=
pd
.
DataFrame
(
data
,
columns
=
columns_name
)
save
=
save_from_dataframe
(
dataframe
,
tablename
,
intern_conn
.
engine
)
if
save
:
logger
.
debug
(
f
"Migrado correctamente todos los datos"
)
data
.
clear
()
elif
source_conn
.
db_type
==
DatabaseTypeEnum
.
MYSQL
.
value
or
\
source_conn
.
db_type
==
DatabaseTypeEnum
.
POSTGRES
.
value
:
cursor
=
temp_connection
.
cursor
()
cursor
.
execute
(
command
)
data
=
[]
data
=
[]
for
row
in
resultSet
:
for
row
in
cursor
.
_rows
:
data
.
append
(
row
)
data
.
append
(
row
)
if
len
(
data
)
==
chunksize
:
if
len
(
data
)
==
chunksize
:
dataframe
=
pd
.
DataFrame
(
data
,
columns
=
columns_name
)
dataframe
=
pd
.
DataFrame
(
data
,
columns
=
columns_name
)
...
@@ -162,8 +183,8 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -162,8 +183,8 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
logger
.
info
(
"Guardado correctamente todos los datos"
)
logger
.
info
(
"Guardado correctamente todos los datos"
)
source_conn
.
close_basic_connection
()
source_conn
.
close_basic_connection
()
else
:
else
:
if
command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|select"
):
if
command
.
replace
(
" "
,
""
)
.
lower
()
.
find
(
"|select"
)
!=
-
1
:
command
=
command
[
command
.
find
(
"select"
):]
command
=
command
[
command
.
lower
()
.
find
(
"select"
):]
steps
=
get_steps
(
command
,
chunksize
,
source_engine
)
steps
=
get_steps
(
command
,
chunksize
,
source_engine
)
# Traemos el iterator
# Traemos el iterator
iterator
=
get_iterator
(
command
,
chunksize
,
source_engine
)
iterator
=
get_iterator
(
command
,
chunksize
,
source_engine
)
...
...
dags/components/Utils.py
View file @
82505e1b
...
@@ -22,7 +22,7 @@ def get_type_file(key: str) -> FileTypeEnum:
...
@@ -22,7 +22,7 @@ def get_type_file(key: str) -> FileTypeEnum:
result
=
FileTypeEnum
.
EXCEL
result
=
FileTypeEnum
.
EXCEL
try
:
try
:
file_type_sufix
=
key
.
rfind
(
"."
)
file_type_sufix
=
key
.
rfind
(
"."
)
file_type
=
key
[
file_type_sufix
+
1
:]
file_type
=
key
[
file_type_sufix
+
1
:]
result
=
FileTypeEnum
(
file_type
)
result
=
FileTypeEnum
(
file_type
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniedo el tipo de archivo de {key}. {e}"
)
logger
.
error
(
f
"Error obteniedo el tipo de archivo de {key}. {e}"
)
...
@@ -81,10 +81,10 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
...
@@ -81,10 +81,10 @@ def update_dict_with_catalogs(data_dict: Dict[str, Any], data: Dict[str, Any], c
else
:
else
:
catalog_prefix
=
default_prefix
catalog_prefix
=
default_prefix
s3_catalog
=
catalog_prefix
+
catalog
[
"pattern"
]
s3_catalog
=
catalog_prefix
+
catalog
[
"pattern"
]
data_dict
.
update
({
's3_'
+
catalog_name
:
s3_catalog
,
catalog_name
+
'_key'
:
catalog
[
"key_field"
],
data_dict
.
update
({
's3_'
+
catalog_name
:
s3_catalog
,
catalog_name
+
'_key'
:
catalog
[
"key_field"
],
catalog_name
+
'_value'
:
catalog
[
"value_field"
]})
catalog_name
+
'_value'
:
catalog
[
"value_field"
]})
if
"delimiter"
in
catalog
.
keys
():
if
"delimiter"
in
catalog
.
keys
():
data_dict
.
update
({
catalog_name
+
'_delimiter'
:
catalog
[
"delimiter"
]})
data_dict
.
update
({
catalog_name
+
'_delimiter'
:
catalog
[
"delimiter"
]})
except
Exception
as
e
:
except
Exception
as
e
:
raise
AssertionError
(
f
"Error actualizando dict de catalogos. {e}"
)
raise
AssertionError
(
f
"Error actualizando dict de catalogos. {e}"
)
finally
:
finally
:
...
@@ -107,9 +107,9 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
...
@@ -107,9 +107,9 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
if
item
.
lower
()
.
strip
()
==
"end"
:
if
item
.
lower
()
.
strip
()
==
"end"
:
final_data
[
-
1
]
=
final_data
[
-
1
]
+
"; end;"
final_data
[
-
1
]
=
final_data
[
-
1
]
+
"; end;"
final_item
=
item
final_item
=
item
if
item
.
lower
()
.
strip
()
.
find
(
label_tablename
.
lower
()
.
strip
()
+
":"
)
!=
-
1
:
if
item
.
lower
()
.
strip
()
.
find
(
label_tablename
.
lower
()
.
strip
()
+
":"
)
!=
-
1
:
init_index
=
item
.
lower
()
.
strip
()
.
index
(
label_tablename
.
lower
()
.
strip
()
+
":"
)
init_index
=
item
.
lower
()
.
strip
()
.
index
(
label_tablename
.
lower
()
.
strip
()
+
":"
)
table_name
=
item
.
replace
(
" "
,
""
)
.
strip
()[
init_index
+
5
:]
.
strip
()
table_name
=
item
.
replace
(
" "
,
""
)
.
strip
()[
init_index
+
len
(
label_tablename
+
":"
)
:]
.
strip
()
add_next
=
True
add_next
=
True
elif
item
!=
""
:
elif
item
!=
""
:
if
add_next
:
if
add_next
:
...
@@ -117,9 +117,10 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
...
@@ -117,9 +117,10 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
add_next
=
False
add_next
=
False
final_item
=
item
.
strip
()
final_item
=
item
.
strip
()
table_name
=
""
table_name
=
""
if
final_item
.
strip
()[:
2
]
in
comments
and
(
"update "
in
final_item
.
lower
()
or
"delete "
in
final_item
.
lower
()
or
if
final_item
.
strip
()[:
2
]
in
comments
and
(
"alter"
in
final_item
.
lower
()
or
"create"
in
final_item
.
lower
()
or
"update "
in
final_item
.
lower
()
or
"delete "
in
final_item
.
lower
()
or
"drop"
in
final_item
.
lower
()):
"alter"
in
final_item
.
lower
()
or
"create"
in
final_item
.
lower
()
or
"drop"
in
final_item
.
lower
()):
trans_index
=
final_item
.
lower
()
.
find
(
"update"
)
trans_index
=
final_item
.
lower
()
.
find
(
"update"
)
if
trans_index
!=
-
1
:
if
trans_index
!=
-
1
:
final_item
=
final_item
[
trans_index
:]
final_item
=
final_item
[
trans_index
:]
...
@@ -135,6 +136,9 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
...
@@ -135,6 +136,9 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
drop_index
=
final_item
.
lower
()
.
find
(
"drop"
)
drop_index
=
final_item
.
lower
()
.
find
(
"drop"
)
if
drop_index
!=
-
1
:
if
drop_index
!=
-
1
:
final_item
=
final_item
[
drop_index
:]
final_item
=
final_item
[
drop_index
:]
call_index
=
final_item
.
lower
()
.
find
(
"call"
)
if
call_index
!=
-
1
:
final_item
=
final_item
[
call_index
:]
final_item
=
final_item
.
replace
(
"
%
"
,
"
%%
"
)
final_item
=
final_item
.
replace
(
"
%
"
,
"
%%
"
)
final_data
.
append
(
final_item
)
final_data
.
append
(
final_item
)
final_data
=
[
item
.
replace
(
"
\t
"
,
""
)
for
item
in
final_data
if
item
!=
""
and
(
"select"
in
item
.
lower
()
or
final_data
=
[
item
.
replace
(
"
\t
"
,
""
)
for
item
in
final_data
if
item
!=
""
and
(
"select"
in
item
.
lower
()
or
...
@@ -144,7 +148,8 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
...
@@ -144,7 +148,8 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str) ->
"alter"
in
item
.
lower
()
or
"alter"
in
item
.
lower
()
or
"create"
in
item
.
lower
()
or
"create"
in
item
.
lower
()
or
"drop"
in
item
.
lower
()
or
"drop"
in
item
.
lower
()
or
"commit"
in
item
.
lower
())]
"commit"
in
item
.
lower
()
or
"call"
in
item
.
lower
())]
result
.
append
((
row
[
0
],
final_data
))
result
.
append
((
row
[
0
],
final_data
))
logger
.
info
(
f
"Lista de comandos: {result}"
)
logger
.
info
(
f
"Lista de comandos: {result}"
)
except
Exception
as
e
:
except
Exception
as
e
:
...
@@ -157,9 +162,10 @@ def select_multiple(command: str) -> Dict[str, Any]:
...
@@ -157,9 +162,10 @@ def select_multiple(command: str) -> Dict[str, Any]:
response
=
{
'is_multiple'
:
False
,
'tablename'
:
''
}
response
=
{
'is_multiple'
:
False
,
'tablename'
:
''
}
tablename
=
""
tablename
=
""
no_procedure_init
=
"|select"
no_procedure_init
=
"|select"
procedure_init
=
[
"|begin"
]
procedure_init
=
[
"|begin"
,
"|call"
]
try
:
try
:
if
command
.
lower
()
.
replace
(
" "
,
""
)
.
find
(
procedure_init
[
0
])
!=
-
1
:
if
command
.
lower
()
.
replace
(
" "
,
""
)
.
find
(
procedure_init
[
0
])
!=
-
1
or
\
command
.
lower
()
.
replace
(
" "
,
""
)
.
find
(
procedure_init
[
1
])
!=
-
1
:
response
[
"is_multiple"
]
=
True
response
[
"is_multiple"
]
=
True
tablename
=
command
[:
command
.
index
(
"|"
)]
.
strip
()
tablename
=
command
[:
command
.
index
(
"|"
)]
.
strip
()
response
[
"tablename"
]
=
tablename
response
[
"tablename"
]
=
tablename
...
@@ -171,7 +177,7 @@ def select_multiple(command: str) -> Dict[str, Any]:
...
@@ -171,7 +177,7 @@ def select_multiple(command: str) -> Dict[str, Any]:
if
init_index
==
-
1
:
if
init_index
==
-
1
:
raise
AssertionError
(
"Query malformed"
)
raise
AssertionError
(
"Query malformed"
)
else
:
else
:
from_command
=
command
[
init_index
+
4
:]
from_command
=
command
[
init_index
+
4
:]
tablename_base
=
from_command
.
strip
()
.
split
(
" "
)
tablename_base
=
from_command
.
strip
()
.
split
(
" "
)
if
len
(
tablename_base
)
>
0
and
tablename
==
""
:
if
len
(
tablename_base
)
>
0
and
tablename
==
""
:
tablename
=
tablename_base
[
0
]
tablename
=
tablename_base
[
0
]
...
...
dags/procedure_definition.json
View file @
82505e1b
...
@@ -2,29 +2,28 @@
...
@@ -2,29 +2,28 @@
"procedure_identifier"
:
"PROCEDURE_1"
,
"procedure_identifier"
:
"PROCEDURE_1"
,
"fields"
:
[
"fields"
:
[
{
{
"identifier"
:
"CD_
EMPRESA
"
,
"identifier"
:
"CD_
FOLIO
"
,
"datatype"
:
"
NUMBER
"
,
"datatype"
:
"
TEXT
"
,
"decimal_precision"
:
0
,
"decimal_precision"
:
0
,
"maxLength"
:
null
"maxLength"
:
100
},
},
{
{
"identifier"
:
"CD_
FOLIO
"
,
"identifier"
:
"CD_
CUENTA
"
,
"datatype"
:
"TEXT"
,
"datatype"
:
"TEXT"
,
"decimal_precision"
:
null
,
"decimal_precision"
:
null
,
"maxLength"
:
100
"maxLength"
:
100
},
},
{
{
"identifier"
:
"CD_
CLIEN
TE"
,
"identifier"
:
"CD_
PAQUE
TE"
,
"datatype"
:
"TEXT"
,
"datatype"
:
"TEXT"
,
"decimal_precision"
:
null
,
"decimal_precision"
:
null
,
"maxLength"
:
5
0
"maxLength"
:
10
0
},
},
{
{
"identifier"
:
"FH_CONTRATACION"
,
"identifier"
:
"NB_PAQUETE"
,
"datatype"
:
"DATE"
,
"datatype"
:
"TEXT"
,
"pattern"
:
"%d-%m-%y"
,
"decimal_precision"
:
null
,
"decimal_precision"
:
null
,
"maxLength"
:
null
"maxLength"
:
200
}
}
]
]
},
},
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment