forked from marcoszh/MArk-Project
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery_processor.py
More file actions
129 lines (107 loc) · 4.94 KB
/
query_processor.py
File metadata and controls
129 lines (107 loc) · 4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import asyncio
import json
import logging
import threading
import time
from collections import deque
import requests
import aiohttp
import tensorflow as tf
from . import aws_manager, utils
from .model_source import mdl_source
from .load_balancer import get_balancer
from .data_accessor import instance_accessor, demand_aws_accessor
from .constants import *
from .instance_source import ins_source
# Configure the root logger once at import time: timestamped INFO-level messages.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s'
)
# Request-type codes attached to each response, recording which backend
# ultimately served the query.
REQ_CPU = 0         # served by a CPU instance
REQ_GPU = 1         # served by a GPU instance
REQ_LAMBDA_CPU = 2  # instance rejected; served by Lambda fallback (CPU-sized batch)
REQ_LAMBDA_GPU = 3  # instance rejected; served by Lambda fallback (GPU-sized batch)
REQ_FAIL_CPU = 4    # both instance and Lambda failed (CPU-sized batch)
REQ_FAIL_GPU = 5    # both instance and Lambda failed (GPU-sized batch)
class QueryProcessor():
    """Batches incoming queries per backend instance type and dispatches them,
    falling back to AWS Lambda when the chosen instance rejects the request.

    Lifecycle: callers must invoke ``set_loop`` once with the running event
    loop before using ``send_query``.
    """

    def set_loop(self, loop_):
        """Bind the processor to *loop_* and start the background queue manager.

        Side effects: creates the shared aiohttp session and schedules the
        long-running ``_manage_queue`` task on the loop.
        """
        self.loop = loop_
        self.query_queue = QueryQuene()
        self.balancer = get_balancer()
        self.session = aiohttp.ClientSession(loop=self.loop)
        self.loop.create_task(self._manage_queue())

    async def send_query(self, name, time, data):
        """Enqueue one query and wait for its result.

        Args:
            name: model/service name used for instance allocation.
            time: caller-supplied timestamp (passed to utils.gap_time for latency).
            data: the query payload.

        Returns:
            A (result, request_type_code, elapsed) tuple set by the dispatcher.
        """
        # create_future() ties the future to the bound loop explicitly,
        # avoiding the implicit get_event_loop() inside asyncio.Future().
        future = self.loop.create_future()
        await self.query_queue.put(future, name, time, data)
        await future
        return future.result()

    @staticmethod
    def _handle_size(typ):
        """Return the batch size for instance type *typ*, or 1 when unknown.

        NOTE: prefix order matters — 'c5.x' must be tested before the generic
        'c5.' so c5.xlarge is not swallowed by the catch-all branch.
        """
        if typ.startswith('p2'):
            return HANDLE_SIZE_P2
        if typ.startswith('c5.x'):
            return HANDLE_SIZE_C5X
        if typ.startswith('c5.2x'):
            return HANDLE_SIZE_C52X
        if typ.startswith('c5.4x'):
            return HANDLE_SIZE_C54X
        if typ.startswith('c5.'):
            return HANDLE_SIZE_C5
        return 1

    async def _manage_queue(self):
        """Run forever: pull queued queries, batch them to the instance's
        preferred size, and dispatch each batch as its own task."""
        while True:
            info = await self.query_queue.get()
            name = info[0][1]
            fu = [i[0] for i in info]
            times = [i[2] for i in info]
            data = [i[3] for i in info]
            alloc_info = ins_source.get_ins_alloc(name, self.balancer)
            if alloc_info:
                ip, typ = alloc_info[0], alloc_info[1]
                # Top up the batch to the instance type's handle size
                # (e.g. 16 for p2 GPU instances).
                extra = self._handle_size(typ) - 1
                if extra > 0:
                    for item in await self.query_queue.get(extra):
                        fu.append(item[0])
                        times.append(item[2])
                        data.append(item[3])
                self.loop.create_task(self._get_result(fu, name, times, data, ip))
            else:
                # No instance available: fail every waiting future immediately.
                for f, t in zip(fu, times):
                    f.set_result(('No resources available', -1, utils.gap_time(t)))

    async def _get_result(self, futures, name, times, data, ip):
        """Serve one batch and resolve each caller's future with
        (result, request_type_code, elapsed)."""
        results, req_type = await self._serve(name, data, ip)
        for f, t, r, typ in zip(futures, times, results, req_type):
            f.set_result((r, typ, utils.gap_time(t)))

    async def _serve(self, name, data, ip):
        """POST the batch to the instance at *ip*; on rejection fall back to Lambda.

        Returns:
            (results, req_types): two lists, one entry per item in *data*.
        """
        # NOTE(review): batches larger than 2 are treated as GPU-bound — confirm
        # this matches the batching policy in _manage_queue.
        is_gpu = len(data) > 2
        req_type = [REQ_GPU if is_gpu else REQ_CPU for _ in data]
        logging.info(f'Send request to ip: {ip}; batch_size:{len(data)}')
        async with self.session.post(**mdl_source.get_request(data, ip)) as resp:
            if resp.status == 200:
                r = await resp.json()
                return (mdl_source.collect_result(r), req_type)
            # BUG FIX: the original returned undefined names `r`/`req_typ` here
            # (a NameError) and that early return made the Lambda fallback below
            # unreachable. Log the rejection and fall through instead.
            logging.info(f'Request rejected. ip: {ip}; status: {resp.status}')
        async with self.session.get(mdl_source.get_lambda_req()) as res_lam:
            if res_lam.status == 200:
                r = await res_lam.text()
                req_typ = [REQ_LAMBDA_GPU if is_gpu else REQ_LAMBDA_CPU for _ in data]
                return ([r for _ in data], req_typ)
            logging.info(f'Lambda rejected. status: {res_lam.status}')
            req_typ = [REQ_FAIL_GPU if is_gpu else REQ_FAIL_CPU for _ in data]
            return ([f'Error code : {res_lam.status}' for _ in data], req_typ)
class QueryQuene():
    """FIFO queue of pending queries, a thin wrapper over asyncio.Queue.

    Each entry is a (future, name, time, data) tuple. (Class name kept
    as-is — 'Quene' is a historical typo relied on by callers.)
    """

    def __init__(self):
        # Unbounded queue; producers never block on put.
        self.queue = asyncio.Queue()

    async def put(self, fu, name, time, data):
        """Enqueue one query as a (future, name, time, data) tuple."""
        await self.queue.put((fu, name, time, data))

    async def get(self, num=1):
        """Dequeue *num* entries, awaiting each until it is available.

        Returns a list of tuples; an empty list when num <= 0.
        """
        return [await self.queue.get() for _ in range(num)]

    def empty(self):
        """Return True when no queries are waiting."""
        return self.queue.empty()

    def size(self):
        """Return the number of queries currently waiting."""
        return self.queue.qsize()