Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
b4835849
Commit
b4835849
authored
Jan 11, 2024
by
Cristian Aguirre
Browse files
Options
Browse Files
Download
Plain Diff
Merge remote-tracking branch 'origin/developer_ev' into developer_ca
# Conflicts: # dags/procedure_prueba.json
parents
c4edffad
852ec9f6
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
112 additions
and
64 deletions
+112
-64
Cleaning.py
dags/components/Cleaning.py
+24
-11
DatabaseTransformation.py
dags/components/DatabaseOperation/DatabaseTransformation.py
+52
-19
Oracle.py
dags/components/Databases/Oracle.py
+7
-5
dag_conf.yml
dags/dag_conf.yml
+21
-21
requirements.txt
deploy-k8/requirements.txt
+8
-8
No files found.
dags/components/Cleaning.py
View file @
b4835849
...
@@ -18,17 +18,20 @@ import logging
...
@@ -18,17 +18,20 @@ import logging
logger
=
logging
.
getLogger
()
logger
=
logging
.
getLogger
()
def
validate_clean
(
control_params
:
Dict
[
str
,
Any
],
provider
:
str
,
timezone
:
str
,
engine
,
**
kwargs
)
->
None
:
def
validate_clean
(
control_params
:
Dict
[
str
,
Any
],
provider
:
str
,
timezone
:
str
,
intern_conn
,
**
kwargs
)
->
None
:
engine
=
intern_conn
.
engine
delete_task_instances
()
delete_task_instances
()
ti
=
kwargs
[
"ti"
]
ti
=
kwargs
[
"ti"
]
#created_Tables = ti.xcom_pull(task_ids="VALIDATE_GENERATOR", key="CREATED_TABLES")
procedures
=
ti
.
xcom_pull
(
task_ids
=
"MASTER_TRANSFORMATION"
,
key
=
"PROC_CREATED"
)
procedures
=
ti
.
xcom_pull
(
task_ids
=
"MASTER_TRANSFORMATION"
,
key
=
"PROC_CREATED"
)
if
procedures
:
if
procedures
:
for
procedure
in
procedures
:
for
procedure
in
procedures
:
logger
.
info
(
f
"Borrando procedures {procedure}"
)
with
engine
.
connect
()
as
conn
:
delete
=
delete_procedure
(
procedure
,
engine
)
result_p
=
bool
(
intern_conn
.
check_procedure
(
procedure
,
conn
))
if
delete
:
if
result_p
:
logger
.
info
(
f
"Borrado correctamente el procedure {procedure}"
)
logger
.
info
(
f
"Borrando procedures {procedure}"
)
delete
=
delete_procedure
(
procedure
,
engine
)
if
delete
:
logger
.
info
(
f
"Borrado correctamente el procedure {procedure}"
)
success_tasks
=
ti
.
xcom_pull
(
task_ids
=
"GENERATORS"
,
key
=
"SUCCESS_TASKS"
)
success_tasks
=
ti
.
xcom_pull
(
task_ids
=
"GENERATORS"
,
key
=
"SUCCESS_TASKS"
)
failed_tasks
=
ti
.
xcom_pull
(
task_ids
=
"GENERATORS"
,
key
=
"FAILED_TASKS"
)
failed_tasks
=
ti
.
xcom_pull
(
task_ids
=
"GENERATORS"
,
key
=
"FAILED_TASKS"
)
conf
=
ti
.
xcom_pull
(
task_ids
=
"VALIDATE_GENERATOR"
,
key
=
"CONTROL-CONFIG"
)
conf
=
ti
.
xcom_pull
(
task_ids
=
"VALIDATE_GENERATOR"
,
key
=
"CONTROL-CONFIG"
)
...
@@ -60,12 +63,22 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
...
@@ -60,12 +63,22 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
def
clean
(
command
:
str
,
intern_conn
):
def
clean
(
command
:
str
,
intern_conn
):
engine
=
intern_conn
.
engine
engine
=
intern_conn
.
engine
#tablename = select_multiple(command)["tablename"]
tablename
=
command
tablename
=
command
logger
.
info
(
f
"Borrando tabla {tablename}"
)
# with engine.connect() as conn:
delete
=
delete_table
(
tablename
,
engine
)
# result_t = bool(intern_conn.check_table(tablename, conn))
if
delete
:
# result_p = bool(intern_conn.check_procedure(tablename, conn))
# if result_t:
# logger.info(f"Borrando tabla {tablename}")
# delete_t = delete_table(tablename, engine)
# elif result_p:
# logger.info(f"Borrando procedure {tablename}")
# delete_p = delete_procedure(tablename, engine)
delete_p
=
delete_procedure
(
tablename
,
engine
)
delete_t
=
delete_table
(
tablename
,
engine
)
if
delete_t
:
logger
.
info
(
f
"Borrado correctamente la tabla {tablename}"
)
logger
.
info
(
f
"Borrado correctamente la tabla {tablename}"
)
elif
delete_p
:
logger
.
info
(
f
"Borrado correctamente el procedure {tablename}"
)
logger
.
info
(
f
"Borrado todas las variables xcom"
)
logger
.
info
(
f
"Borrado todas las variables xcom"
)
...
@@ -110,7 +123,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider
...
@@ -110,7 +123,7 @@ def get_cleaning_task_group(db_intern_conn, control_s3: Dict[str, Any], provider
validate_task
=
PythonOperator
(
validate_task
=
PythonOperator
(
task_id
=
"VALIDATE_CLEANER"
,
task_id
=
"VALIDATE_CLEANER"
,
python_callable
=
validate_clean
,
python_callable
=
validate_clean
,
op_kwargs
=
{
'control_params'
:
control_s3
,
'provider'
:
provider
,
'timezone'
:
timezone
,
'
engine'
:
db_intern_conn
.
engine
},
op_kwargs
=
{
'control_params'
:
control_s3
,
'provider'
:
provider
,
'timezone'
:
timezone
,
'
intern_conn'
:
db_intern_conn
},
trigger_rule
=
'none_skipped'
trigger_rule
=
'none_skipped'
)
)
cleaners
>>
tasks
>>
validate_task
cleaners
>>
tasks
>>
validate_task
...
...
dags/components/DatabaseOperation/DatabaseTransformation.py
View file @
b4835849
import
logging
import
logging
import
time
import
time
from
typing
import
List
from
typing
import
List
import
re
logger
=
logging
.
getLogger
()
logger
=
logging
.
getLogger
()
...
@@ -16,17 +16,33 @@ def execute_transformations(commands: List[str], engine):
...
@@ -16,17 +16,33 @@ def execute_transformations(commands: List[str], engine):
def
delete_table
(
tablename
:
str
,
engine
)
->
bool
:
def
delete_table
(
tablename
:
str
,
engine
)
->
bool
:
base_Datos
=
engine
.
dialect
.
name
delete
=
False
delete
=
False
try
:
try
:
command
=
f
'DROP TABLE IF EXISTS {tablename}'
if
base_Datos
==
"oracle"
:
start_time
=
time
.
time
()
command
=
f
'DROP TABLE {tablename}'
with
engine
.
connect
()
as
conn
:
start_time
=
time
.
time
()
try
:
with
engine
.
connect
()
as
conn
:
_
=
conn
.
execute
(
command
)
try
:
except
Exception
as
e
:
check_query
=
f
"SELECT table_name FROM all_tables WHERE table_name = '{tablename.upper()}'"
logger
.
error
(
f
"Tabla no encontrada. {e}"
)
result
=
conn
.
execute
(
check_query
)
delete
=
True
exists
=
bool
(
result
.
fetchone
())
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
except
Exception
as
e
:
logger
.
error
(
f
"Tabla no encontrada. {e}"
)
if
exists
:
_
=
conn
.
execute
(
command
)
delete
=
True
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
elif
base_Datos
==
"mysql"
:
command
=
f
'DROP TABLE IF EXISTS {tablename}'
start_time
=
time
.
time
()
with
engine
.
connect
()
as
conn
:
try
:
_
=
conn
.
execute
(
command
)
delete
=
True
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
except
Exception
as
e
:
logger
.
error
(
f
"Tabla no encontrada. {e}"
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error borrando tabla {tablename}. {e}"
)
logger
.
error
(
f
"Error borrando tabla {tablename}. {e}"
)
finally
:
finally
:
...
@@ -34,17 +50,34 @@ def delete_table(tablename: str, engine) -> bool:
...
@@ -34,17 +50,34 @@ def delete_table(tablename: str, engine) -> bool:
def
delete_procedure
(
procedure
:
str
,
engine
)
->
bool
:
def
delete_procedure
(
procedure
:
str
,
engine
)
->
bool
:
base_Datos
=
engine
.
dialect
.
name
delete
=
False
delete
=
False
try
:
try
:
command
=
f
"DROP PROCEDURE IF EXISTS {procedure}"
if
base_Datos
==
"oracle"
:
start_time
=
time
.
time
()
command
=
f
"DROP PROCEDURE {procedure}"
with
engine
.
connect
()
as
conn
:
start_time
=
time
.
time
()
try
:
with
engine
.
connect
()
as
conn
:
_
=
conn
.
execute
(
command
)
try
:
except
Exception
as
e
:
proc
=
re
.
match
(
r'([^(]+)'
,
procedure
)
.
group
(
1
)
logger
.
error
(
f
"Procedure no encontrado. {e}"
)
check_query
=
f
"SELECT object_name FROM all_procedures WHERE object_name = '{proc.upper()}'"
delete
=
True
result
=
conn
.
execute
(
check_query
)
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
exists
=
bool
(
result
.
fetchone
())
except
Exception
as
e
:
logger
.
error
(
f
"Procedure no encontrado. {e}"
)
if
exists
:
_
=
conn
.
execute
(
command
)
delete
=
True
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
elif
base_Datos
==
"mysql"
:
command
=
f
'DROP PROCEDURE IF EXISTS {procedure}'
start_time
=
time
.
time
()
with
engine
.
connect
()
as
conn
:
try
:
_
=
conn
.
execute
(
command
)
delete
=
True
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
except
Exception
as
e
:
logger
.
error
(
f
"Procedure no encontrada. {e}"
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error borrando procedure {procedure}. {e}"
)
logger
.
error
(
f
"Error borrando procedure {procedure}. {e}"
)
finally
:
finally
:
...
...
dags/components/Databases/Oracle.py
View file @
b4835849
...
@@ -2,7 +2,7 @@ from typing import List, Tuple
...
@@ -2,7 +2,7 @@ from typing import List, Tuple
from
sqlalchemy
import
create_engine
from
sqlalchemy
import
create_engine
import
oracledb
import
oracledb
import
re
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseDialectEnum
import
DatabaseDialectEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.Databases.Enums.OracleDataTypeEnum
import
OracleDataTypeEnum
from
components.Databases.Enums.OracleDataTypeEnum
import
OracleDataTypeEnum
...
@@ -90,8 +90,9 @@ class Oracle:
...
@@ -90,8 +90,9 @@ class Oracle:
def
generate_sql_procedure
(
self
,
command
:
str
,
reserved_word
:
str
=
"execute"
)
->
str
:
def
generate_sql_procedure
(
self
,
command
:
str
,
reserved_word
:
str
=
"execute"
)
->
str
:
response
=
""
response
=
""
try
:
try
:
logger
.
info
(
"COMANDO"
,
command
)
command
=
command
.
replace
(
reserved_word
,
""
)
.
replace
(
";"
,
""
)
command
=
command
.
replace
(
reserved_word
,
""
)
.
replace
(
";"
,
""
)
response
=
f
"
begin {command}; end
;"
response
=
f
"
BEGIN {command}; END
;"
logger
.
debug
(
"COMANDO ORACLE:"
,
response
)
logger
.
debug
(
"COMANDO ORACLE:"
,
response
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error generando comando sql para procedure Oracle. Comando: {command}. {e}"
)
logger
.
error
(
f
"Error generando comando sql para procedure Oracle. Comando: {command}. {e}"
)
...
@@ -113,7 +114,8 @@ class Oracle:
...
@@ -113,7 +114,8 @@ class Oracle:
def
check_procedure
(
self
,
procedure_name
,
connection
)
->
bool
:
def
check_procedure
(
self
,
procedure_name
,
connection
)
->
bool
:
exists
=
False
exists
=
False
try
:
try
:
check_query
=
f
"SELECT text FROM all_source WHERE name = '{procedure_name}'"
procedure_name
=
re
.
match
(
r'([^(]+)'
,
procedure_name
)
.
group
(
1
)
check_query
=
f
"SELECT object_name FROM all_procedures WHERE object_name = '{procedure_name.upper()}'"
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
exists
=
bool
(
result
.
fetchone
())
exists
=
bool
(
result
.
fetchone
())
except
Exception
as
e
:
except
Exception
as
e
:
...
@@ -124,7 +126,7 @@ class Oracle:
...
@@ -124,7 +126,7 @@ class Oracle:
def
check_table
(
self
,
table_name
,
connection
)
->
bool
:
def
check_table
(
self
,
table_name
,
connection
)
->
bool
:
exists
=
False
exists
=
False
try
:
try
:
check_query
=
f
"
DESCRIBE {table_name}
"
check_query
=
f
"
SELECT table_name FROM all_tables WHERE table_name = '{table_name.upper()}'
"
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
exists
=
bool
(
result
.
fetchone
())
exists
=
bool
(
result
.
fetchone
())
except
Exception
as
e
:
except
Exception
as
e
:
...
@@ -135,7 +137,7 @@ class Oracle:
...
@@ -135,7 +137,7 @@ class Oracle:
def
verify_table
(
self
,
table_name
,
connection
)
->
bool
:
def
verify_table
(
self
,
table_name
,
connection
)
->
bool
:
exists
=
False
exists
=
False
try
:
try
:
check_query
=
f
"SELECT COUNT(*) FROM {table_name}"
check_query
=
f
"SELECT COUNT(*) FROM {table_name
.upper()
}"
result
=
connection
.
execute
(
check_query
)
result
=
connection
.
execute
(
check_query
)
if
result
>
0
:
if
result
>
0
:
exists
=
True
exists
=
True
...
...
dags/dag_conf.yml
View file @
b4835849
...
@@ -5,23 +5,23 @@ app:
...
@@ -5,23 +5,23 @@ app:
database
:
database
:
sources
:
sources
:
source1
:
source1
:
type
:
mysql
type
:
oracle
host
:
192.168.
1.13
host
:
192.168.
27.22
port
:
13306
port
:
21521
username
:
root
username
:
PRUEBABCOM2
password
:
root
password
:
admin
database
:
prueba
database
:
service
:
service
:
ORCLPDB1
schema
:
sources
schema
:
transformation
:
transformation
:
type
:
mysql
type
:
oracle
host
:
192.168.
1.13
host
:
192.168.
27.22
port
:
13306
port
:
21521
username
:
root
username
:
RLQA_AIR
password
:
root
password
:
RLQA_AIR99
database
:
prueba_ca
database
:
service
:
service
:
ORCLPDB1
schema
:
intern_db
schema
:
chunksize
:
4000
chunksize
:
4000
label_multiple_select
:
TABLENAME
label_multiple_select
:
TABLENAME
label_transform_procedure
:
STORE
label_transform_procedure
:
STORE
...
@@ -55,14 +55,14 @@ app:
...
@@ -55,14 +55,14 @@ app:
delimiter
:
'
|'
delimiter
:
'
|'
tmp_path
:
/tmp
tmp_path
:
/tmp
s3_params
:
s3_params
:
tabla
4
:
tabla
1
:
bucket
:
prueba
irflow
bucket
:
prueba
-id
prefix
:
bcom_results
prefix
:
bcom_results
connection_id
:
prueba_af
connection_id
:
conn_script
tabla5
:
tabla5
:
bucket
:
prueba
irflow
bucket
:
prueba
-id
prefix
:
bcom_results
prefix
:
bcom_results
connection_id
:
prueba_af
connection_id
:
conn_script
report
:
report
:
s3_params
:
s3_params
:
bucket
:
prueba-id
bucket
:
prueba-id
...
...
deploy-k8/requirements.txt
View file @
b4835849
openpyxl==3.1.2
pip install openpyxl==3.1.2
XlsxWriter==3.1.2
pip install XlsxWriter==3.1.2
pymysql==1.1.0
pip install pymysql==1.1.0
oracledb==1.3.2
pip install oracledb==1.3.2
apache-airflow-providers-google
pip install apache-airflow-providers-google
apache-airflow-providers-amazon
pip install apache-airflow-providers-amazon
apache-airflow-providers-postgres
pip install apache-airflow-providers-postgres
apache-airflow-providers-oracle
pip install apache-airflow-providers-oracle
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment