Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
ab36e03a
Commit
ab36e03a
authored
Jul 31, 2023
by
Cristian Aguirre
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Update 30-07-23. Fix some bugs about Postgres and Oracle DB
parent
5fe4a975
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
44 additions
and
17 deletions
+44
-17
Database.py
dags/components/Databases/Database.py
+3
-2
Mysql.py
dags/components/Databases/Mysql.py
+9
-4
Oracle.py
dags/components/Databases/Oracle.py
+9
-4
Postgres.py
dags/components/Databases/Postgres.py
+9
-4
Extractor.py
dags/components/Extractor.py
+6
-3
InsumoModel.py
dags/components/Model/InsumoModel.py
+8
-0
No files found.
dags/components/Databases/Database.py
View file @
ab36e03a
...
@@ -55,10 +55,11 @@ class Database:
...
@@ -55,10 +55,11 @@ class Database:
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error creando db engine. {e}"
)
logger
.
error
(
f
"Error creando db engine. {e}"
)
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
modelName
:
str
=
"TableModel"
)
->
bool
:
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
db_target
:
str
,
modelName
:
str
=
"TableModel"
)
->
bool
:
create
=
False
create
=
False
try
:
try
:
create
=
self
.
factory
.
create_model
(
tablename
,
fields
,
modelName
)
create
=
self
.
factory
.
create_model
(
tablename
,
fields
,
db_target
,
modelName
)
except
Exception
as
e
:
except
Exception
as
e
:
raise
AssertionError
(
f
"Error creando tabla dinámica con nombre {tablename}. {e}"
)
raise
AssertionError
(
f
"Error creando tabla dinámica con nombre {tablename}. {e}"
)
finally
:
finally
:
...
...
dags/components/Databases/Mysql.py
View file @
ab36e03a
...
@@ -3,7 +3,8 @@ from typing import List, Tuple
...
@@ -3,7 +3,8 @@ from typing import List, Tuple
import
pymysql
import
pymysql
from
sqlalchemy
import
create_engine
from
sqlalchemy
import
create_engine
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
components.Model.InsumoModel
import
InsumoModel
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.Model.InsumoModel
import
InsumoModel
,
InsumoModelOracle
from
components.Databases.Enums.MysqlDataTypeEnum
import
MysqlDataTypeEnum
from
components.Databases.Enums.MysqlDataTypeEnum
import
MysqlDataTypeEnum
from
components.Databases.Enums.MysqlDataTypeORMEnum
import
MysqlDataTypeORMEnum
from
components.Databases.Enums.MysqlDataTypeORMEnum
import
MysqlDataTypeORMEnum
...
@@ -43,13 +44,17 @@ class Mysql:
...
@@ -43,13 +44,17 @@ class Mysql:
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error creando engine de Mysql. {e}"
)
logger
.
error
(
f
"Error creando engine de Mysql. {e}"
)
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
modelName
:
str
=
"TableModel"
)
->
bool
:
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
db_target
:
str
,
modelName
:
str
=
"TableModel"
)
:
model
=
None
model
=
None
try
:
try
:
model
=
type
(
modelName
,
(
InsumoModel
,),
{
db_args
=
{
'__tablename__'
:
tablename
,
'__tablename__'
:
tablename
,
'__table_args__'
:
{
'extend_existing'
:
True
}
'__table_args__'
:
{
'extend_existing'
:
True
}
})
}
if
db_target
==
DatabaseTypeEnum
.
ORACLE
.
value
:
model
=
type
(
modelName
,
(
InsumoModelOracle
,),
db_args
)
else
:
model
=
type
(
modelName
,
(
InsumoModel
,),
db_args
)
for
field
in
fields
:
for
field
in
fields
:
logger
.
debug
(
f
"Attribute: {field}"
)
logger
.
debug
(
f
"Attribute: {field}"
)
name
=
field
[
0
]
name
=
field
[
0
]
...
...
dags/components/Databases/Oracle.py
View file @
ab36e03a
...
@@ -4,9 +4,10 @@ from sqlalchemy import create_engine
...
@@ -4,9 +4,10 @@ from sqlalchemy import create_engine
import
oracledb
import
oracledb
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.Databases.Enums.OracleDataTypeEnum
import
OracleDataTypeEnum
from
components.Databases.Enums.OracleDataTypeEnum
import
OracleDataTypeEnum
from
components.Databases.Enums.OracleDataTypeORMEnum
import
OracleDataTypeORMEnum
from
components.Databases.Enums.OracleDataTypeORMEnum
import
OracleDataTypeORMEnum
from
components.Model.InsumoModel
import
InsumoModel
from
components.Model.InsumoModel
import
InsumoModel
,
InsumoModelOracle
from
sqlalchemy
import
Column
from
sqlalchemy
import
Column
import
logging
import
logging
...
@@ -43,13 +44,17 @@ class Oracle:
...
@@ -43,13 +44,17 @@ class Oracle:
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error creando engine de Oracle. {e}"
)
logger
.
error
(
f
"Error creando engine de Oracle. {e}"
)
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
modelName
:
str
=
"TableModel"
)
->
bool
:
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
db_target
:
str
,
modelName
:
str
=
"TableModel"
)
:
model
=
None
model
=
None
try
:
try
:
model
=
type
(
modelName
,
(
InsumoModel
,),
{
db_args
=
{
'__tablename__'
:
tablename
,
'__tablename__'
:
tablename
,
'__table_args__'
:
{
'extend_existing'
:
True
}
'__table_args__'
:
{
'extend_existing'
:
True
}
})
}
if
db_target
==
DatabaseTypeEnum
.
ORACLE
.
value
:
model
=
type
(
modelName
,
(
InsumoModelOracle
,),
db_args
)
else
:
model
=
type
(
modelName
,
(
InsumoModel
,),
db_args
)
for
field
in
fields
:
for
field
in
fields
:
logger
.
debug
(
f
"Attribute: {field}"
)
logger
.
debug
(
f
"Attribute: {field}"
)
name
=
field
[
0
]
name
=
field
[
0
]
...
...
dags/components/Databases/Postgres.py
View file @
ab36e03a
...
@@ -2,8 +2,9 @@ from typing import List, Tuple
...
@@ -2,8 +2,9 @@ from typing import List, Tuple
import
psycopg2
import
psycopg2
from
sqlalchemy
import
create_engine
from
sqlalchemy
import
create_engine
from
components.Model.InsumoModel
import
InsumoModel
from
components.Model.InsumoModel
import
InsumoModel
,
InsumoModelOracle
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.Databases.Enums.PostgresDataTypeEnum
import
PostgresDataTypeEnum
from
components.Databases.Enums.PostgresDataTypeEnum
import
PostgresDataTypeEnum
from
components.Databases.Enums.PostgresDataTypeORMEnum
import
PostgresDataTypeORMEnum
from
components.Databases.Enums.PostgresDataTypeORMEnum
import
PostgresDataTypeORMEnum
...
@@ -46,13 +47,17 @@ class Postgres:
...
@@ -46,13 +47,17 @@ class Postgres:
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error creando engine de Postgres. {e}"
)
logger
.
error
(
f
"Error creando engine de Postgres. {e}"
)
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
modelName
:
str
=
"TableModel"
):
def
create_model
(
self
,
tablename
:
str
,
fields
:
List
[
Tuple
[
str
]],
db_target
:
str
,
modelName
:
str
=
"TableModel"
):
model
=
None
model
=
None
try
:
try
:
model
=
type
(
modelName
,
(
InsumoModel
,),
{
db_args
=
{
'__tablename__'
:
tablename
,
'__tablename__'
:
tablename
,
'__table_args__'
:
{
'extend_existing'
:
True
}
'__table_args__'
:
{
'extend_existing'
:
True
}
})
}
if
db_target
==
DatabaseTypeEnum
.
ORACLE
.
value
:
model
=
type
(
modelName
,
(
InsumoModelOracle
,),
db_args
)
else
:
model
=
type
(
modelName
,
(
InsumoModel
,),
db_args
)
for
field
in
fields
:
for
field
in
fields
:
logger
.
debug
(
f
"Attribute: {field}"
)
logger
.
debug
(
f
"Attribute: {field}"
)
name
=
field
[
0
]
name
=
field
[
0
]
...
...
dags/components/Extractor.py
View file @
ab36e03a
from
typing
import
Any
,
Dict
from
typing
import
Any
,
Dict
import
json
import
json
import
numpy
as
np
import
pandas
as
pd
import
pandas
as
pd
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
enums.ProcessStatusEnum
import
ProcessStatusEnum
from
enums.ProcessStatusEnum
import
ProcessStatusEnum
...
@@ -113,6 +114,9 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -113,6 +114,9 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
else
:
else
:
if
source_conn
.
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
and
\
if
source_conn
.
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
and
\
not
extract_type
.
startswith
(
OperationTypeEnum
.
PROCEDURE
.
value
):
not
extract_type
.
startswith
(
OperationTypeEnum
.
PROCEDURE
.
value
):
values
=
command_for_create
.
split
(
"|"
)
if
len
(
values
)
>
1
:
command_for_create
=
values
[
1
]
command_for_create
=
f
"SELECT * FROM ({command_for_create}) OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY"
command_for_create
=
f
"SELECT * FROM ({command_for_create}) OFFSET 1 ROWS FETCH NEXT 1 ROWS ONLY"
else
:
else
:
command_words
=
command_for_create
.
split
(
" "
)
command_words
=
command_for_create
.
split
(
" "
)
...
@@ -131,7 +135,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -131,7 +135,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
logger
.
debug
(
f
"Columnas procesadas: {columns}"
)
logger
.
debug
(
f
"Columnas procesadas: {columns}"
)
multiple
=
select_multiple
(
command
)
multiple
=
select_multiple
(
command
)
tablename
=
multiple
[
"tablename"
]
tablename
=
multiple
[
"tablename"
]
model
=
source_conn
.
create_model
(
tablename
,
columns
)
model
=
source_conn
.
create_model
(
tablename
,
columns
,
intern_conn
.
db_type
)
try
:
try
:
create
=
intern_conn
.
create_table
(
model
)
create
=
intern_conn
.
create_table
(
model
)
if
create
:
if
create
:
...
@@ -191,8 +195,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
...
@@ -191,8 +195,7 @@ def extract_from_source(command: str, source_conn, intern_conn, chunksize: int,
logger
.
info
(
f
"Número de pasos para migrar datos: {steps}"
)
logger
.
info
(
f
"Número de pasos para migrar datos: {steps}"
)
for
step
in
range
(
steps
):
for
step
in
range
(
steps
):
dataframe
=
next
(
iterator
)
dataframe
=
next
(
iterator
)
dataframe
[
"INTERN_ID_BCOM"
]
=
None
dataframe
=
dataframe
.
fillna
(
value
=
np
.
nan
)
logger
.
debug
(
dataframe
)
save
=
save_from_dataframe
(
dataframe
,
tablename
,
intern_conn
.
engine
)
save
=
save_from_dataframe
(
dataframe
,
tablename
,
intern_conn
.
engine
)
if
save
:
if
save
:
logger
.
info
(
f
"Guardado correctamente dataframe en el paso {step+1}"
)
logger
.
info
(
f
"Guardado correctamente dataframe en el paso {step+1}"
)
...
...
dags/components/Model/InsumoModel.py
View file @
ab36e03a
from
sqlalchemy.ext.declarative
import
declarative_base
from
sqlalchemy.ext.declarative
import
declarative_base
from
sqlalchemy
import
Column
,
BigInteger
from
sqlalchemy
import
Column
,
BigInteger
from
sqlalchemy.schema
import
Identity
Base
=
declarative_base
()
Base
=
declarative_base
()
...
@@ -9,3 +10,10 @@ class InsumoModel(Base):
...
@@ -9,3 +10,10 @@ class InsumoModel(Base):
__abstract__
=
True
__abstract__
=
True
INTERN_ID_BCOM
=
Column
(
BigInteger
,
primary_key
=
True
,
autoincrement
=
True
)
INTERN_ID_BCOM
=
Column
(
BigInteger
,
primary_key
=
True
,
autoincrement
=
True
)
class
InsumoModelOracle
(
Base
):
__abstract__
=
True
INTERN_ID_BCOM
=
Column
(
BigInteger
,
Identity
(
start
=
1
),
primary_key
=
True
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment