Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
368d03ad
Commit
368d03ad
authored
Jan 11, 2024
by
Erly Villaroel
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Cambios en Cleaning.py
parent
ad679d5e
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
91 additions
and
40 deletions
+91
-40
Cleaning.py
dags/components/Cleaning.py
+27
-11
DatabaseTransformation.py
dags/components/DatabaseOperation/DatabaseTransformation.py
+51
-18
Oracle.py
dags/components/Databases/Oracle.py
+5
-3
requirements.txt
deploy-k8/requirements.txt
+8
-8
No files found.
dags/components/Cleaning.py
View file @
368d03ad
...
@@ -18,17 +18,20 @@ import logging
...
@@ -18,17 +18,20 @@ import logging
logger
=
logging
.
getLogger
()
logger
=
logging
.
getLogger
()
def
validate_clean
(
control_params
:
Dict
[
str
,
Any
],
provider
:
str
,
timezone
:
str
,
engine
,
**
kwargs
)
->
None
:
def
validate_clean
(
control_params
:
Dict
[
str
,
Any
],
provider
:
str
,
timezone
:
str
,
intern_conn
,
**
kwargs
)
->
None
:
engine
=
intern_conn
.
engine
delete_task_instances
()
delete_task_instances
()
ti
=
kwargs
[
"ti"
]
ti
=
kwargs
[
"ti"
]
#created_Tables = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CREATED_TABLES")
procedures
=
ti
.
xcom_pull
(
task_ids
=
"MASTER_TRANSFORMATION"
,
key
=
"PROC_CREATED"
)
procedures
=
ti
.
xcom_pull
(
task_ids
=
"MASTER_TRANSFORMATION"
,
key
=
"PROC_CREATED"
)
if
procedures
:
if
procedures
:
for
procedure
in
procedures
:
for
procedure
in
procedures
:
logger
.
info
(
f
"Borrando procedures {procedure}"
)
with
engine
.
connect
()
as
conn
:
delete
=
delete_procedure
(
procedure
,
engine
)
result_p
=
bool
(
intern_conn
.
check_procedure
(
procedure
,
conn
))
if
delete
:
if
result_p
:
logger
.
info
(
f
"Borrado correctamente el procedure {procedure}"
)
logger
.
info
(
f
"Borrando procedures {procedure}"
)
delete
=
delete_procedure
(
procedure
,
engine
)
if
delete
:
logger
.
info
(
f
"Borrado correctamente el procedure {procedure}"
)
success_tasks
=
ti
.
xcom_pull
(
task_ids
=
"GENERATORS"
,
key
=
"SUCCESS_TASKS"
)
success_tasks
=
ti
.
xcom_pull
(
task_ids
=
"GENERATORS"
,
key
=
"SUCCESS_TASKS"
)
failed_tasks
=
ti
.
xcom_pull
(
task_ids
=
"GENERATORS"
,
key
=
"FAILED_TASKS"
)
failed_tasks
=
ti
.
xcom_pull
(
task_ids
=
"GENERATORS"
,
key
=
"FAILED_TASKS"
)
conf
=
ti
.
xcom_pull
(
task_ids
=
"VALIDATE_GENERATOR"
,
key
=
"CONTROL-CONFIG"
)
conf
=
ti
.
xcom_pull
(
task_ids
=
"VALIDATE_GENERATOR"
,
key
=
"CONTROL-CONFIG"
)
...
@@ -59,13 +62,26 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
...
@@ -59,13 +62,26 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
def
clean
(
command
:
str
,
intern_conn
):
def
clean
(
command
:
str
,
intern_conn
):
delete_t
=
False
delete_p
=
False
engine
=
intern_conn
.
engine
engine
=
intern_conn
.
engine
#tablename = select_multiple(command)["tablename"]
tablename
=
command
tablename
=
command
logger
.
info
(
f
"Borrando tabla {tablename}"
)
print
(
tablename
)
delete
=
delete_table
(
tablename
,
engine
)
with
engine
.
connect
()
as
conn
:
if
delete
:
result_t
=
bool
(
intern_conn
.
check_table
(
tablename
,
conn
))
result_p
=
bool
(
intern_conn
.
check_procedure
(
tablename
,
conn
))
print
(
"result_t"
,
result_t
)
print
(
"result_t"
,
result_p
)
if
result_t
:
logger
.
info
(
f
"Borrando tabla {tablename}"
)
delete_t
=
delete_table
(
tablename
,
engine
)
elif
result_p
:
logger
.
info
(
f
"Borrando procedure {tablename}"
)
delete_p
=
delete_procedure
(
tablename
,
engine
)
if
delete_t
:
logger
.
info
(
f
"Borrado correctamente la tabla {tablename}"
)
logger
.
info
(
f
"Borrado correctamente la tabla {tablename}"
)
elif
delete_p
:
logger
.
info
(
f
"Borrado correctamente el procedure {tablename}"
)
logger
.
info
(
f
"Borrado todas las variables xcom"
)
logger
.
info
(
f
"Borrado todas las variables xcom"
)
...
@@ -110,7 +126,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider
...
@@ -110,7 +126,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider
validate_task
=
PythonOperator
(
validate_task
=
PythonOperator
(
task_id
=
"VALIDATE_CLEANER"
,
task_id
=
"VALIDATE_CLEANER"
,
python_callable
=
validate_clean
,
python_callable
=
validate_clean
,
op_kwargs
=
{
'control_params'
:
control_s3
,
'provider'
:
provider
,
'timezone'
:
timezone
,
'
engine'
:
db_intern_conn
.
engine
},
op_kwargs
=
{
'control_params'
:
control_s3
,
'provider'
:
provider
,
'timezone'
:
timezone
,
'
intern_conn'
:
db_intern_conn
},
trigger_rule
=
'none_skipped'
trigger_rule
=
'none_skipped'
)
)
cleaners
>>
tasks
>>
validate_task
cleaners
>>
tasks
>>
validate_task
...
...
dags/components/DatabaseOperation/DatabaseTransformation.py
View file @
368d03ad
...
@@ -16,17 +16,33 @@ def execute_transformations(commands: List[str], engine):
...
@@ -16,17 +16,33 @@ def execute_transformations(commands: List[str], engine):
def
delete_table
(
tablename
:
str
,
engine
)
->
bool
:
def
delete_table
(
tablename
:
str
,
engine
)
->
bool
:
base_Datos
=
engine
.
dialect
.
name
delete
=
False
delete
=
False
try
:
try
:
command
=
f
'DROP TABLE {tablename}'
if
base_Datos
==
"oracle"
:
start_time
=
time
.
time
()
command
=
f
'DROP TABLE {tablename}'
with
engine
.
connect
()
as
conn
:
start_time
=
time
.
time
()
try
:
with
engine
.
connect
()
as
conn
:
_
=
conn
.
execute
(
command
)
try
:
except
Exception
as
e
:
check_query
=
f
"SELECT table_name FROM all_tables WHERE table_name = '{tablename.upper()}'"
logger
.
error
(
f
"Tabla no encontrada. {e}"
)
result
=
conn
.
execute
(
check_query
)
delete
=
True
exists
=
bool
(
result
.
fetchone
())
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
except
Exception
as
e
:
logger
.
error
(
f
"Tabla no encontrada. {e}"
)
if
exists
:
_
=
conn
.
execute
(
command
)
delete
=
True
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
elif
base_Datos
==
"mysql"
:
command
=
f
'DROP TABLE IF EXISTS {tablename}'
start_time
=
time
.
time
()
with
engine
.
connect
()
as
conn
:
try
:
_
=
conn
.
execute
(
command
)
delete
=
True
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
except
Exception
as
e
:
logger
.
error
(
f
"Tabla no encontrada. {e}"
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error borrando tabla {tablename}. {e}"
)
logger
.
error
(
f
"Error borrando tabla {tablename}. {e}"
)
finally
:
finally
:
...
@@ -34,17 +50,34 @@ def delete_table(tablename: str, engine) -> bool:
...
@@ -34,17 +50,34 @@ def delete_table(tablename: str, engine) -> bool:
def
delete_procedure
(
procedure
:
str
,
engine
)
->
bool
:
def
delete_procedure
(
procedure
:
str
,
engine
)
->
bool
:
base_Datos
=
engine
.
dialect
.
name
delete
=
False
delete
=
False
try
:
try
:
command
=
f
"DROP PROCEDURE {procedure}"
if
base_Datos
==
"oracle"
:
start_time
=
time
.
time
()
command
=
f
"DROP PROCEDURE {procedure}"
with
engine
.
connect
()
as
conn
:
start_time
=
time
.
time
()
try
:
with
engine
.
connect
()
as
conn
:
_
=
conn
.
execute
(
command
)
try
:
except
Exception
as
e
:
proc
=
procedure
.
replace
(
"("
,
""
)
.
replace
(
")"
,
""
)
logger
.
error
(
f
"Procedure no encontrado. {e}"
)
check_query
=
f
"SELECT object_name FROM all_procedures WHERE object_name = '{proc.upper()}'"
delete
=
True
result
=
conn
.
execute
(
check_query
)
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
exists
=
bool
(
result
.
fetchone
())
except
Exception
as
e
:
logger
.
error
(
f
"Procedure no encontrado. {e}"
)
if
exists
:
_
=
conn
.
execute
(
command
)
delete
=
True
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
elif
base_Datos
==
"mysql"
:
command
=
f
'DROP PROCEDURE IF EXISTS {procedure}'
start_time
=
time
.
time
()
with
engine
.
connect
()
as
conn
:
try
:
_
=
conn
.
execute
(
command
)
delete
=
True
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
except
Exception
as
e
:
logger
.
error
(
f
"Procedure no encontrada. {e}"
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error borrando procedure {procedure}. {e}"
)
logger
.
error
(
f
"Error borrando procedure {procedure}. {e}"
)
finally
:
finally
:
...
...
dags/components/Databases/Oracle.py
View file @
368d03ad
...
@@ -113,7 +113,8 @@ class Oracle:
...
@@ -113,7 +113,8 @@ class Oracle:
def
check_procedure
(
self
,
procedure_name
,
connection
)
->
bool
:
def
check_procedure
(
self
,
procedure_name
,
connection
)
->
bool
:
exists
=
False
exists
=
False
try
:
try
:
check_query
=
f
"SELECT object_name FROM all_procedures WHERE object_name = '{procedure_name}'"
procedure_name
=
procedure_name
.
replace
(
"("
,
""
)
.
replace
(
")"
,
""
)
check_query
=
f
"SELECT object_name FROM all_procedures WHERE object_name = '{procedure_name.upper()}'"
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
exists
=
bool
(
result
.
fetchone
())
exists
=
bool
(
result
.
fetchone
())
except
Exception
as
e
:
except
Exception
as
e
:
...
@@ -124,18 +125,19 @@ class Oracle:
...
@@ -124,18 +125,19 @@ class Oracle:
def
check_table
(
self
,
table_name
,
connection
)
->
bool
:
def
check_table
(
self
,
table_name
,
connection
)
->
bool
:
exists
=
False
exists
=
False
try
:
try
:
check_query
=
f
"SELECT table_name FROM all_tables WHERE table_name = '{table_name}'"
check_query
=
f
"SELECT table_name FROM all_tables WHERE table_name = '{table_name
.upper()
}'"
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
exists
=
bool
(
result
.
fetchone
())
exists
=
bool
(
result
.
fetchone
())
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo existencia de tabla. {e}"
)
logger
.
error
(
f
"Error obteniendo existencia de tabla. {e}"
)
finally
:
finally
:
print
(
"exists"
,
exists
)
return
exists
return
exists
def
verify_table
(
self
,
table_name
,
connection
)
->
bool
:
def
verify_table
(
self
,
table_name
,
connection
)
->
bool
:
exists
=
False
exists
=
False
try
:
try
:
check_query
=
f
"SELECT COUNT(*) FROM {table_name}"
check_query
=
f
"SELECT COUNT(*) FROM {table_name
.upper()
}"
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
if
result
>
0
:
if
result
>
0
:
exists
=
True
exists
=
True
...
...
deploy-k8/requirements.txt
View file @
368d03ad
openpyxl==3.1.2
pip install openpyxl==3.1.2
XlsxWriter==3.1.2
pip install XlsxWriter==3.1.2
pymysql==1.1.0
pip install pymysql==1.1.0
oracledb==1.3.2
pip install oracledb==1.3.2
apache-airflow-providers-google
pip install apache-airflow-providers-google
apache-airflow-providers-amazon
pip install apache-airflow-providers-amazon
apache-airflow-providers-postgres
pip install apache-airflow-providers-postgres
apache-airflow-providers-oracle
pip install apache-airflow-providers-oracle
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment