general / bcom-tp-etl-transformation-pipelines / Commits

Commit e5ddb0fc, authored Aug 03, 2023 by Cristian Aguirre
Update 02-08-23. Fix inform-report bug. Change configmap template

Parent: eccddc57
Showing 2 changed files with 12 additions and 20 deletions (+12 / -20):

  dags/dag_inform_process.py                +12  -18
  deploy-k8/airflow-envvars-configmap.yaml   +0   -2
dags/dag_inform_process.py
@@ -20,7 +20,7 @@ logger = logging.getLogger()
 DAG_NAME = "INFORM_PROCESS"
 # Change this path if is deployed in prod or dev
-MAIN_PATH = "/root/airflow/dags/"
+MAIN_PATH = "/opt/airflow/dags/"
 DEFAULT_ARGS = {
     'owner': 'BCOM',
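The new path matches the standard layout of the official Airflow images, where AIRFLOW_HOME is /opt/airflow. As a side note, a minimal sketch of deriving this path from the environment instead of hardcoding it per deployment (not part of this commit; the fallback value is an assumption):

    import os

    # Fall back to the standard image layout if AIRFLOW_HOME is unset (assumption).
    MAIN_PATH = os.path.join(os.environ.get("AIRFLOW_HOME", "/opt/airflow"), "dags", "")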
@@ -33,10 +33,8 @@ DEFAULT_ARGS = {
 }
-def upload_report(report_params: Dict[str, Any], provider: str, timezone: str, **kwargs) -> None:
+def upload_report(report_path: str, report_params: Dict[str, Any], provider: str, timezone: str) -> None:
     try:
-        task = kwargs["ti"]
-        report_path = task.xcom_pull(task_ids="CREATE-REPORT", key="REPORT_PATH")
         pattern = report_params["datetime_pattern"]
         conn = report_params["s3_params"]["connection_id"]
         bucket = report_params["s3_params"]["bucket"]
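With the XCom pull gone, upload_report no longer needs an Airflow task-instance context and can be exercised directly; a minimal sketch with hypothetical values (the dict keys mirror the ones read above, the values are placeholders):

    # Placeholder config for illustration only, not the project's real settings.
    report_params = {
        "datetime_pattern": "%Y-%m-%d",
        "s3_params": {"connection_id": "s3_default", "bucket": "reports-bucket"},
    }
    upload_report("/tmp/report.xlsx", report_params, "aws", "America/Lima")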
@@ -54,7 +52,7 @@ def upload_report(report_params: Dict[str, Any], provider: str, timezone: str, *
         logger.error(f"Error subiendo reporte a . {e}")
-def create_report(tmp_path: str, **kwargs) -> None:
+def create_and_upload_report(tmp_path: str, report_params: Dict[str, Any], provider: str, timezone: str, **kwargs) -> None:
     try:
         task = kwargs["ti"]
         data = task.xcom_pull(task_ids="GET-DATA-REPORT", key="REPORT-DATA")
@@ -109,7 +107,8 @@ def create_report(tmp_path: str, **kwargs) -> None:
             worksheet.merge_range('C' + str(index) + ':G' + str(index), f"ARCHIVO GENERADO DESDE LA TABLA: {data[key]['DESCRIPTION']}", row_format)
             worksheet.merge_range('H' + str(index) + ':I' + str(index), f"ESTADO: {data[key]['STATUS']}", row_format)
             worksheet.merge_range('J' + str(index) + ':N' + str(index), data[key]['MESSAGE'], row_format)
-        task.xcom_push(key="REPORT_PATH", value=excel_tmp_path)
+        # Upload report
+        upload_report(excel_tmp_path, report_params, provider, timezone)
     except Exception as e:
         logger.error(f"Error creando reporte. {e}")
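The context lines above use xlsxwriter's merge_range to span one value across several columns. For reference, a self-contained sketch of that call pattern (file name, row index, and cell text are hypothetical):

    import xlsxwriter

    workbook = xlsxwriter.Workbook("/tmp/example.xlsx")
    worksheet = workbook.add_worksheet()
    row_format = workbook.add_format({"bold": True, "align": "center"})
    index = 2
    # Merge C2:G2 into a single cell holding one label, as in the report code.
    worksheet.merge_range('C' + str(index) + ':G' + str(index), "ARCHIVO GENERADO DESDE LA TABLA: demo", row_format)
    workbook.close()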
@@ -193,27 +192,22 @@ def set_dag():
         trigger_rule="all_success"
     )
-    create = PythonOperator(
+    get_data = PythonOperator(
         task_id="GET-DATA-REPORT",
         python_callable=get_data_report,
         op_kwargs={},
         trigger_rule="all_success"
     )
     tmp_dir = conf["outputs"]["tmp_path"]
-    report = PythonOperator(
-        task_id="CREATE-REPORT",
-        python_callable=create_report,
-        op_kwargs={'tmp_path': tmp_dir},
-        trigger_rule="all_success"
-    )
     report_params = conf["report"]
-    upload = PythonOperator(
-        task_id="UPLOAD-REPORT",
-        python_callable=upload_report,
-        op_kwargs={'report_params': report_params, 'provider': conf["cloud_provider"], 'timezone': timezone},
+    create_and_upload = PythonOperator(
+        task_id="CREATE-AND-UPLOAD-REPORT",
+        python_callable=create_and_upload_report,
+        op_kwargs={'tmp_path': tmp_dir, 'report_params': report_params, 'provider': conf["cloud_provider"], 'timezone': timezone},
         trigger_rule="all_success"
     )
-    control_extractor >> create >> report >> upload
+    control_extractor >> get_data >> create_and_upload
     return dag
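Taken together, the change collapses the old CREATE-REPORT >> UPLOAD-REPORT pair into a single task. A minimal runnable sketch of the resulting wiring, with the repo's callables and conf values stubbed out as assumptions (the upstream control_extractor task is omitted):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # Stubs standing in for the repo's real callables (assumptions, not its code).
    def get_data_report(**kwargs):
        kwargs["ti"].xcom_push(key="REPORT-DATA", value={})

    def create_and_upload_report(tmp_path, report_params, provider, timezone, **kwargs):
        pass

    with DAG("INFORM_PROCESS", start_date=datetime(2023, 8, 1), schedule_interval=None) as dag:
        get_data = PythonOperator(task_id="GET-DATA-REPORT", python_callable=get_data_report)
        create_and_upload = PythonOperator(
            task_id="CREATE-AND-UPLOAD-REPORT",
            python_callable=create_and_upload_report,
            op_kwargs={
                "tmp_path": "/tmp",
                "report_params": {},
                "provider": "aws",
                "timezone": "America/Lima",
            },
        )
        get_data >> create_and_upload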
deploy-k8/airflow-envvars-configmap.yaml
@@ -79,7 +79,6 @@ data:
   AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
   AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: America/Lima
   AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima
-  AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}'
   AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom
   AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.6"
   AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc
@@ -88,7 +87,6 @@ data:
   AIRFLOW__CORE__EXECUTOR: LocalExecutor
   AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
   AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
-  AIRFLOW__KUBERNETES_EXECUTOR__ENABLE_TCP_KEEPALIVE: 'false'
   AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
   _AIRFLOW_DB_UPGRADE: 'true'
   _AIRFLOW_WWW_USER_CREATE: 'true'
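Airflow maps environment variables named AIRFLOW__{SECTION}__{KEY} onto its configuration options, which is how this ConfigMap drives the deployment. A small sketch of checking one of the remaining values from inside a container, assuming the variable is set in that environment:

    import os

    # AIRFLOW__LOGGING__LOGGING_LEVEL corresponds to section "logging", key "logging_level".
    level = os.environ.get("AIRFLOW__LOGGING__LOGGING_LEVEL", "INFO")
    print(f"Effective Airflow logging level: {level}")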