forked from aws/sagemaker-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtimeout.py
More file actions
199 lines (173 loc) · 7.39 KB
/
timeout.py
File metadata and controls
199 lines (173 loc) · 7.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import
from contextlib import contextmanager
import logging
from time import sleep
from awslogs.core import AWSLogs
from botocore.exceptions import ClientError
import stopit
from sagemaker import Predictor
from tests.integ.retry import retries
# Module-level logger used by the helpers in this file; it is registered under the
# fixed name "timeout" rather than the module's ``__name__``.
LOGGER = logging.getLogger("timeout")
@contextmanager
def timeout(seconds=0, minutes=0, hours=0):
    """Bound the run time of the enclosed ``with`` block.

    The three arguments are summed into a single limit, expressed in seconds,
    which is handed to ``stopit.ThreadingTimeout``. ``swallow_exc=False`` is
    passed to it (per stopit's documentation this lets the timeout exception
    propagate to the caller instead of being suppressed).

    Usage:
        with timeout(seconds=5):
            my_slow_function(...)

    Args:
        seconds: Portion of the limit given in seconds.
        minutes: Portion of the limit given in minutes.
        hours: Portion of the limit given in hours.

    Yields:
        list: A one-element list holding the ``stopit.ThreadingTimeout`` object.
    """
    from_minutes = 60 * minutes
    from_hours = 3600 * hours
    total_seconds = seconds + from_minutes + from_hours
    with stopit.ThreadingTimeout(total_seconds, swallow_exc=False) as timer:
        yield [timer]
@contextmanager
def timeout_and_delete_endpoint_by_name(
    endpoint_name,
    sagemaker_session,
    seconds=0,
    minutes=45,
    hours=0,
    sleep_between_cleanup_attempts=10,
    exponential_sleep=False,
):
    """Run the enclosed block under a time limit, then delete the endpoint.

    Once the block finishes (normally or by raising), up to three cleanup
    attempts are made. Each attempt deletes the monitoring schedules attached
    to the endpoint, deletes the endpoint itself and shows its CloudWatch logs;
    the log group is removed only when the block completed without raising.
    A ``ClientError`` raised during cleanup is logged and retried instead of
    being raised, so it cannot replace an exception raised by the block.

    Args:
        endpoint_name (str): Name of the endpoint to delete on exit.
        sagemaker_session: Session object used for the delete and log calls.
        seconds: Portion of the time limit given in seconds.
        minutes: Portion of the time limit given in minutes.
        hours: Portion of the time limit given in hours.
        sleep_between_cleanup_attempts: Base delay, in seconds, between two
            cleanup attempts.
        exponential_sleep (bool): When True the delay is the base delay
            multiplied by the number of attempts made so far (1x, 2x, ...).
            Despite the name, the growth is linear.

    Yields:
        list: A one-element list holding the ``stopit.ThreadingTimeout`` object.
    """
    max_attempts = 3
    limit = seconds + 60 * minutes + 3600 * hours
    # NOTE(review): the cleanup below runs inside the timeout context, so the
    # limit presumably still applies while cleaning up - confirm against stopit.
    with stopit.ThreadingTimeout(limit, swallow_exc=False) as t:
        no_errors = False
        try:
            yield [t]
            no_errors = True
        finally:
            for attempt in range(1, max_attempts + 1):
                try:
                    _delete_schedules_associated_with_endpoint(
                        sagemaker_session=sagemaker_session, endpoint_name=endpoint_name
                    )
                    sagemaker_session.delete_endpoint(endpoint_name)
                    LOGGER.info("deleted endpoint {}".format(endpoint_name))

                    _show_logs(endpoint_name, "Endpoints", sagemaker_session)
                    # Keep the log group around for debugging when the block failed.
                    if no_errors:
                        _cleanup_logs(endpoint_name, "Endpoints", sagemaker_session)
                    break
                except ClientError as ce:
                    # Deliberately not re-raised: raising here would overwrite an
                    # exception coming from the block. Log it so the failure is
                    # at least visible.
                    LOGGER.warning(
                        "failed to delete endpoint %s (attempt %d of %d), error code %s: %s",
                        endpoint_name,
                        attempt,
                        max_attempts,
                        ce.response.get("Error", {}).get("Code"),
                        ce,
                    )

                # Nothing is retried after the last attempt, so do not wait for it.
                if attempt == max_attempts:
                    break

                if exponential_sleep:
                    delay = sleep_between_cleanup_attempts * attempt
                else:
                    delay = sleep_between_cleanup_attempts
                sleep(delay)
