general / bcom-tp-etl-transformation-pipelines · Commits
c81e2e09
Commit
c81e2e09
authored
Oct 13, 2023
by
Erly Villaroel
Changes: verification of tables before creation
parent e8e7bfd7
Showing 5 changed files with 29 additions and 14 deletions (+29, -14)
dags/components/Control.py            +0  -1
dags/components/Extractor.py          +18 -9
dags/components/Transformation.py     +9  -2
dags/dag_transformacion_bcom.py       +1  -1
dags/procedure_prueba.json            +1  -1
dags/components/Control.py

@@ -13,7 +13,6 @@ def get_tasks_from_control(conf: List[Dict[str, Any]], type_task: str) -> Dict[s
    response = {'status': ProcessStatusEnum.SUCCESS.value, 'tasks': [], 'reset': False}
    try:
        conf = conf[-1]
        logger.info(f"Último proceso ejecutado: {conf}")
        status = conf["status"]
        if "reset_by_user" in conf.keys():
            response["reset"] = True
dags/components/Extractor.py

@@ -9,7 +9,7 @@ from enums.DatabaseTypeEnum import DatabaseTypeEnum
from components.Utils import select_multiple, generateModel
from components.DatabaseOperation.DatabaseExtraction import get_iterator, get_steps
from components.DatabaseOperation.DatabaseLoad import save_from_dataframe
-from components.DatabaseOperation.DatabaseTransformation import delete_table
+from components.DatabaseOperation.DatabaseTransformation import delete_table, delete_procedure
from components.Control import get_tasks_from_control, update_new_process
from components.S3Route import load_control_to_s3
from components.Xcom import delete_all_xcom_tasks, delete_task_instances

@@ -69,7 +69,6 @@ def validate_extractor(control_params: Dict[str, Any], timezone: str, provider:
        raise AirflowSkipException(f"Ocurrieron errores en la etapa de extracción")
    elif status == ProcessStatusEnum.SUCCESS.value:
        ti.xcom_push(key="CONTROL-CONFIG", value=conf)
    logger.info(f"tablas creadas : {lista}")

def on_failure_extractor(context) -> None:

@@ -139,15 +138,9 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, timez
    if isinstance(model, type(None)):
        raise AssertionError(f"Definición del extracción para {tablename} en el json-descriptor no encontraddo")
    try:
        # connection = intern_conn.engine.connect()
        # result = bool(intern_conn.check_table(tablename, connection))
        # logger.info(f"resultado {result}")
        # logger.info(f"resultado {type(result)}")
        connection = intern_conn.engine.connect()
        result = bool(intern_conn.check_table(tablename, connection))
        resultado = intern_conn.verify_table(tablename, connection)
        logger.info(f"resultado {result}")
        logger.info(f"resultado2 {resultado}")
        if not result or not resultado:
            create = intern_conn.create_table(model)
            if create:

@@ -242,18 +235,34 @@ def get_select_from_xcom(db_intern_conn, **kwargs):
    task = kwargs['ti']
    final_selects = []
    lista = []
    tablas_temp = set()
    conf = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="CONTROL-CONFIG")
    definitions = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
    temp_conf = conf[-1]["objects_created"]
    engine = db_intern_conn.engine
    logger.info(f"OBJETOS CREADOS: {temp_conf}")
    identificadores = [item["identifier"] for item in definitions]
    if temp_conf:
        tablas_temp = [i for i in temp_conf if i.startswith("Temp")]
        for i in temp_conf:
            for table in definitions:
                if i == table["identifier"]:
                    if "fields" not in table.keys():
                        tablas_temp.add(i)
                    break
                elif i not in identificadores:
                    tablas_temp.add(i)
                    break
        logger.info(f"tablas temporales: {tablas_temp}")
        tablas_temp = list(tablas_temp)
    if tablas_temp:
        for i in tablas_temp:
            delete = delete_table(i, engine)
            delete2 = delete_procedure(i, engine)
            if delete:
                logger.info(f"Borrado correctamente la tabla temporal {i}")
            if delete2:
                logger.info(f"Borrado correctamente procedure {i}")
    tasks = get_tasks_from_control(conf, "extractor")
    success_tasks = tasks["tasks"]
    success_tasks = [item[1] for item in success_tasks]
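Note: the Extractor changes above lean on connection helpers that are not part of this commit (check_table, verify_table and create_table on the internal connection, plus delete_table / delete_procedure from DatabaseOperation.DatabaseTransformation). As a rough illustration of the "check before create, drop temporaries afterwards" idea, here is a minimal plain-SQLAlchemy sketch; the model, helper names and connection string are assumptions, not the repository's actual implementation.

    # Sketch only (SQLAlchemy 1.4+): stand-ins for the unseen helpers used above.
    from sqlalchemy import Column, Integer, String, create_engine, inspect, text
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class TempExample(Base):                  # hypothetical model, for illustration only
        __tablename__ = "TempExample"
        id = Column(Integer, primary_key=True)
        name = Column(String(50))

    def ensure_table(engine, model) -> bool:
        """Create the model's table only when it is not already in the database."""
        if inspect(engine).has_table(model.__tablename__):
            return False                      # table already there, skip creation
        model.__table__.create(bind=engine)
        return True

    def drop_table(engine, tablename: str) -> None:
        """Drop a temporary table; IF EXISTS keeps the cleanup idempotent."""
        with engine.begin() as conn:
            conn.execute(text(f'DROP TABLE IF EXISTS "{tablename}"'))

    if __name__ == "__main__":
        engine = create_engine("sqlite:///:memory:")   # placeholder connection string
        print(ensure_table(engine, TempExample))       # True: created
        print(ensure_table(engine, TempExample))       # False: already existed
        drop_table(engine, "TempExample")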
dags/components/Transformation.py

@@ -5,6 +5,7 @@ from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from components.DatabaseOperation.DatabaseTransformation import delete_table, delete_procedure
from components.S3Route import get_files_from_prefix, get_file_from_prefix
from components.Xcom import save_commands_to_xcom
from components.Utils import update_sql_commands_2

@@ -134,6 +135,7 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
    for element in definitions:
        if "temp_table" in element.keys() and element["temp_table"] == True:
            lista.append(element["identifier"])
    with engine.connect() as connection:
        for procedure in procedures:
            procedure = procedure[1]

@@ -141,12 +143,17 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
            index = procedure.find(label_tablename + ":")
            label_lenght = len(label_tablename + ":")
            tablename = procedure[index + label_lenght:].strip().split("\n")[0]
            delete_procedure(tablename, engine)
            for element in definitions:
                if element["identifier"] == tablename and "transformation_store_procedure" in element.keys() and element["transformation_store_procedure"] == True:
                    logger.info(f"Ejecutando creacion de procedure: {procedure}")
                    result = db_intern_conn.check_procedure(tablename, connection)
                    if not result:
                        # result = db_intern_conn.check_procedure(tablename, connection)
                        # if not result:
                        try:
                            _ = connection.execute(procedure)
                        except Exception as e:
                            logger.error(f" Error: {e}")
                            raise AirflowSkipException
                        proc_created.append(tablename)
                        lista.append(tablename)
                else:
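Note: the check_procedure guard used above is also outside this diff. A hedged sketch of what such a guard could look like, assuming a database that exposes information_schema.routines (MySQL and PostgreSQL do), follows; the function names and the query are illustrative only, not the project's actual wrapper.

    # Sketch only: guard a CREATE PROCEDURE statement behind an existence check.
    from sqlalchemy import text

    def procedure_exists(connection, name: str) -> bool:
        """Return True if a stored procedure with the given name already exists."""
        query = text(
            "SELECT COUNT(*) FROM information_schema.routines "
            "WHERE routine_type = 'PROCEDURE' AND routine_name = :name"
        )
        return connection.execute(query, {"name": name}).scalar() > 0

    def create_procedure_if_missing(connection, name: str, ddl: str) -> bool:
        """Execute the CREATE PROCEDURE DDL only when the procedure is absent."""
        if procedure_exists(connection, name):
            return False
        connection.execute(text(ddl))
        return True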
dags/dag_transformacion_bcom.py

@@ -22,7 +22,7 @@ import logging
logger = logging.getLogger()
-DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM"
+DAG_NAME = "BCOM_DAG_EXTRACT_AND_TRANSFORM2"
# Change this path if is deployed in prod or dev
MAIN_PATH = "/opt/airflow/dags/"
dags/procedure_prueba.json

@@ -12,7 +12,7 @@
        "temp_table": true
    },
    {
-       "identifier": "tabla1",
+       "identifier": "tabla11",
        "fields": [
            {
                "name": "id",