general / bcom-tp-etl-transformation-pipelines / Commits

Commit 91b0963a authored Oct 19, 2023 by Cristian Aguirre
Parent e0d3c20c

    Process accept xlsx files as output

Showing 3 changed files with 20 additions and 6 deletions (+20 / -6)
dags/components/Extractor.py    +1  -1
dags/components/Generation.py   +12 -4
dags/components/S3Route.py      +7  -1
dags/components/Extractor.py

@@ -239,7 +239,7 @@ def get_select_from_xcom(db_intern_conn, **kwargs):
     tablas_temp = set()
     conf = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="CONTROL-CONFIG")
     definitions = task.xcom_pull(task_ids="SCRIPTS-EXTRACTOR", key="EXTRACTION-DEFINITION-JSON")
-    temp_conf = conf[-1]["objects_created"]
+    temp_conf = conf[-1]["objects_created"] if "objects_created" in conf[-1].keys() else []
     engine = db_intern_conn.engine
     logger.info(f"OBJETOS CREADOS: {temp_conf}")
     identificadores = [item["identifier"] for item in definitions]
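The guarded lookup above avoids a KeyError when the last control entry has no "objects_created" key. A minimal sketch of the equivalent, more idiomatic dict.get form (the shape of conf is assumed from this hunk; the commit does not show it in full):

# Sketch: same fallback as the new line, written with dict.get.
# conf is assumed to be a list of dicts whose last element may lack the key.
conf = [{"status": "ok"}, {"objects_created": ["TABLE_A", "TABLE_B"]}]
temp_conf = conf[-1].get("objects_created", [])  # [] when the key is missing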
dags/components/Generation.py
@@ -11,6 +11,7 @@ from airflow.exceptions import AirflowSkipException
 from enums.ProcessStatusEnum import ProcessStatusEnum
 from enums.DataTypeEnum import DataTypeEnum
 from enums.DatabaseTypeEnum import DatabaseTypeEnum
+from enums.FileTypeEnum import FileTypeEnum
 from components.S3Route import save_df_to_s3, load_control_to_s3
 from components.Utils import select_multiple, create_temp_file, delete_temp_dir
 from components.Control import get_tasks_from_control, update_new_process
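FileTypeEnum itself is not part of this commit; judging from the members referenced in the hunks below (CSV, TEXT, EXCEL, OLD_EXCEL) and the .value comparisons, a plausible definition would look like this hypothetical sketch (member values are assumed, not taken from the repository):

# Hypothetical sketch of enums/FileTypeEnum.py -- not shown in this commit.
from enum import Enum

class FileTypeEnum(Enum):
    CSV = "csv"
    TEXT = "txt"
    EXCEL = "xlsx"      # modern Excel output handled by this commit
    OLD_EXCEL = "xls"   # legacy Excel format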
@@ -122,6 +123,8 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
     delimiter = params["delimiter"]
     tmp_path = params["tmp_path"]
     tmp_file = create_temp_file(tmp_path, filename_mask, file_type, tablename, timezone, pattern)
+    if file_type == FileTypeEnum.EXCEL.value or FileTypeEnum.OLD_EXCEL.value:
+        os.remove(tmp_file)
     logger.info(f"Ruta creada: {tmp_file}")
     logger.debug(f"TABLA: {tablename}")
     steps = get_steps(tablename, chunksize, engine, True)
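One caution about the added guard: in Python, "or FileTypeEnum.OLD_EXCEL.value" evaluates the enum value as a standalone truthy string rather than comparing it against file_type, so the condition is always true and the temp file is removed for every file type. A boolean-correct sketch of the check the branch appears to intend:

# Sketch: a membership test compares file_type against both values,
# so the removal only fires for Excel outputs.
if file_type in (FileTypeEnum.EXCEL.value, FileTypeEnum.OLD_EXCEL.value):
    os.remove(tmp_file)  # pd.ExcelWriter(mode='w') recreates it below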
@@ -140,12 +143,17 @@ def generate_and_deploy(command: str, intern_conn, params: Dict[str, Any], timez
                     if campo in campos.keys():
                         if campos[campo] == DataTypeEnum.DATE.name:
                             dataframe[campo] = pd.to_datetime(dataframe[campo]).dt.date
                         # elif campos[campo] == DataTypeEnum.DATETIME.name: # datetime:
                         #     dataframe[campo] = pd.to_datetime(dataframe[campo], format='%Y-%m-%d %H:%M:%S')
                 dataframe = dataframe.drop("INTERN_ID_BCOM", axis=1, errors='ignore')
                 logger.debug(dataframe)
-                dataframe.to_csv(tmp_file, sep=delimiter, index=False, mode='a', header=header)
+                if file_type == FileTypeEnum.CSV.value or file_type == FileTypeEnum.TEXT.value:
+                    dataframe.to_csv(tmp_file, sep=delimiter, index=False, mode='a', header=header)
+                elif file_type == FileTypeEnum.EXCEL.value or file_type == FileTypeEnum.OLD_EXCEL.value:
+                    if header:
+                        with pd.ExcelWriter(tmp_file, engine="openpyxl", mode='w') as writer:
+                            dataframe.to_excel(writer, index=False, header=True)
+                    else:
+                        with pd.ExcelWriter(tmp_file, engine="openpyxl", mode='a', if_sheet_exists='overlay') as writer:
+                            dataframe.to_excel(writer, index=False, header=False)
             except StopIteration:
                 break
     list_outputs = params["s3_params"]
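The chunked Excel path deserves a note: with if_sheet_exists='overlay', each append writes into the existing sheet starting at row 0 unless startrow is given, so later chunks can overwrite earlier ones. A minimal sketch of a chunk-safe append loop (file name and chunk data are illustrative, not from the commit):

# Sketch: appending DataFrame chunks to one xlsx sheet without overwriting.
# 'overlay' reuses the existing sheet; startrow moves each chunk below the last.
import pandas as pd

chunks = [pd.DataFrame({"a": [1, 2]}), pd.DataFrame({"a": [3, 4]})]  # illustrative
path = "output.xlsx"  # illustrative

chunks[0].to_excel(path, index=False)  # first chunk writes the header row
startrow = len(chunks[0]) + 1          # +1 accounts for the header
for chunk in chunks[1:]:
    with pd.ExcelWriter(path, engine="openpyxl", mode="a", if_sheet_exists="overlay") as writer:
        chunk.to_excel(writer, index=False, header=False, startrow=startrow)
    startrow += len(chunk)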
dags/components/S3Route.py
@@ -115,7 +115,13 @@ def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, p
             else:
                 hook.load_bytes(buffer.getvalue(), key, bucket, True)
         else:
-            pass
+            logger.info(f"Llegue aca, para guardar")
+            logger.info(f"DATA: {data}. BUCKET: {bucket}. KEY: {key}")
+            # convert_df = pd.read_csv()
+            if gcp_cloud:
+                hook.upload(bucket, key, data)
+            else:
+                hook.load_file(data, key, bucket)
     elif file_type == FileTypeEnum.CSV or file_type == FileTypeEnum.TEXT:
         if in_memory:
             csv_buffer = BytesIO()
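The new branch uploads a file path instead of an in-memory buffer, switching on gcp_cloud. The call shapes match Airflow's GCSHook.upload(bucket_name, object_name, filename) and S3Hook.load_file(filename, key, bucket_name). A minimal usage sketch (connection IDs and names are illustrative):

# Sketch: exercising the file-path branch with Airflow provider hooks.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook

gcp_cloud = False          # would come from the pipeline's cloud config
data = "/tmp/report.xlsx"  # a path on disk, not a buffer
bucket, key = "my-bucket", "outputs/report.xlsx"

if gcp_cloud:
    GCSHook(gcp_conn_id="google_cloud_default").upload(bucket, key, data)
else:
    S3Hook(aws_conn_id="aws_default").load_file(data, key, bucket)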