Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
852ec9f6
Commit
852ec9f6
authored
Jan 11, 2024
by
Erly Villaroel
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Cambios en Cleaning.py
parent
1b1d0f08
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
16 additions
and
15 deletions
+16
-15
Cleaning.py
dags/components/Cleaning.py
+11
-11
DatabaseTransformation.py
dags/components/DatabaseOperation/DatabaseTransformation.py
+2
-2
Oracle.py
dags/components/Databases/Oracle.py
+3
-2
No files found.
dags/components/Cleaning.py
View file @
852ec9f6
...
@@ -62,19 +62,19 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
...
@@ -62,19 +62,19 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
def
clean
(
command
:
str
,
intern_conn
):
def
clean
(
command
:
str
,
intern_conn
):
delete_t
=
False
delete_p
=
False
engine
=
intern_conn
.
engine
engine
=
intern_conn
.
engine
tablename
=
command
tablename
=
command
with
engine
.
connect
()
as
conn
:
# with engine.connect() as conn:
result_t
=
bool
(
intern_conn
.
check_table
(
tablename
,
conn
))
# result_t = bool(intern_conn.check_table(tablename, conn))
result_p
=
bool
(
intern_conn
.
check_procedure
(
tablename
,
conn
))
# result_p = bool(intern_conn.check_procedure(tablename, conn))
if
result_t
:
# if result_t:
logger
.
info
(
f
"Borrando tabla {tablename}"
)
# logger.info(f"Borrando tabla {tablename}")
delete_t
=
delete_table
(
tablename
,
engine
)
# delete_t = delete_table(tablename, engine)
elif
result_p
:
# elif result_p:
logger
.
info
(
f
"Borrando procedure {tablename}"
)
# logger.info(f"Borrando procedure {tablename}")
delete_p
=
delete_procedure
(
tablename
,
engine
)
# delete_p = delete_procedure(tablename, engine)
delete_p
=
delete_procedure
(
tablename
,
engine
)
delete_t
=
delete_table
(
tablename
,
engine
)
if
delete_t
:
if
delete_t
:
logger
.
info
(
f
"Borrado correctamente la tabla {tablename}"
)
logger
.
info
(
f
"Borrado correctamente la tabla {tablename}"
)
elif
delete_p
:
elif
delete_p
:
...
...
dags/components/DatabaseOperation/DatabaseTransformation.py
View file @
852ec9f6
import
logging
import
logging
import
time
import
time
from
typing
import
List
from
typing
import
List
import
re
logger
=
logging
.
getLogger
()
logger
=
logging
.
getLogger
()
...
@@ -58,7 +58,7 @@ def delete_procedure(procedure: str, engine) -> bool:
...
@@ -58,7 +58,7 @@ def delete_procedure(procedure: str, engine) -> bool:
start_time
=
time
.
time
()
start_time
=
time
.
time
()
with
engine
.
connect
()
as
conn
:
with
engine
.
connect
()
as
conn
:
try
:
try
:
proc
=
procedure
.
replace
(
"("
,
""
)
.
replace
(
")"
,
""
)
proc
=
re
.
match
(
r'([^(]+)'
,
procedure
)
.
group
(
1
)
check_query
=
f
"SELECT object_name FROM all_procedures WHERE object_name = '{proc.upper()}'"
check_query
=
f
"SELECT object_name FROM all_procedures WHERE object_name = '{proc.upper()}'"
result
=
conn
.
execute
(
check_query
)
result
=
conn
.
execute
(
check_query
)
exists
=
bool
(
result
.
fetchone
())
exists
=
bool
(
result
.
fetchone
())
...
...
dags/components/Databases/Oracle.py
View file @
852ec9f6
...
@@ -2,7 +2,7 @@ from typing import List, Tuple
...
@@ -2,7 +2,7 @@ from typing import List, Tuple
from
sqlalchemy
import
create_engine
from
sqlalchemy
import
create_engine
import
oracledb
import
oracledb
import
re
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.Databases.Enums.OracleDataTypeEnum
import
OracleDataTypeEnum
from
components.Databases.Enums.OracleDataTypeEnum
import
OracleDataTypeEnum
...
@@ -90,6 +90,7 @@ class Oracle:
...
@@ -90,6 +90,7 @@ class Oracle:
def
generate_sql_procedure
(
self
,
command
:
str
,
reserved_word
:
str
=
"execute"
)
->
str
:
def
generate_sql_procedure
(
self
,
command
:
str
,
reserved_word
:
str
=
"execute"
)
->
str
:
response
=
""
response
=
""
try
:
try
:
logger
.
info
(
"COMANDO"
,
command
)
command
=
command
.
replace
(
reserved_word
,
""
)
.
replace
(
";"
,
""
)
command
=
command
.
replace
(
reserved_word
,
""
)
.
replace
(
";"
,
""
)
response
=
f
"BEGIN {command}; END;"
response
=
f
"BEGIN {command}; END;"
logger
.
debug
(
"COMANDO ORACLE:"
,
response
)
logger
.
debug
(
"COMANDO ORACLE:"
,
response
)
...
@@ -113,7 +114,7 @@ class Oracle:
...
@@ -113,7 +114,7 @@ class Oracle:
def
check_procedure
(
self
,
procedure_name
,
connection
)
->
bool
:
def
check_procedure
(
self
,
procedure_name
,
connection
)
->
bool
:
exists
=
False
exists
=
False
try
:
try
:
procedure_name
=
procedure_name
.
replace
(
"("
,
""
)
.
replace
(
")"
,
""
)
procedure_name
=
re
.
match
(
r'([^(]+)'
,
procedure_name
)
.
group
(
1
)
check_query
=
f
"SELECT object_name FROM all_procedures WHERE object_name = '{procedure_name.upper()}'"
check_query
=
f
"SELECT object_name FROM all_procedures WHERE object_name = '{procedure_name.upper()}'"
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
exists
=
bool
(
result
.
fetchone
())
exists
=
bool
(
result
.
fetchone
())
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment