Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
299ea181
Commit
299ea181
authored
Jan 12, 2024
by
Cristian Aguirre
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fix bugs deleting tables when is necessary
parent
e4197850
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
14 additions
and
8 deletions
+14
-8
Mysql.py
dags/components/Databases/Mysql.py
+3
-2
Oracle.py
dags/components/Databases/Oracle.py
+4
-3
Postgres.py
dags/components/Databases/Postgres.py
+4
-3
Extractor.py
dags/components/Extractor.py
+3
-0
No files found.
dags/components/Databases/Mysql.py
View file @
299ea181
...
@@ -121,7 +121,7 @@ class Mysql:
...
@@ -121,7 +121,7 @@ class Mysql:
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
exists
=
bool
(
result
.
fetchone
())
exists
=
bool
(
result
.
fetchone
())
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo existencia de tabla. {e}
"
)
logger
.
info
(
f
"No se encuentra la tabla {table_name} en la BD interna
"
)
finally
:
finally
:
return
exists
return
exists
...
@@ -130,9 +130,10 @@ class Mysql:
...
@@ -130,9 +130,10 @@ class Mysql:
try
:
try
:
check_query
=
f
"SELECT COUNT(*) FROM {table_name}"
check_query
=
f
"SELECT COUNT(*) FROM {table_name}"
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
result
=
result
.
fetchone
()[
0
]
if
result
>
0
:
if
result
>
0
:
exists
=
True
exists
=
True
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo counts de tabla. {e}
"
)
logger
.
info
(
f
"No se encuentra la tabla {table_name} en la BD interna
"
)
finally
:
finally
:
return
exists
return
exists
dags/components/Databases/Oracle.py
View file @
299ea181
...
@@ -129,7 +129,7 @@ class Oracle:
...
@@ -129,7 +129,7 @@ class Oracle:
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
exists
=
bool
(
result
.
fetchone
())
exists
=
bool
(
result
.
fetchone
())
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo existencia de tabla. {e}
"
)
logger
.
info
(
f
"No se encuentra la tabla {table_name} en la BD interna
"
)
finally
:
finally
:
return
exists
return
exists
...
@@ -138,9 +138,10 @@ class Oracle:
...
@@ -138,9 +138,10 @@ class Oracle:
try
:
try
:
check_query
=
f
"SELECT COUNT(*) FROM {table_name.upper()}"
check_query
=
f
"SELECT COUNT(*) FROM {table_name.upper()}"
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
result
=
result
.
fetchone
()[
0
]
if
result
>
0
:
if
result
>
0
:
exists
=
True
exists
=
True
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo counts de tabla. {e}
"
)
logger
.
info
(
f
"No se encuentra la tabla {table_name} en la BD interna.
"
)
finally
:
finally
:
return
exists
return
exists
\ No newline at end of file
dags/components/Databases/Postgres.py
View file @
299ea181
...
@@ -122,7 +122,7 @@ class Postgres:
...
@@ -122,7 +122,7 @@ class Postgres:
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
exists
=
bool
(
result
.
fetchone
())
exists
=
bool
(
result
.
fetchone
())
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo existencia de tabla. {e}
"
)
logger
.
info
(
f
"No se encuentra la tabla {table_name} en la BD interna
"
)
finally
:
finally
:
return
exists
return
exists
...
@@ -131,9 +131,10 @@ class Postgres:
...
@@ -131,9 +131,10 @@ class Postgres:
try
:
try
:
check_query
=
f
"SELECT COUNT(*) FROM {table_name}"
check_query
=
f
"SELECT COUNT(*) FROM {table_name}"
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
result
=
result
.
fetchone
()[
0
]
if
result
>
0
:
if
result
>
0
:
exists
=
True
exists
=
True
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo counts de tabla. {e}
"
)
logger
.
info
(
f
"No se encuentra la tabla {table_name} en la BD interna
"
)
finally
:
finally
:
return
exists
return
exists
\ No newline at end of file
dags/components/Extractor.py
View file @
299ea181
...
@@ -90,6 +90,8 @@ def on_success_extractor(context) -> None:
...
@@ -90,6 +90,8 @@ def on_success_extractor(context) -> None:
ti
=
context
[
"ti"
]
ti
=
context
[
"ti"
]
task_name
=
f
"{ti.task_id}_{ti.map_index}"
task_name
=
f
"{ti.task_id}_{ti.map_index}"
selects
=
Variable
.
get
(
'SELECTS'
,
default_var
=
[],
deserialize_json
=
True
)
selects
=
Variable
.
get
(
'SELECTS'
,
default_var
=
[],
deserialize_json
=
True
)
if
len
(
selects
)
==
0
:
raise
AirflowSkipException
(
f
"No se encontraron operaciones de extraccion"
)
command
=
selects
[
ti
.
map_index
]
command
=
selects
[
ti
.
map_index
]
tablename
=
select_multiple
(
command
[
1
])[
"tablename"
]
tablename
=
select_multiple
(
command
[
1
])[
"tablename"
]
status
=
ProcessStatusEnum
.
SUCCESS
.
value
status
=
ProcessStatusEnum
.
SUCCESS
.
value
...
@@ -142,6 +144,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
...
@@ -142,6 +144,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
result
=
bool
(
intern_conn
.
check_table
(
tablename
,
connection
))
result
=
bool
(
intern_conn
.
check_table
(
tablename
,
connection
))
resultado
=
intern_conn
.
verify_table
(
tablename
,
connection
)
resultado
=
intern_conn
.
verify_table
(
tablename
,
connection
)
if
not
result
or
not
resultado
:
if
not
result
or
not
resultado
:
_
=
delete_table
(
tablename
,
intern_conn
)
create
=
intern_conn
.
create_table
(
model
)
create
=
intern_conn
.
create_table
(
model
)
if
create
:
if
create
:
logger
.
info
(
f
"Creado correctamente la tabla {tablename}. Creado?: {create}"
)
logger
.
info
(
f
"Creado correctamente la tabla {tablename}. Creado?: {create}"
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment