forked from marcoszh/MArk-Project
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinstance_source.py
More file actions
152 lines (115 loc) · 6.65 KB
/
instance_source.py
File metadata and controls
152 lines (115 loc) · 6.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import asyncio
import logging
from . import aws_manager, prize_request, utils
from .constants import *
from .data_accessor import (aws_accessor, demand_aws_accessor,
instance_accessor, backup_ins_accessor, pre_aws_accessor,
pre_demand_aws_accessor)
CHECK_PERIOD = 120
class _InstanceSource():
def set_loop(self, loop_):
self.loop = loop_
self.run_loop()
def run_loop(self):
return
def launch_backup(self, name, tag):
return
def stop_backup(self, name):
return
def get_ins_alloc(self, name, balancer):
pass
def get_current_ins_and_prize(self, name, index_type):
pass
def launch_ins(self, name, params):
pass
def kill_ins(self, name, region, typ, num):
pass
def kill_all_ins(self, name):
pass
def initial_ins(self, name, tag):
pass
class OnDemandSource(_InstanceSource):
def get_ins_alloc(self, name, balancer):
ins_json = demand_aws_accessor.get_cluster(name)
if ins_json:
intance_list = [ utils.dict2Instance(i) for i in ins_json['info'].values() ]
return balancer.next_ip(name, intance_list)
def get_current_ins_and_prize(self, name, index_type):
aws_info = demand_aws_accessor.get_cluster(name)
intance_list = []
if aws_info:
intance_list = [utils.dict2Instance(i) for i in aws_info['info'].values()]
currentInstance = []
[ currentInstance.append(len([ i for i in intance_list if i.typ == typ and i.region == DEFAULT_REGION])) for typ in index_type ]
info = pre_demand_aws_accessor.get_cluster(name)
if info and info['info']:
c_tmp = []
[ c_tmp.append(sum([ i['num'] for _, i in info['info'].items() if i['type'] == typ and i['region'] == DEFAULT_REGION])) for typ in index_type ]
currentInstance = [ (e1 + e2) for e1, e2 in zip(currentInstance, c_tmp)]
prize_list = prize_request.get_demand_prize_by_region_type(DEFAULT_REGION, index_type)
return currentInstance, prize_list
def launch_ins(self, name, params):
aws_manager.launch_on_demand_instances.delay(name, params)
def kill_ins(self, name, region, typ, num):
aws_manager.kill_on_demand_instances.delay(name, region, typ, num)
def kill_all_ins(self, name):
aws_manager.kill_all_on_demand_ins(name, DEFAULT_REGION)
def initial_ins(self, name, tag):
aws_manager.launch_on_demand_instances(name, {'imageId':AMIS[DEFAULT_REGION]['CPU'], 'instanceType':'c5.xlarge', 'targetCapacity':6, 'key_value':[('exp_round', tag)] })
# aws_manager.launch_on_demand_instances(name, {'imageId':AMIS[DEFAULT_REGION]['CPU'], 'instanceType':'c5.large', 'targetCapacity':10, 'key_value':[('exp_round', tag)] })
# aws_manager.launch_on_demand_instances(name, {'imageId':AMIS[DEFAULT_REGION]['GPU'], 'instanceType':'p2.xlarge', 'targetCapacity':4, 'key_value':[('exp_round', tag)] })
class SpotSource(_InstanceSource):
def run_loop(self):
self.loop.create_task(self._moniter())
# detect the state of spot instances
async def _moniter(self):
while True:
await asyncio.sleep(CHECK_PERIOD)
logging.info('Check states of spot instances')
aws_manager.check_spot_states.delay()
def get_ins_alloc(self, name, balancer):
ins_json = instance_accessor.get_instances(name)
backup_ins = backup_ins_accessor.get_instances(name)
if ins_json:
intance_list = [ utils.dict2Instance(i) for i in ins_json ]
if backup_ins:
backup_instance_list = [ utils.Instance(i.ip, 'c5.large', i.region) for i in [ utils.dict2Instance(i) for i in backup_ins ]]
intance_list += backup_instance_list
return balancer.next_ip(name, intance_list)
def get_current_ins_and_prize(self, name, index_type):
# filter the instances by type and region(Default region: us-east-1)
intance_list = [ utils.dict2Instance(i) for i in instance_accessor.get_instances(name) ]
currentInstance = []
[ currentInstance.append(len([ i for i in intance_list if i.typ == typ and i.region == DEFAULT_REGION])) for typ in index_type ]
info = pre_aws_accessor.get_cluster(name)
if info['info']:
c_tmp = []
[ c_tmp.append(sum([ i['num'] for _, i in info['info'].items() if i['type'] == typ and i['region'] == DEFAULT_REGION])) for typ in index_type ]
currentInstance = [ (e1 + e2) for e1, e2 in zip(currentInstance, c_tmp)]
prize_list = prize_request.get_spot_prize_by_region_type(DEFAULT_REGION, index_type)
return currentInstance, prize_list
def launch_ins(self, name, params):
aws_manager.launch_spot_instances.delay(name, params)
def kill_ins(self, name, region, typ, num):
aws_manager.kill_spot_instances_by_num.delay(name, region, typ, num)
def kill_all_ins(self, name):
aws_manager.cancel_all_instances(name)
def launch_backup(self, name, tag):
# launch on demand ins for fault tolerance
aws_manager.launch_on_demand_instances(name, {'imageId':AMIS[DEFAULT_REGION]['CPU'], 'instanceType':'t2.medium', 'targetCapacity':10, 'key_value':[('exp_round', tag)] })
aws_manager.stop_on_demand_instances(name)
# aws_manager.start_on_demand_instances(name)
def stop_backup(self, name):
aws_manager.stop_on_demand_instances(name)
def initial_ins(self, name, tag):
# aws_manager.launch_spot_instances(name, {'imageId':AMIS[DEFAULT_REGION]['CPU'], 'instanceType':'c5.large', 'targetCapacity':1, 'key_value':[('exp_round', tag)] })
# aws_manager.launch_spot_instances(name, {'imageId':AMIS[DEFAULT_REGION]['CPU'], 'instanceType':'c5.xlarge', 'targetCapacity':10, 'key_value':[('exp_round', tag)] })
aws_manager.launch_spot_instances(name, {'imageId':AMIS[DEFAULT_REGION]['CPU'], 'instanceType':'c5.large', 'targetCapacity':10, 'key_value':[('exp_round', tag)] })
aws_manager.launch_spot_instances(name, {'imageId':AMIS[DEFAULT_REGION]['GPU'], 'instanceType':'p2.xlarge', 'targetCapacity':1, 'key_value':[('exp_round', tag)] })
# aws_manager.launch_spot_instances(name, {'imageId':AMIS[DEFAULT_REGION]['CPU'], 'instanceType':'c5.large', 'targetCapacity':8, 'key_value':[('exp_round', tag)] })
# aws_manager.launch_spot_instances(name, {'imageId':AMIS[DEFAULT_REGION]['GPU'], 'instanceType':'p2.xlarge', 'targetCapacity':1, 'key_value':[('exp_round', tag)] })
all_ins_sources = {
'spot': SpotSource(),
'ondemand': OnDemandSource()
}
ins_source = all_ins_sources[INS_SOURCE]