Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
dc07bb88
Commit
dc07bb88
authored
Oct 11, 2023
by
Erly Villaroel
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
DAG Reset process
parent
b9272727
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
80 additions
and
22 deletions
+80
-22
Control.py
dags/components/Control.py
+1
-1
Generation.py
dags/components/Generation.py
+1
-0
dag_conf.yml
dags/dag_conf.yml
+4
-4
dag_reset_process.py
dags/dag_reset_process.py
+1
-1
dag_transformacion_bcom.py
dags/dag_transformacion_bcom.py
+1
-2
procedure_prueba.json
dags/procedure_prueba.json
+72
-14
No files found.
dags/components/Control.py
View file @
dc07bb88
...
@@ -48,7 +48,7 @@ def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str,
...
@@ -48,7 +48,7 @@ def update_new_process(conf: List[Dict[str, Any]], status: str, tasks: Dict[str,
tasks
.
update
(
last_tasks
)
tasks
.
update
(
last_tasks
)
conf
.
pop
(
-
1
)
conf
.
pop
(
-
1
)
current_datetime
=
str
(
datetime_by_tzone
(
timezone
,
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
))
current_datetime
=
str
(
datetime_by_tzone
(
timezone
,
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
))
new_process
=
{
"date"
:
current_datetime
,
"status"
:
status
,
"tasks"
:
tasks
,
"
created_tables
"
:
created_tables
}
new_process
=
{
"date"
:
current_datetime
,
"status"
:
status
,
"tasks"
:
tasks
,
"
objects_created
"
:
created_tables
}
if
current_period
==
processed_period
and
isinstance
(
conf
,
List
):
if
current_period
==
processed_period
and
isinstance
(
conf
,
List
):
conf
.
append
(
new_process
)
conf
.
append
(
new_process
)
else
:
else
:
...
...
dags/components/Generation.py
View file @
dc07bb88
...
@@ -140,6 +140,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
...
@@ -140,6 +140,7 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
dataframe
[
campo
]
=
pd
.
to_datetime
(
dataframe
[
campo
])
.
dt
.
date
dataframe
[
campo
]
=
pd
.
to_datetime
(
dataframe
[
campo
])
.
dt
.
date
# elif campos[campo] == DataTypeEnum.DATETIME.name: # datetime:
# elif campos[campo] == DataTypeEnum.DATETIME.name: # datetime:
# dataframe[campo] = pd.to_datetime(dataframe[campo], format='%Y-%m-%d %H:%M:%S')
# dataframe[campo] = pd.to_datetime(dataframe[campo], format='%Y-%m-%d %H:%M:%S')
dataframe
=
dataframe
.
drop
(
"INTERN_ID_BCOM"
,
axis
=
1
,
errors
=
'ignore'
)
dataframe
=
dataframe
.
drop
(
"INTERN_ID_BCOM"
,
axis
=
1
,
errors
=
'ignore'
)
logger
.
debug
(
dataframe
)
logger
.
debug
(
dataframe
)
dataframe
.
to_csv
(
tmp_file
,
sep
=
delimiter
,
index
=
False
,
mode
=
'a'
,
header
=
header
)
dataframe
.
to_csv
(
tmp_file
,
sep
=
delimiter
,
index
=
False
,
mode
=
'a'
,
header
=
header
)
...
...
dags/dag_conf.yml
View file @
dc07bb88
...
@@ -55,15 +55,15 @@ app:
...
@@ -55,15 +55,15 @@ app:
delimiter
:
'
|'
delimiter
:
'
|'
tmp_path
:
/tmp
tmp_path
:
/tmp
s3_params
:
s3_params
:
ESTUDIANTES_1
:
tabla4
:
bucket
:
pruebairflow
bucket
:
pruebairflow
prefix
:
bcom_results
prefix
:
bcom_results
connection_id
:
prueba_af
connection_id
:
prueba_af
ESTUDIANTES_1
1
:
tabla
1
:
bucket
:
pruebairflow
bucket
:
pruebairflow
prefix
:
bcom_results
prefix
:
bcom_results
connection_id
:
prueba_af
connection_id
:
prueba_af
CATALOGO_PROMOCIONES
:
tabla2
:
bucket
:
pruebairflow
bucket
:
pruebairflow
prefix
:
bcom_results
prefix
:
bcom_results
connection_id
:
prueba_af
connection_id
:
prueba_af
...
@@ -74,7 +74,7 @@ app:
...
@@ -74,7 +74,7 @@ app:
connection_id
:
conn_script
connection_id
:
conn_script
filename
:
report_<datetime>.xlsx
filename
:
report_<datetime>.xlsx
datetime_pattern
:
'
%Y-%m-%d
%H:%M:%S'
datetime_pattern
:
'
%Y-%m-%d
%H:%M:%S'
procedure
:
definitions
:
filepath
:
"
/opt/airflow/dags/procedure_prueba.json"
filepath
:
"
/opt/airflow/dags/procedure_prueba.json"
dags/dag_reset_process.py
View file @
dc07bb88
...
@@ -112,7 +112,7 @@ def set_dag():
...
@@ -112,7 +112,7 @@ def set_dag():
control_s3
=
conf
[
"control"
][
"s3_params"
]
control_s3
=
conf
[
"control"
][
"s3_params"
]
timezone
=
conf
[
"timezone"
]
timezone
=
conf
[
"timezone"
]
# NUEVA VARIABLE
# NUEVA VARIABLE
procedure
=
conf
[
"
procedure
"
]
procedure
=
conf
[
"
definitions
"
]
control_extractor
=
PythonOperator
(
control_extractor
=
PythonOperator
(
task_id
=
"CONTROL-EXTRACTOR"
,
task_id
=
"CONTROL-EXTRACTOR"
,
python_callable
=
extract_last_control
,
python_callable
=
extract_last_control
,
...
...
dags/dag_transformacion_bcom.py
View file @
dc07bb88
...
@@ -138,7 +138,6 @@ def set_dag():
...
@@ -138,7 +138,6 @@ def set_dag():
conf_path
=
MAIN_PATH
+
"dag_conf.yml"
conf_path
=
MAIN_PATH
+
"dag_conf.yml"
with
open
(
conf_path
)
as
f
:
with
open
(
conf_path
)
as
f
:
data
=
yaml
.
load
(
f
,
Loader
=
SafeLoader
)
data
=
yaml
.
load
(
f
,
Loader
=
SafeLoader
)
logger
.
info
(
f
"CONFIGURACIÓN: {data}"
)
conf
=
data
[
"app"
]
conf
=
data
[
"app"
]
with
DAG
(
DAG_NAME
,
default_args
=
DEFAULT_ARGS
,
description
=
"Proceso que extrae y transforma"
,
with
DAG
(
DAG_NAME
,
default_args
=
DEFAULT_ARGS
,
description
=
"Proceso que extrae y transforma"
,
schedule_interval
=
conf
[
"schedule"
],
tags
=
[
"DAG BCOM - SQL TRANSFORMATIONS"
],
catchup
=
False
)
as
dag
:
schedule_interval
=
conf
[
"schedule"
],
tags
=
[
"DAG BCOM - SQL TRANSFORMATIONS"
],
catchup
=
False
)
as
dag
:
...
@@ -152,7 +151,7 @@ def set_dag():
...
@@ -152,7 +151,7 @@ def set_dag():
control_s3
=
conf
[
"control"
][
"s3_params"
]
control_s3
=
conf
[
"control"
][
"s3_params"
]
timezone
=
conf
[
"timezone"
]
timezone
=
conf
[
"timezone"
]
# NUEVA VARIABLE
# NUEVA VARIABLE
procedure
=
conf
[
"
procedure
"
]
procedure
=
conf
[
"
definitions
"
]
# Scripts extraction
# Scripts extraction
extract_mask
=
conf
[
"source_mask"
]
extract_mask
=
conf
[
"source_mask"
]
transform_mask
=
conf
[
"transformation_mask"
]
transform_mask
=
conf
[
"transformation_mask"
]
...
...
dags/procedure_prueba.json
View file @
dc07bb88
[
[
{
{
"identifier"
:
"
obtenerEstudiantes
"
,
"identifier"
:
"
ModificarTabla4
"
,
"transformation_store_procedure"
:
true
"transformation_store_procedure"
:
true
},
},
{
{
"identifier"
:
"TempEstudiantes"
,
"identifier"
:
"UnionYInsert"
,
"transformation_store_procedure"
:
true
},
{
"identifier"
:
"TempTabla"
,
"temp_table"
:
true
"temp_table"
:
true
},
},
{
{
"identifier"
:
"
ESTUDIANTES_1
1"
,
"identifier"
:
"
tabla
1"
,
"fields"
:
[
"fields"
:
[
{
{
"name"
:
"
ID
"
,
"name"
:
"
id
"
,
"datatype"
:
"NUMBER"
,
"datatype"
:
"NUMBER"
,
"decimal_precision"
:
0
"decimal_precision"
:
0
},
},
{
{
"name"
:
"
N
ombre"
,
"name"
:
"
n
ombre"
,
"datatype"
:
"TEXT"
,
"datatype"
:
"TEXT"
,
"maxLength"
:
50
"maxLength"
:
50
},
},
{
{
"name"
:
"Apellido"
,
"name"
:
"fecha_nacimiento"
,
"datatype"
:
"DATE"
}
],
"indexes"
:
[
{
"name"
:
"indice1"
,
"index_fields"
:
[
"id"
]
}
],
"save_output"
:
false
},
{
"identifier"
:
"tabla2"
,
"fields"
:
[
{
"name"
:
"id"
,
"datatype"
:
"NUMBER"
,
"decimal_precision"
:
0
},
{
"name"
:
"apellido"
,
"datatype"
:
"TEXT"
,
"datatype"
:
"TEXT"
,
"maxLength"
:
50
"maxLength"
:
50
},
},
{
{
"name"
:
"Edad"
,
"name"
:
"fecha_registro"
,
"datatype"
:
"DATETIME"
}
],
"indexes"
:
[
{
"name"
:
"indice1"
,
"index_fields"
:
[
"id"
]
}
],
"save_output"
:
false
},
{
"identifier"
:
"tabla3"
,
"fields"
:
[
{
"name"
:
"id"
,
"datatype"
:
"NUMBER"
,
"datatype"
:
"NUMBER"
,
"decimal_precision"
:
0
"decimal_precision"
:
0
},
},
{
{
"name"
:
"CorreoElectronico"
,
"name"
:
"edad"
,
"datatype"
:
"TEXT"
,
"datatype"
:
"NUMBER"
,
"maxLength"
:
100
"decimal_precision"
:
0
},
{
"name"
:
"fecha_actualizacion"
,
"datatype"
:
"DATETIME"
}
}
],
],
"indexes"
:
[
"indexes"
:
[
{
{
"name"
:
"indice1"
,
"name"
:
"indice1"
,
"index_fields"
:
[
"index_fields"
:
[
"
ID
"
"
id
"
]
]
}
}
],
],
"save_output"
:
tru
e
"save_output"
:
fals
e
},
},
{
{
"identifier"
:
"
ESTUDIANTES_1
"
,
"identifier"
:
"
tabla4
"
,
"fields"
:
[
"fields"
:
[
{
{
"name"
:
"id"
,
"name"
:
"id"
,
"datatype"
:
"NUMBER"
,
"datatype"
:
"NUMBER"
,
"decimal_precision"
:
0
"decimal_precision"
:
0
},
},
{
"name"
:
"nombre"
,
"datatype"
:
"TEXT"
,
"maxLength"
:
50
},
{
{
"name"
:
"fecha"
,
"name"
:
"fecha"
,
"datatype"
:
"DATE"
"datatype"
:
"DATE"
},
},
{
{
"name"
:
"fecha_tiempo"
,
"name"
:
"fecha_2"
,
"datatype"
:
"DATETIME"
},
{
"name"
:
"fecha_3"
,
"datatype"
:
"DATETIME"
"datatype"
:
"DATETIME"
}
}
],
],
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment