Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
3170e104
Commit
3170e104
authored
Jan 09, 2024
by
Cristian Aguirre
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Optimization of extraction process from an Oracle DB
parent
8ebc05c1
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
17 additions
and
23 deletions
+17
-23
DatabaseLoad.py
dags/components/DatabaseOperation/DatabaseLoad.py
+6
-17
S3Route.py
dags/components/S3Route.py
+0
-1
Transformation.py
dags/components/Transformation.py
+11
-5
No files found.
dags/components/DatabaseOperation/DatabaseLoad.py
View file @
3170e104
...
@@ -11,25 +11,14 @@ def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
...
@@ -11,25 +11,14 @@ def save_from_dataframe(df: pd.DataFrame, tablename: str, connection) -> bool:
save
=
False
save
=
False
try
:
try
:
chunksize
=
2000
chunksize
=
2000
#
db_type = connection.db_type
db_type
=
connection
.
db_type
connection
=
connection
.
engine
connection
=
connection
.
engine
# print(df["CREACION_PRODUCTO"].value_counts())
with
connection
.
connect
()
as
conn
:
with
connection
.
connect
()
as
conn
:
# if db_type == DatabaseTypeEnum.ORACLE.value:
if
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
:
# df.info()
dtypes
=
{
c
:
VARCHAR
(
df
[
c
]
.
str
.
len
()
.
max
())
for
c
in
df
.
columns
[
df
.
dtypes
==
'object'
]
.
tolist
()}
# aux = df.columns[df.dtypes == 'object'].tolist()
df
.
to_sql
(
tablename
,
conn
,
if_exists
=
'append'
,
dtype
=
dtypes
,
index
=
False
,
chunksize
=
chunksize
)
# print(aux)
else
:
# dtyp = {}
df
.
to_sql
(
tablename
,
conn
,
if_exists
=
'append'
,
index
=
False
,
chunksize
=
chunksize
)
# for col in aux:
# print(col)
# print(df[col].dtype)
# df[col] = df[col].astype(str)
# dtyp.update({col: VARCHAR(df[col].str.len().max())})
# # dtyp = {c: VARCHAR(df[c].str.len().max()) for c in aux}
# print(dtyp)
# df.to_sql(tablename, conn, if_exists='append', dtype=dtyp, index=False, chunksize=chunksize)
# else:
df
.
to_sql
(
tablename
,
conn
,
if_exists
=
'append'
,
index
=
False
,
chunksize
=
chunksize
)
save
=
True
save
=
True
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
"Error guardando resultados desde dataframe. {e}"
)
logger
.
error
(
f
"Error guardando resultados desde dataframe. {e}"
)
...
...
dags/components/S3Route.py
View file @
3170e104
...
@@ -115,7 +115,6 @@ def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, p
...
@@ -115,7 +115,6 @@ def save_df_to_s3(data: pd.DataFrame or str, conn: str, bucket: str, key: str, p
else
:
else
:
hook
.
load_bytes
(
buffer
.
getvalue
(),
key
,
bucket
,
True
)
hook
.
load_bytes
(
buffer
.
getvalue
(),
key
,
bucket
,
True
)
else
:
else
:
logger
.
info
(
f
"Llegue aca, para guardar"
)
logger
.
info
(
f
"DATA: {data}. BUCKET: {bucket}. KEY: {key}"
)
logger
.
info
(
f
"DATA: {data}. BUCKET: {bucket}. KEY: {key}"
)
# convert_df = pd.read_csv()
# convert_df = pd.read_csv()
if
gcp_cloud
:
if
gcp_cloud
:
...
...
dags/components/Transformation.py
View file @
3170e104
...
@@ -8,12 +8,12 @@ from airflow.exceptions import AirflowSkipException
...
@@ -8,12 +8,12 @@ from airflow.exceptions import AirflowSkipException
from
components.DatabaseOperation.DatabaseTransformation
import
delete_table
,
delete_procedure
from
components.DatabaseOperation.DatabaseTransformation
import
delete_table
,
delete_procedure
from
components.S3Route
import
get_files_from_prefix
,
get_file_from_prefix
from
components.S3Route
import
get_files_from_prefix
,
get_file_from_prefix
from
components.Xcom
import
save_commands_to_xcom
from
components.Xcom
import
save_commands_to_xcom
from
components.Utils
import
update_sql_commands_2
from
components.Control
import
get_tasks_from_control
,
update_new_process
from
components.Control
import
get_tasks_from_control
,
update_new_process
from
components.S3Route
import
load_control_to_s3
from
components.S3Route
import
load_control_to_s3
from
components.Xcom
import
delete_all_xcom_tasks
,
delete_task_instances
from
components.Xcom
import
delete_all_xcom_tasks
,
delete_task_instances
from
enums.ProcessStatusEnum
import
ProcessStatusEnum
from
enums.ProcessStatusEnum
import
ProcessStatusEnum
from
enums.OperationTypeEnum
import
OperationTypeEnum
from
enums.OperationTypeEnum
import
OperationTypeEnum
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.Timezone
import
datetime_by_tzone
from
components.Timezone
import
datetime_by_tzone
import
logging
import
logging
...
@@ -99,12 +99,16 @@ def transformations(xcom_commands: str, intern_conn, timezone: str, **kwargs):
...
@@ -99,12 +99,16 @@ def transformations(xcom_commands: str, intern_conn, timezone: str, **kwargs):
script_name
=
xcom_commands
[
0
]
script_name
=
xcom_commands
[
0
]
commands
=
xcom_commands
[
1
]
commands
=
xcom_commands
[
1
]
logger
.
info
(
f
"Ejecutando transformaciones del script {script_name}"
)
logger
.
info
(
f
"Ejecutando transformaciones del script {script_name}"
)
not_procedure
=
[
"UPDATE"
,
"SELECT"
,
"CREATE"
,
"ALTER"
,
"DROP"
,
"DELETE"
,
"INSERT"
,
"GRANT"
,
"REVOKE"
,
"TRUNCATE"
,
"COPY"
,
logger
.
info
(
f
"XCOM_COMMANDS: {xcom_commands}"
)
"COMMIT"
,
"ROLLBACK"
,
"USE"
,
"BEGIN"
]
not_procedure
=
[
"UPDATE"
,
"SELECT"
,
"CREATE"
,
"ALTER"
,
"DROP"
,
"DELETE"
,
"INSERT"
,
" GRANT"
,
"REVOKE"
,
"TRUNCATE"
,
"COPY"
,
"COMMIT"
,
"ROLLBACK"
,
"USE"
,
"BEGIN"
]
with
engine
.
connect
()
as
connection
:
with
engine
.
connect
()
as
connection
:
for
command
in
commands
:
for
command
in
commands
:
logger
.
info
(
f
"Commando: {command}"
)
if
any
(
command
.
startswith
(
palabra
)
or
command
.
startswith
(
palabra
.
lower
())
for
palabra
in
not_procedure
):
if
any
(
command
.
startswith
(
palabra
)
or
command
.
startswith
(
palabra
.
lower
())
for
palabra
in
not_procedure
):
logger
.
info
(
f
"Ejecutando comando de transformación: {command}"
)
logger
.
info
(
f
"Ejecutando comando de transformación en {intern_conn.db_type} : {command}"
)
if
intern_conn
.
db_type
==
DatabaseTypeEnum
.
ORACLE
.
value
:
command
=
command
[:
-
1
]
_
=
connection
.
execute
(
command
)
_
=
connection
.
execute
(
command
)
else
:
else
:
logger
.
info
(
f
"Generando llamada al procedure según bd para: {command}"
)
logger
.
info
(
f
"Generando llamada al procedure según bd para: {command}"
)
...
@@ -150,7 +154,9 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
...
@@ -150,7 +154,9 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
# result = db_intern_conn.check_procedure(tablename, connection)
# result = db_intern_conn.check_procedure(tablename, connection)
# if not result:
# if not result:
try
:
try
:
logger
.
info
(
"1"
)
_
=
connection
.
execute
(
procedure
)
_
=
connection
.
execute
(
procedure
)
logger
.
info
(
"2"
)
except
Exception
as
e
:
except
Exception
as
e
:
logger
.
error
(
f
" Error: {e}"
)
logger
.
error
(
f
" Error: {e}"
)
raise
AirflowSkipException
raise
AirflowSkipException
...
@@ -159,7 +165,7 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
...
@@ -159,7 +165,7 @@ def get_trans_from_xcom(provider, store_procedure, procedure_mask, label_tablena
else
:
else
:
logger
.
debug
(
f
"No se encontró el label en {procedure} por ende no se creará"
)
logger
.
debug
(
f
"No se encontró el label en {procedure} por ende no se creará"
)
save_commands_to_xcom
(
procedures
,
kwargs
[
'ti'
],
procedure_mask
,
transform_mask
,
procedure_mask
,
order_delimiter
)
save_commands_to_xcom
(
procedures
,
kwargs
[
'ti'
],
procedure_mask
,
transform_mask
,
procedure_mask
,
order_delimiter
)
logger
.
debug
(
f
"Procedures cargados en Xcom: {procedures}"
)
logger
.
info
(
f
"Procedures cargados en Xcom: {procedures}"
)
transforms_per_file
=
[]
transforms_per_file
=
[]
conf
=
task
.
xcom_pull
(
task_ids
=
"VALIDATE_EXTRACTION"
,
key
=
"CONTROL-CONFIG"
)
conf
=
task
.
xcom_pull
(
task_ids
=
"VALIDATE_EXTRACTION"
,
key
=
"CONTROL-CONFIG"
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment