courses / aws_mlops

Commit ec3766ed, authored Aug 25, 2021 by Cristian Quezada
Initial commit
Parent: c36a7082
Pipeline #168: canceled with stages

Showing 5 changed files with 495 additions and 0 deletions (+495, -0)
code/lambdas/mlops-env-setup/cfnresponse.py           +51   -0
code/lambdas/mlops-env-setup/index.py                 +28   -0
code/lambdas/mlops-op-deployment/mlops_op_deploy.py   +262  -0
code/lambdas/mlops-op-process-request/index.py        +67   -0
code/lambdas/mlops-op-training/index.py               +87   -0

code/lambdas/mlops-env-setup/cfnresponse.py (new file, mode 100644)

# Copyright 2016 Amazon Web Services, Inc. or its affiliates. All Rights Reserved.
# This file is licensed to you under the AWS Customer Agreement (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at http://aws.amazon.com/agreement/ .
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from __future__ import print_function
import urllib3
import json

SUCCESS = "SUCCESS"
FAILED = "FAILED"

http = urllib3.PoolManager()


def send(event, context, responseStatus, responseData, physicalResourceId=None, noEcho=False, reason=None):
    responseUrl = event['ResponseURL']

    print(responseUrl)

    responseBody = {
        'Status': responseStatus,
        'Reason': reason or "See the details in CloudWatch Log Stream: {}".format(context.log_stream_name),
        'PhysicalResourceId': physicalResourceId or context.log_stream_name,
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'NoEcho': noEcho,
        'Data': responseData
    }

    json_responseBody = json.dumps(responseBody)

    print("Response body:")
    print(json_responseBody)

    headers = {
        'content-type': '',
        'content-length': str(len(json_responseBody))
    }

    try:
        response = http.request('PUT', responseUrl, headers=headers, body=json_responseBody)
        print("Status code:", response.status)
    except Exception as e:
        print("send(..) failed executing http.request(..):", e)
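
For context, cfnresponse is consumed by a CloudFormation-backed Lambda handler, which must report SUCCESS or FAILED back to the stack before the operation can finish. A minimal usage sketch follows; the handler below and its response data are illustrative assumptions, not part of this commit.

import cfnresponse

def lambda_handler(event, context):
    # Illustrative only: do some work, then signal the result to CloudFormation
    # so the Create/Update/Delete operation does not hang until it times out.
    try:
        data = {'status': 'OK'}
        cfnresponse.send(event, context, cfnresponse.SUCCESS, data)
    except Exception as e:
        cfnresponse.send(event, context, cfnresponse.FAILED, {}, reason=str(e))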

code/lambdas/mlops-env-setup/index.py (new file, mode 100644)

# CloudFormation custom resource handler for the environment setup:
# on 'Create' it seeds the CodeCommit repository with a dummy buildspec and creates the
# requested branches; on 'Delete' it empties the assets bucket and the ECR repositories.
import cfnresponse
import boto3

codeCommit = boto3.client('codecommit')
s3 = boto3.resource('s3')
ecr = boto3.client('ecr')


def lambda_handler(event, context):
    responseData = {'status': 'NONE'}
    if event['RequestType'] == 'Create':
        repoName = event['ResourceProperties'].get('RepoName')
        branch_names = event['ResourceProperties'].get('BranchNames')
        branches = codeCommit.list_branches(repositoryName=repoName)['branches']
        responseData['default_branch'] = branch_names[0]
        if len(branches) == 0:
            putFiles = {
                'filePath': 'buildspec.yml',
                'fileContent': "version: 0.2\nphases:\n  build:\n    commands:\n      - echo 'dummy'\n".encode()
            }
            resp = codeCommit.create_commit(
                repositoryName=repoName, branchName='master',
                commitMessage=' - repo init', putFiles=[putFiles])
            for i in branch_names:
                codeCommit.create_branch(repositoryName=repoName, branchName=i, commitId=resp['commitId'])
        responseData['status'] = 'CREATED'
    elif event['RequestType'] == 'Delete':
        s3.Bucket(event['ResourceProperties'].get('BucketName')).object_versions.all().delete()
        try:
            for i in event['ResourceProperties'].get('ImageRepoNames'):
                imgs = ecr.list_images(registryId='894268508623', repositoryName=i)
                ecr.batch_delete_image(registryId='894268508623', repositoryName=i, imageIds=imgs['imageIds'])
        except Exception as e:
            pass
        responseData['status'] = 'DELETED'
    cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData)
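
Everything this handler needs arrives in the custom resource's ResourceProperties; note that the ECR registryId is hard-coded to account 894268508623, so the Delete-time cleanup only works in that account. A sketch of the event shape the code reads follows; the key names come from the handler above, the values are illustrative assumptions.

# Key names (RepoName, BranchNames, BucketName, ImageRepoNames) are read by the handler;
# the values here are only examples.
example_event = {
    'RequestType': 'Create',
    'ResourceProperties': {
        'RepoName': 'mlops-model-repo',                # CodeCommit repo to seed
        'BranchNames': ['development', 'production'],  # branches created from the initial commit
        'BucketName': 'mlops-assets-bucket',           # emptied on Delete
        'ImageRepoNames': ['mlops-training-image']     # ECR repos purged on Delete
    }
}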

code/lambdas/mlops-op-deployment/mlops_op_deploy.py (new file, mode 100644)

# CloudFormation custom resource (crhelper-based) that deploys a trained SageMaker model:
# it creates or updates the model, endpoint config and endpoint, optionally wiring up
# A/B variants, data capture (inference monitoring) and endpoint auto scaling.
import boto3
import io
import zipfile
import json
import logging

from crhelper import CfnResource
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

# Initialise the helper, all inputs are optional, this example shows the defaults
helper = CfnResource(json_logging=False, log_level='DEBUG', boto_level='CRITICAL')

s3 = boto3.client('s3')
sm = boto3.client('sagemaker')
auto = boto3.client('application-autoscaling')
cfn = boto3.client('cloudformation')


def lambda_handler(event, context):
    helper(event, context)


def create_autoscaling_policy(event, context):
    endpoint_name = helper.Data.get('endpoint_name')
    variant_names = helper.Data.get('variant_names')
    meta = helper.Data.get('deployment_metadata')
    role_arn = helper.Data.get('role_arn')

    for variant_name in variant_names:
        resourceId = 'endpoint/%s/variant/%s' % (endpoint_name, variant_name)
        resp = auto.register_scalable_target(
            ServiceNamespace='sagemaker',
            ResourceId=resourceId,
            ScalableDimension='sagemaker:variant:DesiredInstanceCount',
            MinCapacity=meta['AutoScaling']['MinCapacity'],
            MaxCapacity=meta['AutoScaling']['MaxCapacity'],
            RoleARN=role_arn
        )
        resp = auto.put_scaling_policy(
            PolicyName='%s-%s' % (endpoint_name, variant_name),
            PolicyType='TargetTrackingScaling',
            ResourceId=resourceId,
            ScalableDimension='sagemaker:variant:DesiredInstanceCount',
            ServiceNamespace='sagemaker',
            TargetTrackingScalingPolicyConfiguration={
                'TargetValue': meta['AutoScaling']['TargetValue'],
                'ScaleInCooldown': meta['AutoScaling']['ScaleInCooldown'],
                'ScaleOutCooldown': meta['AutoScaling']['ScaleOutCooldown'],
                'PredefinedMetricSpecification': {
                    'PredefinedMetricType': meta['AutoScaling']['PredefinedMetricType']
                }
            }
        )


def prepare_descriptors(event, context):
    deployment_info = None
    env = event['ResourceProperties']['Environment']
    job_name = event['ResourceProperties']['JobName']
    job_description = sm.describe_training_job(TrainingJobName=job_name)

    if not env in ["development", "production"]:
        raise Exception("Invalid deployment environment: %s" % env)

    resp = s3.get_object(Bucket=event['ResourceProperties']['AssetsBucket'], Key=event['ResourceProperties']['AssetsKey'])
    with zipfile.ZipFile(io.BytesIO(resp['Body'].read()), "r") as z:
        deployment_info = json.loads(z.read('deployment.json').decode('ascii'))

    metadata = deployment_info['DevelopmentEndpoint'] if env == 'development' else deployment_info['ProductionEndpoint']

    # Now create the Endpoint Configuration
    endpoint_name = "%s-%s" % (deployment_info['EndpointPrefix'], env)
    endpoint_config_name = "%s-ec-%s-%s" % (deployment_info['EndpointPrefix'], job_name, env)
    endpoint_config_params = {'EndpointConfigName': endpoint_config_name}
    endpoint_params = {'EndpointName': endpoint_name, 'EndpointConfigName': endpoint_config_name}

    endpoint_config_params['ProductionVariants'] = [{
        'VariantName': 'model-a',
        'ModelName': job_name,
        'InitialInstanceCount': metadata["InitialInstanceCount"],
        'InstanceType': metadata["InstanceType"]
    }]
    variant_names = ['model-a']

    # here we check if there is already a variant in the endpoint
    # we need to rearrange the variants to do A/B tests
    if metadata['ABTests']:
        try:
            resp = sm.describe_endpoint(EndpointName=endpoint_name)
            logger.info("Endpoint config name: %s", resp['EndpointConfigName'])
            resp = sm.describe_endpoint_config(EndpointConfigName=resp['EndpointConfigName'])

            old_variant = resp['ProductionVariants'][0]
            old_variant['InitialVariantWeight'] = 1.0 - metadata['InitialVariantWeight']

            new_variant = endpoint_config_params['ProductionVariants'][0]
            new_variant['VariantName'] = "model-b" if old_variant['VariantName'].endswith('-a') else "model-a"
            new_variant['InitialVariantWeight'] = metadata['InitialVariantWeight']

            endpoint_config_params['ProductionVariants'].append(old_variant)
            variant_names.append('model-b')
        except Exception as ex:
            logger.info("Error while trying to retrieve the EndpointConfig. It means we'll have just one variant: %s", ex)

    # here we enable the log writing if required
    if metadata['InferenceMonitoring']:
        endpoint_config_params['DataCaptureConfig'] = {
            'EnableCapture': True,
            'InitialSamplingPercentage': metadata['InferenceMonitoringSampling'],
            'DestinationS3Uri': metadata['InferenceMonitoringOutputBucket'],
            'CaptureOptions': [{'CaptureMode': 'Input'}, {'CaptureMode': 'Output'}],
            'CaptureContentTypeHeader': {
                "CsvContentTypes": ["text/csv"],
                "JsonContentTypes": ["application/json"]
            }
        }

    # now, we create the model metadata
    model_params = {
        'ModelName': job_name,
        'PrimaryContainer': {
            'Image': job_description['AlgorithmSpecification']['TrainingImage'],
            'ModelDataUrl': job_description['ModelArtifacts']['S3ModelArtifacts'],
        },
        'ExecutionRoleArn': job_description['RoleArn']
    }

    helper.Data.update({
        'endpoint_name': endpoint_name,
        'endpoint_config_name': endpoint_config_name,
        'model_name': job_name,
        'deployment_metadata': metadata,
        'variant_names': variant_names,
        'role_arn': job_description['RoleArn'],
        'enable_auto_scaling': True if metadata['AutoScaling'] else False
    })
    return model_params, endpoint_config_params, endpoint_params


@helper.create
@helper.update
def start_deployment(event, context):
    try:
        model_params, endpoint_config_params, endpoint_params = prepare_descriptors(event, context)
        model_name = helper.Data.get('model_name')
        endpoint_name = helper.Data.get('endpoint_name')
        endpoint_config_name = helper.Data.get('endpoint_config_name')

        # and here we create all the three elements for a deploy
        try:
            sm.describe_model(ModelName=model_name)
        except ClientError as e:
            if e.response['Error']['Code'] == 'ValidationException':
                sm.create_model(**model_params)

        sm.create_endpoint_config(**endpoint_config_params)
        try:
            sm.describe_endpoint(EndpointName=endpoint_name)
            sm.update_endpoint(EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name, RetainAllVariantProperties=False)
            logger.info("Endpoint found. Updating: %s", endpoint_name)
        except ClientError as e:
            if e.response['Error']['Code'] == 'ValidationException':
                sm.create_endpoint(**endpoint_params)
                logger.info("Endpoint wasn't found. Creating: %s", endpoint_name)
    except Exception as e:
        logger.error("start_deployment - Ops! Something went wrong: %s" % e)
        raise e


@helper.poll_create
@helper.poll_update
def check_deployment_progress(event, context):
    answer = False
    try:
        endpoint_name = helper.Data.get('endpoint_name')
        enable_auto_scaling = helper.Data.get('enable_auto_scaling')
        resp = sm.describe_endpoint(EndpointName=endpoint_name)
        status = resp['EndpointStatus']
        if status in ['Creating', 'Updating']:
            logger.info("check_deployment_progress - Preparing endpoint %s, status: %s", endpoint_name, status)
        elif status == 'InService':
            logger.info("check_deployment_progress - Endpoint %s ready to be used!", endpoint_name)
            if enable_auto_scaling:
                create_autoscaling_policy(event, context)
            answer = True
        else:
            answer = True
            raise Exception("Invalid state for endpoint %s: %s" % (endpoint_name, resp['FailureReason']))
    except Exception as e:
        logger.error("check_deployment_progress - Ops! Something went wrong: %s" % e)
        if answer:
            raise e
    return answer


@helper.delete
def delete_deployment(event, context):
    try:
        env = event['ResourceProperties']['Environment']
        job_name = event['ResourceProperties']['JobName']
        logical_id = event['LogicalResourceId']
        request_physical_id = event['PhysicalResourceId']

        if not env in ["development", "production"]:
            raise Exception("Invalid deployment environment: %s" % env)

        stack_name = event['StackId'].split('/')[1]
        resp = cfn.describe_stack_resource(StackName=stack_name, LogicalResourceId=logical_id)
        current_physical_id = resp['StackResourceDetail']['PhysicalResourceId']
        if request_physical_id != current_physical_id:
            logger.info("delete_deployment - Delete request for resource id: %s, but the current id is: %s. Ignoring...", request_physical_id, current_physical_id)
            helper.Data.update({'delete_old_resource': True})
            return

        resp = s3.get_object(Bucket=event['ResourceProperties']['AssetsBucket'], Key=event['ResourceProperties']['AssetsKey'])
        with zipfile.ZipFile(io.BytesIO(resp['Body'].read()), "r") as z:
            deployment_info = json.loads(z.read('deployment.json').decode('ascii'))

        endpoint_name = "%s-%s" % (deployment_info['EndpointPrefix'], env)
        endpoint_config_name = "%s-ec-%s-%s" % (deployment_info['EndpointPrefix'], job_name, env)
        helper.Data.update({'endpoint_name': endpoint_name, 'endpoint_config_name': endpoint_config_name})

        resp = sm.describe_endpoint(EndpointName=endpoint_name)
        status = resp['EndpointStatus']
        if status != 'InService':
            raise Exception("You can't delete an endpoint that is not InService: %s, status[%s]" % (endpoint_name, status))
        else:
            sm.delete_endpoint(EndpointName=endpoint_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ValidationException':
            logger.info("Well, there is no endpoint to delete: %s - %s", endpoint_name, e)
    except Exception as e:
        logger.error("delete_deployment - Ops! Something went wrong: %s" % e)
        raise e


@helper.poll_delete
def check_delete_deployment_progress(event, context):
    endpoint_name = None
    endpoint_config_name = None
    try:
        delete_old_resource = helper.Data.get('delete_old_resource')
        if delete_old_resource:
            logger.info("check_delete_deployment_progress - Nothing to do... ignoring")
            return True

        endpoint_name = helper.Data.get('endpoint_name')
        endpoint_config_name = helper.Data.get('endpoint_config_name')
        resp = sm.describe_endpoint(EndpointName=endpoint_name)
        status = resp['EndpointStatus']
        if status != 'Deleting':
            raise Exception('Error while trying to delete the endpoint: %s. Status: %s' % (endpoint_name, status))
        else:
            return False
    except ClientError as e:
        if e.response['Error']['Code'] == 'ValidationException':
            logger.info("Finished! We deleted the endpoint. Now, let's delete the ec")
            try:
                sm.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
            except ClientError as ex:
                if ex.response['Error']['Code'] == 'ValidationException':
                    logger.info("The EC wasn't created before, so let's ignore")
                else:
                    raise ex
    except Exception as e:
        logger.error("check_delete_deployment_progress - Ops! Something went wrong: %s" % e)
        raise e
    return True
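
prepare_descriptors() is driven entirely by the deployment.json descriptor packed into the assets zip. A sketch of that descriptor, reconstructed from the keys the code reads, is shown below; the values are illustrative assumptions, not part of this commit.

# Only the key names are taken from the code above; every value here is an example.
example_deployment_json = {
    "EndpointPrefix": "mlops-model",
    "DevelopmentEndpoint": {
        "InitialInstanceCount": 1,
        "InstanceType": "ml.m5.large",
        "ABTests": False,
        "InitialVariantWeight": 0.1,          # only read when ABTests is true
        "InferenceMonitoring": False,
        "InferenceMonitoringSampling": 100,   # only read when InferenceMonitoring is true
        "InferenceMonitoringOutputBucket": "s3://mlops-assets-bucket/monitoring",
        "AutoScaling": None                   # or a dict like the one below
    },
    "ProductionEndpoint": {
        "InitialInstanceCount": 2,
        "InstanceType": "ml.m5.large",
        "ABTests": True,
        "InitialVariantWeight": 0.1,
        "InferenceMonitoring": True,
        "InferenceMonitoringSampling": 100,
        "InferenceMonitoringOutputBucket": "s3://mlops-assets-bucket/monitoring",
        "AutoScaling": {
            "MinCapacity": 2,
            "MaxCapacity": 10,
            "TargetValue": 750.0,
            "ScaleInCooldown": 300,
            "ScaleOutCooldown": 300,
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
        }
    }
}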

code/lambdas/mlops-op-process-request/index.py (new file, mode 100644)

# CodePipeline action handler: reads UserParameters, generates the per-stage parameter
# files (training, development deployment, production deployment) plus the operation-handler
# template, and uploads them as the RequestOutput artifact.
import boto3
import io
import zipfile
import json

from datetime import datetime

s3 = boto3.client('s3')
codepipeline = boto3.client('codepipeline')


def lambda_handler(event, context):
    trainingJob = None
    deployment = None
    try:
        now = datetime.now()
        jobId = event["CodePipeline.job"]["id"]
        user_params = json.loads(event["CodePipeline.job"]["data"]["actionConfiguration"]["configuration"]["UserParameters"])

        model_prefix = user_params['model_prefix']
        mlops_operation_template = s3.get_object(Bucket=user_params['bucket'], Key=user_params['prefix'])['Body'].read()

        job_name = 'mlops-%s-%s' % (model_prefix, now.strftime("%Y-%m-%d-%H-%M-%S"))

        s3Location = None
        for inputArtifacts in event["CodePipeline.job"]["data"]["inputArtifacts"]:
            if inputArtifacts['name'] == 'ModelSourceOutput':
                s3Location = inputArtifacts['location']['s3Location']

        params = {
            "Parameters": {
                "AssetsBucket": s3Location['bucketName'],
                "AssetsKey": s3Location['objectKey'],
                "Operation": "training",
                "Environment": "none",
                "JobName": job_name
            }
        }

        for outputArtifacts in event["CodePipeline.job"]["data"]["outputArtifacts"]:
            if outputArtifacts['name'] == 'RequestOutput':
                s3Location = outputArtifacts['location']['s3Location']

        zip_bytes = io.BytesIO()
        with zipfile.ZipFile(zip_bytes, "w") as z:
            z.writestr('assets/params_train.json', json.dumps(params))
            params['Parameters']['Operation'] = 'deployment'
            params['Parameters']['Environment'] = 'development'
            z.writestr('assets/params_deploy_dev.json', json.dumps(params))
            params['Parameters']['Environment'] = 'production'
            z.writestr('assets/params_deploy_prd.json', json.dumps(params))
            z.writestr('assets/mlops_operation_handler.yml', mlops_operation_template)

        zip_bytes.seek(0)
        s3.put_object(Bucket=s3Location['bucketName'], Key=s3Location['objectKey'], Body=zip_bytes.read())

        # and update codepipeline
        codepipeline.put_job_success_result(jobId=jobId)
    except Exception as e:
        resp = codepipeline.put_job_failure_result(
            jobId=jobId,
            failureDetails={
                'type': 'ConfigurationError',
                'message': str(e),
                'externalExecutionId': context.aws_request_id
            }
        )
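
This action reads its configuration from the UserParameters of the CodePipeline invoke stage. A sketch of the UserParameters it appears to expect follows; the key names come from the code above, the values are illustrative assumptions.

# model_prefix feeds the generated job name ('mlops-<prefix>-<timestamp>'); bucket/prefix
# locate the mlops_operation_handler.yml template copied into the output artifact.
example_user_parameters = {
    "model_prefix": "iris",
    "bucket": "mlops-assets-bucket",
    "prefix": "templates/mlops_operation_handler.yml"
}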

code/lambdas/mlops-op-training/index.py (new file, mode 100644)

# CloudFormation custom resource (crhelper-based) that starts a SageMaker training job
# from the trainingjob.json packed in the assets zip and polls it until it completes.
import boto3
import io
import zipfile
import json
import logging

from crhelper import CfnResource

logger = logging.getLogger(__name__)

# Initialise the helper, all inputs are optional, this example shows the defaults
helper = CfnResource(json_logging=False, log_level='DEBUG', boto_level='CRITICAL')

s3 = boto3.client('s3')
sm = boto3.client('sagemaker')


def lambda_handler(event, context):
    helper(event, context)


@helper.create
@helper.update
def start_training_job(event, context):
    try:
        # Get the training job and deployment descriptors
        training_params = None
        deployment_params = None

        job_name = event['ResourceProperties']['JobName']
        helper.Data.update({'job_name': job_name})

        try:
            # We need to check if there is another training job with the same name
            sm.describe_training_job(TrainingJobName=job_name)
            ## there is; let the poll handler address it
        except Exception as a:
            # Ok. there isn't. so, let's start a new training job
            resp = s3.get_object(Bucket=event['ResourceProperties']['AssetsBucket'], Key=event['ResourceProperties']['AssetsKey'])
            with zipfile.ZipFile(io.BytesIO(resp['Body'].read()), "r") as z:
                training_params = json.loads(z.read('trainingjob.json').decode('ascii'))
                deployment_params = json.loads(z.read('deployment.json').decode('ascii'))

            training_params['TrainingJobName'] = job_name
            resp = sm.create_training_job(**training_params)
    except Exception as e:
        logger.error("start_training_job - Ops! Something went wrong: %s" % e)
        raise e


@helper.delete
def stop_training_job(event, context):
    try:
        job_name = event['ResourceProperties']['JobName']
        status = sm.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
        if status == 'InProgress':
            logger.info('Stopping InProgress training job: %s', job_name)
            sm.stop_training_job(TrainingJobName=job_name)
            return False
        else:
            logger.info('Training job status: %s, nothing to stop', status)
    except Exception as e:
        logger.error("stop_training_job - Ops! Something went wrong: %s" % e)
    return True


@helper.poll_create
@helper.poll_update
def check_training_job_progress(event, context):
    failed = False
    try:
        job_name = helper.Data.get('job_name')
        resp = sm.describe_training_job(TrainingJobName=job_name)
        status = resp['TrainingJobStatus']
        if status == 'Completed':
            logger.info('Training Job (%s) is Completed', job_name)
            return True
        elif status in ['InProgress', 'Stopping']:
            logger.info('Training job (%s) still in progress (%s), waiting and polling again...', job_name, resp['SecondaryStatus'])
        elif status == 'Failed':
            failed = True
            raise Exception('Training job has failed: {}'.format(resp['FailureReason']))
        else:
            raise Exception('Training job ({}) has unexpected status: {}'.format(job_name, status))
    except Exception as e:
        logger.error("check_training_job_progress - Ops! Something went wrong: %s" % e)
        if failed:
            raise e
    return False


@helper.poll_delete
def check_stopping_training_job_progress(event, context):
    logger.info("check_stopping_training_job_progress")
    return stop_training_job(event, context)
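
start_training_job() expects the assets zip to contain a trainingjob.json that is already a valid CreateTrainingJob request body; the handler only overrides TrainingJobName before calling sm.create_training_job(**training_params). A minimal sketch of such a descriptor follows; everything in it is an illustrative assumption, not part of this commit.

# Must follow the SageMaker CreateTrainingJob API shape; TrainingJobName is overwritten by the handler.
example_trainingjob_json = {
    "AlgorithmSpecification": {
        "TrainingImage": "<account>.dkr.ecr.<region>.amazonaws.com/mlops-training-image:latest",
        "TrainingInputMode": "File"
    },
    "RoleArn": "arn:aws:iam::<account>:role/mlops-sagemaker-role",
    "InputDataConfig": [{
        "ChannelName": "train",
        "DataSource": {"S3DataSource": {
            "S3DataType": "S3Prefix",
            "S3Uri": "s3://mlops-assets-bucket/train",
            "S3DataDistributionType": "FullyReplicated"
        }}
    }],
    "OutputDataConfig": {"S3OutputPath": "s3://mlops-assets-bucket/output"},
    "ResourceConfig": {"InstanceType": "ml.m5.large", "InstanceCount": 1, "VolumeSizeInGB": 30},
    "StoppingCondition": {"MaxRuntimeInSeconds": 3600}
}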