Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
e4197850
Commit
e4197850
authored
Jan 11, 2024
by
Cristian Aguirre
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fix bugs deleting tables
parent
b4835849
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
20 additions
and
49 deletions
+20
-49
Cleaning.py
dags/components/Cleaning.py
+3
-14
DatabaseLoad.py
dags/components/DatabaseOperation/DatabaseLoad.py
+1
-1
DatabaseTransformation.py
dags/components/DatabaseOperation/DatabaseTransformation.py
+11
-8
Mysql.py
dags/components/Databases/Mysql.py
+1
-1
Oracle.py
dags/components/Databases/Oracle.py
+0
-1
Extractor.py
dags/components/Extractor.py
+3
-6
Transformation.py
dags/components/Transformation.py
+1
-3
Utils.py
dags/components/Utils.py
+0
-15
No files found.
dags/components/Cleaning.py
View file @
e4197850
...
@@ -29,7 +29,7 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
...
@@ -29,7 +29,7 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
result_p
=
bool
(
intern_conn
.
check_procedure
(
procedure
,
conn
))
result_p
=
bool
(
intern_conn
.
check_procedure
(
procedure
,
conn
))
if
result_p
:
if
result_p
:
logger
.
info
(
f
"Borrando procedures {procedure}"
)
logger
.
info
(
f
"Borrando procedures {procedure}"
)
delete
=
delete_procedure
(
procedure
,
engine
)
delete
=
delete_procedure
(
procedure
,
intern_conn
)
if
delete
:
if
delete
:
logger
.
info
(
f
"Borrado correctamente el procedure {procedure}"
)
logger
.
info
(
f
"Borrado correctamente el procedure {procedure}"
)
success_tasks
=
ti
.
xcom_pull
(
task_ids
=
"GENERATORS"
,
key
=
"SUCCESS_TASKS"
)
success_tasks
=
ti
.
xcom_pull
(
task_ids
=
"GENERATORS"
,
key
=
"SUCCESS_TASKS"
)
...
@@ -41,7 +41,6 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
...
@@ -41,7 +41,6 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
if
not
prefix
.
endswith
(
"/"
):
if
not
prefix
.
endswith
(
"/"
):
prefix
+=
"/"
prefix
+=
"/"
key
=
prefix
+
control_params
[
"filename"
]
key
=
prefix
+
control_params
[
"filename"
]
#conf = json.dumps(conf, indent=2, default=str)
final_dict
=
{}
final_dict
=
{}
status
=
ProcessStatusEnum
.
SUCCESS
.
value
status
=
ProcessStatusEnum
.
SUCCESS
.
value
if
not
isinstance
(
success_tasks
,
type
(
None
))
and
len
(
success_tasks
)
>
0
:
if
not
isinstance
(
success_tasks
,
type
(
None
))
and
len
(
success_tasks
)
>
0
:
...
@@ -62,19 +61,9 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
...
@@ -62,19 +61,9 @@ def validate_clean(control_params: Dict[str, Any], provider: str, timezone: str,
def
clean
(
command
:
str
,
intern_conn
):
def
clean
(
command
:
str
,
intern_conn
):
engine
=
intern_conn
.
engine
tablename
=
command
tablename
=
command
# with engine.connect() as conn:
delete_p
=
delete_procedure
(
tablename
,
intern_conn
)
# result_t = bool(intern_conn.check_table(tablename, conn))
delete_t
=
delete_table
(
tablename
,
intern_conn
)
# result_p = bool(intern_conn.check_procedure(tablename, conn))
# if result_t:
# logger.info(f"Borrando tabla {tablename}")
# delete_t = delete_table(tablename, engine)
# elif result_p:
# logger.info(f"Borrando procedure {tablename}")
# delete_p = delete_procedure(tablename, engine)
delete_p
=
delete_procedure
(
tablename
,
engine
)
delete_t
=
delete_table
(
tablename
,
engine
)
if
delete_t
:
if
delete_t
:
logger
.
info
(
f
"Borrado correctamente la tabla {tablename}"
)
logger
.
info
(
f
"Borrado correctamente la tabla {tablename}"
)
elif
delete_p
:
elif
delete_p
:
...
...
dags/components/DatabaseOperation/DatabaseLoad.py
View file @
e4197850
...
@@ -7,7 +7,7 @@ import logging
...
@@ -7,7 +7,7 @@ import logging
logger
=
logging
.
getLogger
()
logger
=
logging
.
getLogger
()
def
save_from_dataframe
(
df
:
pd
.
DataFrame
,
tablename
:
str
,
connection
)
->
bool
:
def
save_from_dataframe
(
df
:
pd
.
DataFrame
,
tablename
:
str
,
connection
,
is_first
:
bool
=
False
)
->
bool
:
save
=
False
save
=
False
try
:
try
:
chunksize
=
2000
chunksize
=
2000
...
...
dags/components/DatabaseOperation/DatabaseTransformation.py
View file @
e4197850
...
@@ -2,6 +2,9 @@ import logging
...
@@ -2,6 +2,9 @@ import logging
import
time
import
time
from
typing
import
List
from
typing
import
List
import
re
import
re
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
logger
=
logging
.
getLogger
()
logger
=
logging
.
getLogger
()
...
@@ -15,11 +18,11 @@ def execute_transformations(commands: List[str], engine):
...
@@ -15,11 +18,11 @@ def execute_transformations(commands: List[str], engine):
logger
.
error
(
f
"Error ejecutando comando de transformación. {e}"
)
logger
.
error
(
f
"Error ejecutando comando de transformación. {e}"
)
def
delete_table
(
tablename
:
str
,
engine
)
->
bool
:
def
delete_table
(
tablename
:
str
,
connection
)
->
bool
:
base_Datos
=
engine
.
dialect
.
nam
e
engine
=
connection
.
engin
e
delete
=
False
delete
=
False
try
:
try
:
if
base_Datos
==
"oracle"
:
if
connection
.
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
:
command
=
f
'DROP TABLE {tablename}'
command
=
f
'DROP TABLE {tablename}'
start_time
=
time
.
time
()
start_time
=
time
.
time
()
with
engine
.
connect
()
as
conn
:
with
engine
.
connect
()
as
conn
:
...
@@ -33,7 +36,7 @@ def delete_table(tablename: str, engine) -> bool:
...
@@ -33,7 +36,7 @@ def delete_table(tablename: str, engine) -> bool:
_
=
conn
.
execute
(
command
)
_
=
conn
.
execute
(
command
)
delete
=
True
delete
=
True
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
el
if
base_Datos
==
"mysql"
:
el
se
:
command
=
f
'DROP TABLE IF EXISTS {tablename}'
command
=
f
'DROP TABLE IF EXISTS {tablename}'
start_time
=
time
.
time
()
start_time
=
time
.
time
()
with
engine
.
connect
()
as
conn
:
with
engine
.
connect
()
as
conn
:
...
@@ -49,11 +52,11 @@ def delete_table(tablename: str, engine) -> bool:
...
@@ -49,11 +52,11 @@ def delete_table(tablename: str, engine) -> bool:
return
delete
return
delete
def
delete_procedure
(
procedure
:
str
,
engine
)
->
bool
:
def
delete_procedure
(
procedure
:
str
,
connection
)
->
bool
:
base_Datos
=
engine
.
dialect
.
nam
e
engine
=
connection
.
engin
e
delete
=
False
delete
=
False
try
:
try
:
if
base_Datos
==
"oracle"
:
if
connection
.
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
:
command
=
f
"DROP PROCEDURE {procedure}"
command
=
f
"DROP PROCEDURE {procedure}"
start_time
=
time
.
time
()
start_time
=
time
.
time
()
with
engine
.
connect
()
as
conn
:
with
engine
.
connect
()
as
conn
:
...
@@ -68,7 +71,7 @@ def delete_procedure(procedure: str, engine) -> bool:
...
@@ -68,7 +71,7 @@ def delete_procedure(procedure: str, engine) -> bool:
_
=
conn
.
execute
(
command
)
_
=
conn
.
execute
(
command
)
delete
=
True
delete
=
True
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
logger
.
debug
(
f
"Duración de borrado: {time.time() - start_time}"
)
el
if
base_Datos
==
"mysql"
:
el
se
:
command
=
f
'DROP PROCEDURE IF EXISTS {procedure}'
command
=
f
'DROP PROCEDURE IF EXISTS {procedure}'
start_time
=
time
.
time
()
start_time
=
time
.
time
()
with
engine
.
connect
()
as
conn
:
with
engine
.
connect
()
as
conn
:
...
...
dags/components/Databases/Mysql.py
View file @
e4197850
...
@@ -135,4 +135,4 @@ class Mysql:
...
@@ -135,4 +135,4 @@ class Mysql:
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo counts de tabla. {e}"
)
logger
.
error
(
f
"Error obteniendo counts de tabla. {e}"
)
finally
:
finally
:
return
exists
return
exists
\ No newline at end of file
dags/components/Databases/Oracle.py
View file @
e4197850
...
@@ -90,7 +90,6 @@ class Oracle:
...
@@ -90,7 +90,6 @@ class Oracle:
def
generate_sql_procedure
(
self
,
command
:
str
,
reserved_word
:
str
=
"execute"
)
->
str
:
def
generate_sql_procedure
(
self
,
command
:
str
,
reserved_word
:
str
=
"execute"
)
->
str
:
response
=
""
response
=
""
try
:
try
:
logger
.
info
(
"COMANDO"
,
command
)
command
=
command
.
replace
(
reserved_word
,
""
)
.
replace
(
";"
,
""
)
command
=
command
.
replace
(
reserved_word
,
""
)
.
replace
(
";"
,
""
)
response
=
f
"BEGIN {command}; END;"
response
=
f
"BEGIN {command}; END;"
logger
.
debug
(
"COMANDO ORACLE:"
,
response
)
logger
.
debug
(
"COMANDO ORACLE:"
,
response
)
...
...
dags/components/Extractor.py
View file @
e4197850
...
@@ -145,8 +145,6 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
...
@@ -145,8 +145,6 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
create
=
intern_conn
.
create_table
(
model
)
create
=
intern_conn
.
create_table
(
model
)
if
create
:
if
create
:
logger
.
info
(
f
"Creado correctamente la tabla {tablename}. Creado?: {create}"
)
logger
.
info
(
f
"Creado correctamente la tabla {tablename}. Creado?: {create}"
)
#extract_tables.append(tablename)
#task.xcom_push(key="TABLES_CREATED", value=tablename)
else
:
else
:
raise
AssertionError
(
f
"Error creando tabla {tablename}"
)
raise
AssertionError
(
f
"Error creando tabla {tablename}"
)
if
is_procedure
:
if
is_procedure
:
...
@@ -225,7 +223,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
...
@@ -225,7 +223,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
task
.
xcom_push
(
key
=
"TABLES_CREATED"
,
value
=
tablename
)
task
.
xcom_push
(
key
=
"TABLES_CREATED"
,
value
=
tablename
)
task
.
xcom_push
(
key
=
"END_PROCESS_DATETIME_"
+
str
(
task
.
map_index
),
value
=
end_process_datetime
)
task
.
xcom_push
(
key
=
"END_PROCESS_DATETIME_"
+
str
(
task
.
map_index
),
value
=
end_process_datetime
)
except
Exception
as
e
:
except
Exception
as
e
:
delete
=
delete_table
(
tablename
,
intern_conn
.
engine
)
delete
=
delete_table
(
tablename
,
intern_conn
)
if
delete
:
if
delete
:
logger
.
info
(
f
"Se borró correctamente la tabla {tablename}"
)
logger
.
info
(
f
"Se borró correctamente la tabla {tablename}"
)
raise
AssertionError
(
f
"Error creando la tabla y migrando datos. {type(e)}. {e}"
)
raise
AssertionError
(
f
"Error creando la tabla y migrando datos. {type(e)}. {e}"
)
...
@@ -240,7 +238,6 @@ def get_select_from_xcom(db_intern_conn, **kwargs):
...
@@ -240,7 +238,6 @@ def get_select_from_xcom(db_intern_conn, **kwargs):
conf
=
task
.
xcom_pull
(
task_ids
=
"SCRIPTS-EXTRACTOR"
,
key
=
"CONTROL-CONFIG"
)
conf
=
task
.
xcom_pull
(
task_ids
=
"SCRIPTS-EXTRACTOR"
,
key
=
"CONTROL-CONFIG"
)
definitions
=
task
.
xcom_pull
(
task_ids
=
"SCRIPTS-EXTRACTOR"
,
key
=
"EXTRACTION-DEFINITION-JSON"
)
definitions
=
task
.
xcom_pull
(
task_ids
=
"SCRIPTS-EXTRACTOR"
,
key
=
"EXTRACTION-DEFINITION-JSON"
)
temp_conf
=
conf
[
-
1
][
"objects_created"
]
if
"objects_created"
in
conf
[
-
1
]
.
keys
()
else
[]
temp_conf
=
conf
[
-
1
][
"objects_created"
]
if
"objects_created"
in
conf
[
-
1
]
.
keys
()
else
[]
engine
=
db_intern_conn
.
engine
logger
.
info
(
f
"OBJETOS CREADOS: {temp_conf}"
)
logger
.
info
(
f
"OBJETOS CREADOS: {temp_conf}"
)
identificadores
=
[
item
[
"identifier"
]
for
item
in
definitions
]
identificadores
=
[
item
[
"identifier"
]
for
item
in
definitions
]
if
temp_conf
:
if
temp_conf
:
...
@@ -258,8 +255,8 @@ def get_select_from_xcom(db_intern_conn, **kwargs):
...
@@ -258,8 +255,8 @@ def get_select_from_xcom(db_intern_conn, **kwargs):
tablas_temp
=
list
(
tablas_temp
)
tablas_temp
=
list
(
tablas_temp
)
if
tablas_temp
:
if
tablas_temp
:
for
i
in
tablas_temp
:
for
i
in
tablas_temp
:
delete
=
delete_table
(
i
,
engine
)
delete
=
delete_table
(
i
,
db_intern_conn
)
delete2
=
delete_procedure
(
i
,
engine
)
delete2
=
delete_procedure
(
i
,
db_intern_conn
)
if
delete
:
if
delete
:
logger
.
info
(
f
"Borrado correctamente la tabla temporal {i}"
)
logger
.
info
(
f
"Borrado correctamente la tabla temporal {i}"
)
if
delete2
:
if
delete2
:
...
...
dags/components/Transformation.py
View file @
e4197850
...
@@ -147,12 +147,10 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
...
@@ -147,12 +147,10 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
index
=
procedure
.
find
(
label_tablename
+
":"
)
index
=
procedure
.
find
(
label_tablename
+
":"
)
label_lenght
=
len
(
label_tablename
+
":"
)
label_lenght
=
len
(
label_tablename
+
":"
)
tablename
=
procedure
[
index
+
label_lenght
:]
.
strip
()
.
split
(
"
\n
"
)[
0
]
tablename
=
procedure
[
index
+
label_lenght
:]
.
strip
()
.
split
(
"
\n
"
)[
0
]
delete_procedure
(
tablename
,
engine
)
delete_procedure
(
tablename
,
db_intern_conn
)
for
element
in
definitions
:
for
element
in
definitions
:
if
element
[
"identifier"
]
==
tablename
and
"transformation_store_procedure"
in
element
.
keys
()
and
element
[
"transformation_store_procedure"
]
==
True
:
if
element
[
"identifier"
]
==
tablename
and
"transformation_store_procedure"
in
element
.
keys
()
and
element
[
"transformation_store_procedure"
]
==
True
:
logger
.
info
(
f
"Ejecutando creacion de procedure: {procedure}"
)
logger
.
info
(
f
"Ejecutando creacion de procedure: {procedure}"
)
# result = db_intern_conn.check_procedure(tablename, connection)
# if not result:
try
:
try
:
logger
.
info
(
"1"
)
logger
.
info
(
"1"
)
_
=
connection
.
execute
(
procedure
)
_
=
connection
.
execute
(
procedure
)
...
...
dags/components/Utils.py
View file @
e4197850
...
@@ -166,13 +166,6 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str,
...
@@ -166,13 +166,6 @@ def update_sql_commands(dataset: List[Tuple[str, str]], label_tablename: str,
for
item
in
data
:
for
item
in
data
:
if
item
.
lower
()
.
strip
()
==
"end"
:
if
item
.
lower
()
.
strip
()
==
"end"
:
final_data
[
-
1
]
=
final_data
[
-
1
]
+
"; end;"
final_data
[
-
1
]
=
final_data
[
-
1
]
+
"; end;"
# parts = item.split(CommentsScriptEnum.DASHES.value)
# parts = [part for part in parts if len(part.strip()) > 0]
# print(parts)
# if len(parts) > 1:
# for part in parts:
# if not part.strip().lower().startswith(label_tablename.lower()):
# continue
final_item
=
item
final_item
=
item
if
item
.
lower
()
.
strip
()
.
find
(
label_tablename
.
lower
()
.
strip
()
+
":"
)
!=
-
1
:
if
item
.
lower
()
.
strip
()
.
find
(
label_tablename
.
lower
()
.
strip
()
+
":"
)
!=
-
1
:
...
@@ -222,14 +215,6 @@ def select_multiple(command: str) -> Dict[str, Any]:
...
@@ -222,14 +215,6 @@ def select_multiple(command: str) -> Dict[str, Any]:
if
command
.
lower
()
.
replace
(
" "
,
""
)
.
find
(
no_procedure_init
)
!=
-
1
:
if
command
.
lower
()
.
replace
(
" "
,
""
)
.
find
(
no_procedure_init
)
!=
-
1
:
response
[
"is_multiple"
]
=
True
response
[
"is_multiple"
]
=
True
tablename
=
command
[:
command
.
index
(
"|"
)]
.
strip
()
tablename
=
command
[:
command
.
index
(
"|"
)]
.
strip
()
# init_index = command.lower().find("from")
# if init_index == -1:
# raise AssertionError("Query malformed")
# else:
# from_command = command[init_index + 4:]
# tablename_base = from_command.strip().split(" ")
# if len(tablename_base) > 0 and tablename == "":
# tablename = tablename_base[0]
response
[
"tablename"
]
=
tablename
response
[
"tablename"
]
=
tablename
except
Exception
as
e
:
except
Exception
as
e
:
raise
AssertionError
(
f
"Error validando si es múltiple select y nombre de tabla. {e}"
)
raise
AssertionError
(
f
"Error validando si es múltiple select y nombre de tabla. {e}"
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment