Commit 64704c36 authored by criq

Initial commit

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
tmp/
folder/
registro.json
.vscode/
# Script to back up files from an S3 bucket
## OS requirements
- Install Python, minimum version 3.7
- Have the interpreter mapped on the PATH as: python3 (alias)
- Have the Python package manager mapped as: pip3 (alias)
- Have the virtual environment library "virtualenv" installed; if it is missing, install it as follows:
pip3 install virtualenv
## Creating the virtual environment
```sh
python3 -m virtualenv bk_script_env
source bk_script_env/bin/activate
pip3 install -r s3_local_manager/requirements.txt
```
## Configuration
- Set the AWS credentials (key, secret, and region) as OS environment variables (this approach is used for security). For example:
export ACCESS_KEY_ID=AKIAWF7U5VCKY4BSCM5P
export SECRET_ACCESS_KEY=LOWnYXe1wmXPztxRG8/RV5iECL63n2jSdQyLaD8X
export AWS_DEFAULT_REGION=us-east-1
- Configure the folders in the file s3_local_manager/config.py (section: # Folders), as shown in the sketch below.
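A minimal sketch of the relevant part of `s3_local_manager/config.py` (the bucket and folder paths below are the repository's example values; adjust them to your environment):

```python
# Bucket
S3_BUCKET = 'buckethdi/archivos'
# Folders
local_folder_temp = '/tmp'                                 # temporary download folder
local_folder_definitive = '/Users/robmac/Downloads/tmp2'   # final destination folder
```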
## Running
```sh
python3 -m s3_local_manager
```
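The package entry point simply calls `app.run()`, so the same backup pass can also be triggered from Python (a minimal sketch mirroring `s3_local_manager/__main__.py`):

```python
# Run a single backup pass programmatically
# (equivalent to `python3 -m s3_local_manager`).
from s3_local_manager import app

app.run()
```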
# python=3.7
boto3
python-dotenv
s3path
coloredlogs
print('get __main__')
from s3_local_manager import app
if __name__ == '__main__':
app.run()
import os
from dotenv import load_dotenv
from .src.utils_aws_s3.s3_helper import AwsS3Helper
from .src.utils_local_os.local_helper import LocalFilesHelper
from .src.util_registry.util_registry import registry
import s3_local_manager.config as config
import coloredlogs, logging
import time
import datetime
logger = logging.getLogger(__name__)
coloredlogs.install(level='DEBUG', logger=logger)
# Load variables from a local .env file, if present (already-exported variables take precedence)
load_dotenv()
rg = registry()
aws_access_key_id = os.getenv('ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('SECRET_ACCESS_KEY')
aws_region = os.getenv('AWS_DEFAULT_REGION')
if aws_access_key_id and aws_secret_access_key and aws_region:
logger.info('************** Leyendo Credenciales **************')
else:
logger.error('No se ha configurado las variables de entorno.')
exit(1)
#------------------------ MAIN -----------------------------------------
## Run function
def run():
start_time = datetime.datetime.now()
tic = time.time()
try:
s3Help = AwsS3Helper(aws_access_key_id,aws_secret_access_key,aws_region)
# Get the bucket's files in sorted order;
# by default the first element is the oldest
lista_s3 = s3Help.sorted_list_keys(config.S3_BUCKET)
print(lista_s3)
# Get the date of the last processed file
last_datetime = rg.get_last_datetime_file_process()
logger.info(' Fecha del último archivo procesado : '+str(last_datetime))
## Filter the list by that date
filter_list_s3 = s3Help.filter_by_datetime(lista_s3,last_datetime)
if len(filter_list_s3) == 0 and last_datetime is not None:
logger.warning(' No hay nuevos archivos a procesar ... ')
elif last_datetime is None:
filter_list_s3 = lista_s3
print('**** ')
# Helpers to manage the local folders
locaTemplHelper = LocalFilesHelper(config.local_folder_temp)
locaDefinitivelHelper = LocalFilesHelper(config.local_folder_definitive)
# Iterate over the files
# and keep track of the last one processed
last_file = None
for s3_file in filter_list_s3:
local_dest_file = locaTemplHelper.create_pathfile_for_dict(s3_file)
if local_dest_file:
# Returns True if the file was already registered
is_registred = rg.registry_file_dict(s3_file)
if not is_registred:
s3Help.download_file_from_dict(s3_file,local_dest_file)
print(local_dest_file)
# move it to the definitive directory as well
locaDefinitivelHelper.move_file(local_dest_file)
last_file = s3_file
print('_'*60)
toc = time.time()
rg.save_process_time(start_time,tic,toc,last_file)
except Exception as e:
logger.error('Existe un problema en el proceso: %s', e)
logger.info('************** Proceso Completado **************')
# Bucket
S3_BUCKET = 'buckethdi/archivos' #'my-bucket-sagemaker'
# Folders
local_folder_temp = '/tmp'
local_folder_definitive = '/Users/robmac/Downloads/tmp2'
# Control registry (JSON file that tracks processed files)
json_registry_path = 'registro.json'
# Date format to use
datetime_format_string = '%d/%m/%Y %H:%M:%S'
aws_timezone = 'America/Lima'
import os
import json
import errno
import s3_local_manager.config as config
import coloredlogs, logging
logger = logging.getLogger(__name__)
coloredlogs.install(level='DEBUG', logger=logger)
class registry:
json_path = None
data = None
index = 0
def __init__(self):
if self.file_exists(config.json_registry_path):
self.json_path = config.json_registry_path
try:
with open(self.json_path) as json_file:
self.data = json.load(json_file)
self.index = len(self.data['history'])
self.data['history'].append({"process": {"files":[]}})
except Exception:
# Corrupt or unexpected registry file: start over with a fresh structure
self.data = {"history":[{"process": {"files":[]}}]}
self.index = 0
else:
# First run: the registry file does not exist yet
self.json_path = config.json_registry_path
self.data = {"history":[{"process": {"files":[]}}]}
self.save_data()
def file_exists(self,path='registry.json'):
return os.path.exists(path)
def save_data(self):
# Check whether the new process has any files
processed_files = len(self.data['history'][-1]['process']['files'])
if processed_files == 0:
return None
try:
with open(self.json_path, 'w') as f:
json.dump(self.data, f, indent=4)
except PermissionError as e:
logger.error(e)
except OSError as e:
if e.errno == errno.ENOSPC:
logger.error(' No hay espacio suficiente en el disco.')
else:
logger.error(e)
except Exception as e:
logger.error(e)
def get_last_datetime_file_process(self):
if self.index == 0:
return None
last_datetime = self.data['history'][self.index-1]['process']['last_file_process']['info']['LastModified']
return last_datetime
def save_process(self,dict_process):
'''
self.data = { "history": [ process ... ] }
'''
data = self.data
data['history'].append(dict_process)
self.save_data()
def get_processed_files(self):
'''
self.data = { "history": [ "process": { ... "files": [ files ... ] } ] }
'''
data = self.data
p_files = []
try:
for process in data['history']:
for file in process['process']['files']:
p_files.append(file)
return p_files
except Exception as e:
logger.warning(e)
return []
def find_file(self, filename):
'''
Looks up a file in the registry.
Return: None if it does not exist, the s3_file dict if it does.
get_processed_files() returns items like: [ {"name": "a.jpg", ...} ]
'''
## Get the processed files
processed_files = self.get_processed_files()
# look for the file
rfile = None
for file in processed_files:
if filename == file['name']:
return file
return rfile
def is_processed(self,file_dict):
s3_file = file_dict
## Get the processed files
processed_files = self.get_processed_files()
# Check whether the new file matches one already registered
flag = False
for p_file in processed_files:
if p_file['pathS3'] == s3_file['S3path'] and p_file['datetime_modification'] == s3_file['info']['LastModified'].strftime(config.datetime_format_string):
flag = True
return flag
def get_name_from_dict(self,dict_file):
if '/' in dict_file['S3path']:
return dict_file['S3path'].split('/')[-1]
def registry_file_dict(self,file_dict):
"""
file_dict = {'S3path': 'my-bucket-sagemaker/OverallSystemUsage.json',
'info': {'Key': 'sagemaker/OverallSystemUsage.json',
'LastModified': datetime.datetime(2021, 3, 29, 3, 36, 35, tzinfo=tzutc()),
'ETag': '"04a7284801e90eb4a69633a80b3610d0"',
'Size': 464, 'StorageClass': 'STANDARD',
'Owner': {'DisplayName': 'cquezada', 'ID': 'abb6d7141fbd42f63b986da5e37b7c6379a8cb7b156beea8c5270125cbe10ce0'}}}
"""
# Already processed
if self.is_processed(file_dict):
# Check whether the modification dates match
#self.data['history'][self.index]['process']['files'].append(file)
return True
else:
# Not yet in the registry:
# add it
file = self.formatDict(file_dict)
print('file to save : ',file)
self.data['history'][self.index]['process']['files'].append(file)
self.save_data()
return False
def formatDict(self,file_dict):
"""
INPUT = {'S3path': 'my-bucket-sagemaker/OverallSystemUsage.json',
'info': {'Key': 'sagemaker/OverallSystemUsage.json',
'LastModified': datetime.datetime(2021, 3, 29, 3, 36, 35, tzinfo=tzutc()),
'ETag': '"04a7284801e90eb4a69633a80b3610d0"',
'Size': 464, 'StorageClass': 'STANDARD',
'Owner': {'DisplayName': 'cquezada', 'ID': 'abb6d7141fbd42f63b986da5e37b7c6379a8cb7b156beea8c5270125cbe10ce0'}}}
OUTPUT: {"name":"1.json","datetime_modification":"25-06-2021 15:32","pathS3":"1.json"}
"""
name = self.get_name_from_dict(file_dict)
datetime_modification = file_dict['info']['LastModified'].strftime(config.datetime_format_string)
pathS3 = file_dict['S3path']
return {"name":name,"datetime_modification":datetime_modification,"pathS3":pathS3}
def save_process_time(self, start_time,tic,toc, last_file):
elapsed_seconds = toc - tic
self.data['history'][self.index]['process']['time_start'] = start_time.strftime(config.datetime_format_string)
self.data['history'][self.index]['process']['seconds'] = str(elapsed_seconds)
self.data['history'][self.index]['process']['size'] = len(self.data['history'][self.index]['process']['files'])
self.data['history'][self.index]['process']['bucket'] = config.S3_BUCKET
## Store the last processed file,
# formatting its datetime so it can be serialized to JSON
if last_file:
last_file['info']['LastModified'] = last_file['info']['LastModified'].strftime(config.datetime_format_string)
self.data['history'][self.index]['process']['last_file_process'] = last_file
self.save_data()
# Get the last file to be processed
#!/usr/bin/env python
import os
import errno
import boto3
from botocore.exceptions import EndpointConnectionError,ClientError # type: ignore
import logging
from datetime import datetime, timezone
from dateutil import parser
import s3_local_manager.config as config
class AwsS3Helper:
def __init__(self,AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,region_name="us-east-1"):
'''
Method to programmatically access S3 objects.
PARAMETERS:
AWS_ACCESS_KEY_ID (string): Access Key associated with an AWS account.
AWS_SECRET_ACCESS_KEY (string): Corresponding Secret Key associated to the account.
(OPTIONAL) region_name (string): Region for the account. Set default to 'us-east-1'.
'''
self.conn = boto3.client("s3", region_name = region_name, aws_access_key_id = AWS_ACCESS_KEY_ID, \
aws_secret_access_key = AWS_SECRET_ACCESS_KEY)
def set_key(self, path):
'''
Utility function to get bucket and key from a S3 path
PARAMETERS:
path (string): s3 path
OUTPUT:
bucket (string): bucket in which the s3 path is
key (string): s3 path without the bucket.
USAGE:
set_key('s3_bucket/s3_path/file.txt')
'''
bucket = path.split("/")[0]
key = "/".join(path.split("/")[1:])
return bucket, key
def upload_file(self, path, inpfname):
'''
To upload a S3 object
PARAMETERS:
path (string): s3 path where file to be uploaded. Should be specified as bucket + key.
inpfname (string): local path of file to be uploaded
RETURNS: None
USAGE:
upload_file('s3_bucket/s3_path/file.txt','/home/abc/local_dir/file.txt')
'''
bucket, key = self.set_key(path)
self.conn.upload_file(inpfname, bucket, key)
def download_file(self, path, outfname):
'''
To download a S3 object.
PARAMETERS:
path (string): s3 object to be downloaded. Should be specified as bucket + key.
outfname (string): local path where the object is to be downloaded.
RETURNS: None
USAGE:
download_file('s3_bucket/s3_path/file.txt','/home/abc/local_dir/file.txt')
'''
try:
bucket, key = self.set_key(path)
self.conn.download_file(bucket, key, outfname)
except ClientError as e:
logging.error(e)
except OSError as e:
if e.errno == errno.ENOSPC:
logging.error(' No hay espacio suficiente en el disco.')
else:
logging.error(e)
except Exception as e:
logging.error('Exception: %s', e)
def download_file_from_dict(self, file_dict, outfname):
path = file_dict['S3path']
return self.download_file(path, outfname)
def del_file(self, path):
'''
To delete a s3 object.
PARAMETERS:
path (string): s3 path of object to be deleted.
RETURNS: None
USAGE:
del_file('s3_bucket/s3_path/file.txt')
'''
bucket, key = self.set_key(path)
self.conn.delete_object(Bucket=bucket,Key=key)
def list_keys(self, prefix_name):
'''
Used to list files in a S3 object/folder.
PARAMETERS:
prefix_name (string): s3 path of folder/object
OUTPUT:
An iterable of dicts, each with the s3 path under the "S3path" key and the object metadata under "info".
USAGE:
list_keys('s3_bucket/s3_path/')
'''
bucket, key = self.set_key(prefix_name)
is_truncated = True
result = []
marker = ""
while is_truncated:
response = self.conn.list_objects(Bucket=bucket,Prefix=key,Delimiter=",",Marker=marker)
if "Contents" not in response: break
result.extend(response["Contents"])
is_truncated = response["IsTruncated"]
if "NextMarker" not in response: break
marker = response["NextMarker"]
return map(lambda x:{"S3path":bucket + "/" + x["Key"],"info":x}, result)
def sorted_list_keys(self, prefix_name, reverse=False):
'''
PARAMETERS:
prefix_name (string): s3 path of folder/object
OUTPUT:
A list of dicts (as returned by list_keys), sorted by LastModified, oldest first by default.
USAGE:
sorted_list_keys('s3_bucket/s3_path/')
'''
map_list = list(self.list_keys(prefix_name))
# Change UTC ZONE TO LOCAL
utc_map_list = []
for obj in map_list :
obj['info']['LastModified'] = self.utc_to_local(obj['info']['LastModified'])
utc_map_list.append(obj)
map_list = utc_map_list
# Sort by epoch seconds; datetime.timestamp() is portable (strftime('%s') is not)
get_last_modified = lambda obj: obj['info']['LastModified'].timestamp()
return sorted(map_list, key=get_last_modified, reverse=reverse)
def utc_to_local(self,utc_dt):
return utc_dt.replace(tzinfo=timezone.utc).astimezone(tz=None)
def aslocaltimestr(self,utc_dt):
return self.utc_to_local(utc_dt).strftime(config.datetime_format_string)
def filter_by_datetime(self, list_keys, last_datetime):
'''
PARAMETERS:
list_keys (list): objects as returned by list_keys / sorted_list_keys
last_datetime (string): reference datetime
OUTPUT:
Returns a list with the files whose LastModified is
later than the given datetime.
'''
filter_list = []
if last_datetime:
last_datetime_dt = parser.parse(last_datetime)
for obj in list_keys:
if obj['info']['LastModified'].replace(tzinfo=None) > last_datetime_dt:
filter_list.append(obj)
return filter_list
def copy_key(self, src_key, dst_key):
'''
Used to copy file from one s3 path to another
PARAMETERS:
src_key (string): s3 path from where file to be copied
dst_key (string): s3 path where file to be copied
OUTPUT: None
USAGE:
copy_key('s3_bucket1/some_path/fname.txt','s3_bucket2/some_path2/fname2.txt')
'''
bucket ,key = self.set_key(dst_key)
self.conn.copy_object(Bucket=bucket,CopySource=src_key,Key=key)
def get_last_modified(self, path):
'''
To get last modified timestamp of an object
PARAMETERS:
path (string): s3 path of object
RETURNS: Python datetime object
USAGE:
get_last_modified('s3_bucket/s3_path/file.txt')
'''
bucket, key = self.set_key(path)
# The class only creates a boto3 client (no resource), so use head_object here
response = self.conn.head_object(Bucket=bucket, Key=key)
timestamp = response['LastModified']
return timestamp
def clear_s3_folder(self,s3_path):
'''
Utility function to delete an s3 path and its subpaths.
PARAMETERS:
s3_path (string): s3 path to be cleared.
OUTPUT: None
USAGE:
clear_s3_folder('s3_bucket/some_path/')
'''
# list_keys/del_file are defined on this class; there is no self.conn_s3 attribute
file_keys = self.list_keys(s3_path)
for fkey in file_keys:
self.del_file(fkey['S3path'])
#!/usr/bin/env python
import os
import shutil
import coloredlogs, logging
logger = logging.getLogger(__name__)
coloredlogs.install(level='DEBUG', logger=logger)
class LocalFilesHelper:
def __init__(self,directory_path):
self.dirname = directory_path
if not os.path.exists(directory_path):
os.makedirs(directory_path)
def get_files_from_directory(self):
'''
Utility function to list the files in this helper's directory.
OUTPUT:
filepaths (list): paths of the regular files found in the
directory (subdirectories are skipped).
USAGE:
get_files_from_directory()
'''
# Get list of files
filepaths = []
for basename in os.listdir(self.dirname):
filename = os.path.join(self.dirname, basename)
if os.path.isfile(filename):
filepaths.append(filename)
return filepaths
def create_dest_path_for_file(self,source_file_path):
if '/' in source_file_path :
file_name = source_file_path.split('/')[-1]
if '.' in file_name:
return self.dirname + '/' + file_name
else:
logger.warning('Skip folder: '+str(source_file_path))
return None
else:
logger.error('bad file: %s', source_file_path)
return None
def create_pathfile_for_dict(self,source_file_dict):
filepath = source_file_dict['S3path']
return self.create_dest_path_for_file(filepath)
def create_subdirectory_for_file(self,file_path):
string_split = file_path.split('/')[:-1]
subdirectory_path = '/'.join(string_split)
if not os.path.exists(subdirectory_path):
os.makedirs(subdirectory_path)
def replace_main_foldername(self,path):
if '/' in path:
new_path = path.split('/')
new_path[0] = self.dirname
return '/'.join(new_path)
def move_file(self,source_file):
'''
Utility function to move a file into this helper's folder.
PARAMETERS:
source_file (string): local path of the file to move
OUTPUT: None
USAGE:
move_file('/tmp/file.txt')
'''
if '/' in source_file:
file_name = source_file.split('/')[-1]
dest_file = self.dirname +'/' + file_name
try:
shutil.move(source_file, dest_file)
except Exception as e:
logger.error(e)