@contextmanager
def timeout_and_delete_model_with_transformer(
    transformer, sagemaker_session, seconds=0, minutes=0, hours=0, sleep_between_cleanup_attempts=10
):
    """Run the enclosed block under a time limit, then delete the transformer's model.

    Once the block finishes (normally or by raising), up to three cleanup
    attempts are made. Each attempt calls ``transformer.delete_model()`` and
    shows the model's CloudWatch logs; the log group is removed only when the
    block completed without raising. A ``ClientError`` raised during cleanup is
    logged and retried instead of being raised, so it cannot replace an
    exception raised by the block.

    Args:
        transformer: Object providing ``delete_model()`` and ``model_name``.
        sagemaker_session: Session object used for the log calls.
        seconds: Portion of the time limit given in seconds.
        minutes: Portion of the time limit given in minutes.
        hours: Portion of the time limit given in hours.
        sleep_between_cleanup_attempts: Delay, in seconds, between two cleanup
            attempts.

    Yields:
        list: A one-element list holding the ``stopit.ThreadingTimeout`` object.
    """
    max_attempts = 3
    limit = seconds + 60 * minutes + 3600 * hours
    with stopit.ThreadingTimeout(limit, swallow_exc=False) as t:
        no_errors = False
        try:
            yield [t]
            no_errors = True
        finally:
            for attempt in range(1, max_attempts + 1):
                try:
                    transformer.delete_model()
                    LOGGER.info("deleted SageMaker model {}".format(transformer.model_name))

                    _show_logs(transformer.model_name, "Models", sagemaker_session)
                    # Keep the log group around for debugging when the block failed.
                    if no_errors:
                        _cleanup_logs(transformer.model_name, "Models", sagemaker_session)
                    break
                except ClientError as ce:
                    # Deliberately not re-raised: raising here would overwrite an
                    # exception coming from the block. Log it so the failure is
                    # at least visible.
                    LOGGER.warning(
                        "failed to delete SageMaker model %s (attempt %d of %d), error code %s: %s",
                        transformer.model_name,
                        attempt,
                        max_attempts,
                        ce.response.get("Error", {}).get("Code"),
                        ce,
                    )

                # Nothing is retried after the last attempt, so do not wait for it.
                if attempt < max_attempts:
                    sleep(sleep_between_cleanup_attempts)
def _delete_schedules_associated_with_endpoint(sagemaker_session, endpoint_name):
    """Delete every monitoring schedule attached to an endpoint.

    For each monitor the schedule is stopped, its executions are asked to stop,
    and the schedule is deleted only after every execution reports a finished
    status (server-side validation rejects deleting a schedule that still has
    running executions). A failure on one monitor is logged as a warning and
    does not prevent the remaining monitors from being processed.

    Args:
        sagemaker_session (sagemaker.session.Session): Session handed to the
            ``Predictor`` used to look up the endpoint's monitors.
        endpoint_name (str): The name of the endpoint to delete schedules from.
    """
    # Statuses after which an execution is no longer running. An execution that
    # finished on its own ends as "Completed" or "Failed" and never becomes
    # "Stopped", so waiting for "Stopped" alone would poll it until the retry
    # budget is exhausted. Values per the DescribeProcessingJob API.
    finished_statuses = ("Stopped", "Completed", "Failed")

    predictor = Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
    monitors = predictor.list_monitors()
    for monitor in monitors:
        try:
            monitor._wait_for_schedule_changes_to_apply()
            # Stop the schedules to prevent new executions from triggering.
            monitor.stop_monitoring_schedule()
            executions = monitor.list_executions()
            for execution in executions:
                execution.stop()
            # Wait for all executions to completely stop.
            # Schedules can't be deleted with running executions.
            for execution in executions:
                # Up to 60 polls, 5 seconds apart. Presumably ``retries`` raises once
                # the count is used up - verify in tests.integ.retry.
                for _ in retries(60, "Waiting for executions to stop", seconds_to_sleep=5):
                    status = execution.describe()["ProcessingJobStatus"]
                    if status in finished_statuses:
                        break
            # Delete schedules.
            monitor.delete_monitoring_schedule()
        except Exception as e:
            # Best effort: report the failure and carry on with the next monitor.
            LOGGER.warning(
                "Failed to delete monitor {},\nError: {}".format(
                    monitor.monitoring_schedule_name, e
                )
            )
def _show_logs(resource_name, resource_type, sagemaker_session):
    """List the CloudWatch logs of a SageMaker resource, best effort.

    The log group is ``/aws/sagemaker/<resource_type>/<resource_name>``. Any
    exception raised while listing is logged with its stack trace and then
    swallowed, so this helper never raises on a listing failure.

    Args:
        resource_name (str): Name of the resource, the last path segment.
        resource_type (str): Resource category segment, e.g. "Endpoints".
        sagemaker_session: Session whose ``boto_session.region_name`` selects
            the AWS region to read from.
    """
    group_name = "/aws/sagemaker/{}/{}".format(resource_type, resource_name)
    try:
        # Dump the logs while they still exist, to help debugging.
        LOGGER.info("cloudwatch logs for log group {}:".format(group_name))
        region = sagemaker_session.boto_session.region_name
        # start="1d" presumably limits output to the last day - confirm in awslogs.
        AWSLogs(
            log_group_name=group_name,
            log_stream_name="ALL",
            start="1d",
            aws_region=region,
        ).list_logs()
    except Exception:
        LOGGER.exception(
            "Failure occurred while listing cloudwatch log group %s. Swallowing exception but printing "
            "stacktrace for debugging.",
            group_name,
        )
def _cleanup_logs(resource_name, resource_type, sagemaker_session):
    """Delete the CloudWatch log group of a SageMaker resource, best effort.

    The log group is ``/aws/sagemaker/<resource_type>/<resource_name>``. Any
    exception raised while deleting is logged with its stack trace and then
    swallowed, so this helper never raises on a deletion failure.

    Args:
        resource_name (str): Name of the resource, the last path segment.
        resource_type (str): Resource category segment, e.g. "Endpoints".
        sagemaker_session: Session whose ``boto_session`` creates the
            CloudWatch Logs client.
    """
    group_name = "/aws/sagemaker/{}/{}".format(resource_type, resource_name)
    try:
        LOGGER.info("deleting cloudwatch log group {}:".format(group_name))
        # Build the "logs" client and delete the group in a single chained call.
        sagemaker_session.boto_session.client("logs").delete_log_group(logGroupName=group_name)
        LOGGER.info("deleted cloudwatch log group: {}".format(group_name))
    except Exception:
        LOGGER.exception(
            "Failure occurred while cleaning up cloudwatch log group %s. "
            "Swallowing exception but printing stacktrace for debugging.",
            group_name,
        )