Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
B
bcom-tp-etl-transformation-pipelines
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
general
bcom-tp-etl-transformation-pipelines
Commits
26bf8a24
Commit
26bf8a24
authored
Aug 04, 2023
by
Cristian Aguirre
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Update 03-08-23. Update README
parent
e5ddb0fc
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
25 additions
and
20 deletions
+25
-20
README.docx
README.docx
+0
-0
README.md
README.md
+0
-1
Mysql.py
dags/components/Databases/Mysql.py
+1
-1
Postgres.py
dags/components/Databases/Postgres.py
+1
-1
Extractor.py
dags/components/Extractor.py
+1
-1
Utils.py
dags/components/Utils.py
+18
-12
airflow-envvars-configmap.yaml
deploy-k8/airflow-envvars-configmap.yaml
+1
-1
airflow-volumes.yaml
deploy-k8/airflow-volumes.yaml
+3
-3
No files found.
README.docx
0 → 100644
View file @
26bf8a24
File added
README.md
View file @
26bf8a24
...
...
@@ -51,7 +51,6 @@ NOTA: *Detalle de cada variable de entorno usada por los POD y Airflow:*
AIRFLOW__LOGGING__LOGGING_LEVEL: INFO # Nivel de log de Airflow (webserver, scheduler y workers)
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: America/Lima # Timezone de la Web de Airflow
AIRFLOW__CORE__DEFAULT_TIMEZONE: America/Lima # Timezone del Scheduler de Airflow
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}' # Tiempo de espera de respuesta de Kubernetes API
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: cristianfernando/airflow_custom # Ruta de imagen a usar para crear el worker POD
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "0.0.1" # Tag a usar para crear el worker POD
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: airflow-logs-pvc # Nombre del volumen usado para almacenar los logs
...
...
dags/components/Databases/Mysql.py
View file @
26bf8a24
...
...
@@ -33,7 +33,7 @@ class Mysql:
self
.
connection
=
pymysql
.
connect
(
host
=
self
.
host
,
port
=
self
.
port
,
user
=
self
.
user
,
password
=
self
.
password
,
db
=
self
.
database
)
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo conexion básica de
Oracle
. {e}"
)
logger
.
error
(
f
"Error obteniendo conexion básica de
Mysql
. {e}"
)
finally
:
return
self
.
connection
...
...
dags/components/Databases/Postgres.py
View file @
26bf8a24
...
...
@@ -34,7 +34,7 @@ class Postgres:
password
=
self
.
password
,
database
=
self
.
database
,
options
=
"-c search_path="
+
self
.
schema
)
except
Exception
as
e
:
logger
.
error
(
f
"Error obteniendo conexion básica de
Oracle
. {e}"
)
logger
.
error
(
f
"Error obteniendo conexion básica de
Postgres
. {e}"
)
finally
:
return
self
.
connection
...
...
dags/components/Extractor.py
View file @
26bf8a24
...
...
@@ -107,7 +107,7 @@ def extract_from_source(command, source_conn, intern_conn, chunksize: int, **kwa
indexes
=
[]
if
"indexes"
in
procedure
.
keys
():
indexes
=
procedure
[
"indexes"
]
model
=
generateModel
(
tablename
,
procedure
[
"fields"
],
indexes
)
model
=
generateModel
(
tablename
,
procedure
[
"fields"
],
indexes
,
intern_conn
.
db_type
)
columns_name
=
[
field
[
"name"
]
for
field
in
procedure
[
"fields"
]]
if
isinstance
(
model
,
type
(
None
)):
raise
AssertionError
(
f
"Definición del extracción para {tablename} en el json-descriptor no encontraddo"
)
...
...
dags/components/Utils.py
View file @
26bf8a24
...
...
@@ -9,7 +9,8 @@ from enums.CatalogConfigurationEnum import CatalogConfigurationEnum
from
enums.FileTypeEnum
import
FileTypeEnum
from
enums.DataTypeEnum
import
DataTypeEnum
from
enums.DataTypeOrmEnum
import
DataTypeOrmEnum
from
components.Model.InsumoModel
import
InsumoModel
from
enums.DatabaseTypeEnum
import
DatabaseTypeEnum
from
components.Model.InsumoModel
import
InsumoModel
,
InsumoModelOracle
from
enums.CommentsScriptEnum
import
CommentsScriptEnum
from
components.Timezone
import
datetime_by_tzone
...
...
@@ -148,14 +149,14 @@ def select_multiple(command: str) -> Dict[str, Any]:
if
command
.
lower
()
.
replace
(
" "
,
""
)
.
find
(
no_procedure_init
)
!=
-
1
:
response
[
"is_multiple"
]
=
True
tablename
=
command
[:
command
.
index
(
"|"
)]
.
strip
()
init_index
=
command
.
lower
()
.
find
(
"from"
)
if
init_index
==
-
1
:
raise
AssertionError
(
"Query malformed"
)
else
:
from_command
=
command
[
init_index
+
4
:]
tablename_base
=
from_command
.
strip
()
.
split
(
" "
)
if
len
(
tablename_base
)
>
0
and
tablename
==
""
:
tablename
=
tablename_base
[
0
]
#
init_index = command.lower().find("from")
#
if init_index == -1:
#
raise AssertionError("Query malformed")
#
else:
#
from_command = command[init_index + 4:]
#
tablename_base = from_command.strip().split(" ")
#
if len(tablename_base) > 0 and tablename == "":
#
tablename = tablename_base[0]
response
[
"tablename"
]
=
tablename
except
Exception
as
e
:
raise
AssertionError
(
f
"Error validando si es múltiple select y nombre de tabla. {e}"
)
...
...
@@ -198,12 +199,17 @@ def delete_temp_dir(module_name: str) -> bool:
return
drop
def
generateModel
(
tablename
:
str
,
attributes
:
List
[
Dict
[
str
,
Any
]],
indexes
:
List
[
str
],
modelName
:
str
=
"TableModel"
):
def
generateModel
(
tablename
:
str
,
attributes
:
List
[
Dict
[
str
,
Any
]],
indexes
:
List
[
str
],
db_target
:
str
,
modelName
:
str
=
"TableModel"
):
default_precision
=
8
model
=
type
(
modelName
,
(
InsumoModel
,),
{
model
_args
=
{
'__tablename__'
:
tablename
,
'__table_args__'
:
{
'extend_existing'
:
True
}
})
}
if
db_target
==
DatabaseTypeEnum
.
ORACLE
.
value
:
model
=
type
(
modelName
,
(
InsumoModelOracle
,),
model_args
)
else
:
model
=
type
(
modelName
,
(
InsumoModel
,),
model_args
)
try
:
for
attribute
in
attributes
:
index
=
False
...
...
deploy-k8/airflow-envvars-configmap.yaml
View file @
26bf8a24
...
...
@@ -93,7 +93,7 @@ data:
_AIRFLOW_WWW_USER_USERNAME
:
admin
_AIRFLOW_WWW_USER_PASSWORD
:
admin
S3_DAGS_DIR
:
'
s3://prueba1234568/dags'
GCS_DAGS_DIR
:
'
gs://prueba-rsync
3
/carpeta'
GCS_DAGS_DIR
:
'
gs://prueba-rsync
2
/carpeta'
SYNCHRONYZE_DAG_DIR
:
'
30'
MINIO_SERVER
:
'
http://192.168.49.2:9000'
MINIO_DAGS_DIR
:
'
/prueba-ca/dags'
\ No newline at end of file
deploy-k8/airflow-volumes.yaml
View file @
26bf8a24
...
...
@@ -10,7 +10,7 @@ spec:
-
ReadWriteMany
storageClassName
:
airflow-dags
nfs
:
server
:
10.
115.7.82
server
:
10.
216.137.186
path
:
"
/volume1/nfs_share"
---
...
...
@@ -27,7 +27,7 @@ spec:
-
ReadWriteMany
storageClassName
:
airflow-postgres
nfs
:
server
:
10.
115.7.82
server
:
10.
216.137.186
path
:
"
/volume1/nfs_postgres"
---
...
...
@@ -44,7 +44,7 @@ spec:
-
ReadWriteMany
storageClassName
:
airflow-logs
nfs
:
server
:
10.
115.7.82
server
:
10.
216.137.186
path
:
"
/volume1/nfs_logs"
---
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment