// workload.cpp (forked from Sys-KU/DeepPlan)
#include <client/workload.h>

#include <algorithm>
#include <cstdint>
#include <random>

#include <unistd.h>
// Build a synthetic trace of n_requests requests: each entry pairs an
// exponentially distributed inter-arrival interval (in seconds, with mean
// 1/rate) with a uniformly chosen model id in [0, concurrency).
Workload::Workload(int concurrency, int rate,
                   int n_requests, std::string addr, std::string port)
  : concurrency(concurrency),
    rate(rate),
    n_requests(n_requests),
    _traces(n_requests),
    addr(addr),
    port(port) {
  std::minstd_rand gen(0);  // fixed seed for reproducible traces
  std::uniform_int_distribution<> udist(0, concurrency-1);
  std::exponential_distribution<double> edist(rate);
  for (auto& trace : _traces) {
    trace.first = edist(gen);   // inter-arrival interval (seconds)
    trace.second = udist(gen);  // target model id
  }
}
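// A minimal usage sketch for this constructor (hypothetical values; the
// address and port are placeholders): 1000 requests at a rate of 100 req/s,
// spread uniformly over 4 models.
//
//   Workload workload(4, 100, 1000, "127.0.0.1", "4321");
//   workload.run(inputs);  // `inputs` holds one serialized input per model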
// Build a one-minute trace from per-model request rates: rates[i] is the
// expected number of requests per minute for model i. Arrival times are
// drawn per model from an exponential distribution, merged across models,
// sorted, and then converted into inter-arrival intervals.
Workload::Workload(std::vector<unsigned>& rates,
                   std::string addr, std::string port)
  : _traces(0),
    addr(addr),
    port(port) {
  std::minstd_rand gen(0);  // fixed seed for reproducible traces
  for (int i = 0; i < static_cast<int>(rates.size()); i++) {
    // rates[i] requests/minute -> rates[i]/60 requests/second.
    std::exponential_distribution<double> edist(rates[i]/60.0);
    // Generate absolute arrival times within the 60-second window.
    double itv = edist(gen);
    while (itv < 60) {
      _traces.push_back({itv, i});
      itv += edist(gen);
    }
  }
  // Merge all models' arrivals into a single timeline.
  std::sort(_traces.begin(), _traces.end(),
            [](auto& a, auto& b) { return a.first < b.first; });
  // Convert absolute arrival times into inter-arrival intervals.
  for (int i = static_cast<int>(_traces.size())-1; i > 0; i--) {
    _traces[i].first -= _traces[i-1].first;
  }
  n_requests = _traces.size();
}
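// A minimal usage sketch for the rate-based constructor (hypothetical
// values): three models receiving 120, 60, and 30 requests per minute.
//
//   std::vector<unsigned> rates = {120, 60, 30};  // requests/minute per model
//   Workload workload(rates, "127.0.0.1", "4321");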
// Replay the trace: sleep for each inter-arrival interval, then fire an
// asynchronous inference request for the traced model. The completion
// callback records end-to-end latency and counts cold starts.
void Workload::run(std::vector<std::vector<char>>& inputs) {
  client.connect(addr, port);
  for (auto& trace : _traces) {
    double interval = trace.first;
    int model_id = trace.second;
    usleep(interval*1e6);  // interval is in seconds; usleep takes microseconds
    uint64_t t_send = util::now();
    auto onSuccess = [this, t_send](serverapi::Response* rsp) {
      auto response = dynamic_cast<serverapi::InferenceResponse*>(rsp);
      uint64_t t_receive = util::now();
      // Convert the timestamp delta to milliseconds (assuming util::now()
      // has nanosecond resolution).
      uint64_t latency = (t_receive-t_send) / 1e6;
      this->latencies.push_back(latency);
      if (response->is_cold) this->cold_start_cnt++;
    };
    client.infer_async(inputs[model_id], model_id, onSuccess);
  }
  client.shutdown();
}
// Summarize the collected latencies: 99th-percentile latency, cold-start
// rate, and the fraction of requests that met the latency SLO (goodput),
// the latter two as percentages of the total request count.
WorkloadResult Workload::result(int slo) {
  WorkloadResult result;
  std::sort(latencies.begin(), latencies.end());
  int index_99 = latencies.size() * 0.99 - 1;  // index of the 99th percentile
  int goodput_cnt = 0;
  for (auto& latency : latencies)
    if (latency <= slo) goodput_cnt++;
  result.latency_99 = latencies[index_99];
  result.cold_rate = (double)cold_start_cnt / n_requests * 100;
  result.goodput_rate = (double)goodput_cnt / n_requests * 100;
  return result;
}
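// A minimal reporting sketch (the 100 ms SLO is a hypothetical choice, and
// the millisecond unit assumes util::now() has nanosecond resolution):
//
//   WorkloadResult r = workload.result(100);
//   std::cout << "p99: " << r.latency_99 << " ms, cold: " << r.cold_rate
//             << "%, goodput: " << r.goodput_rate << "%\n";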
// Bundle the parameters needed to register n_models model instances
// (split across the given model_names) with the server before a run.
ModelLoader::ModelLoader(std::vector<std::string> model_names,
                         int n_models, EngineType engine_type,
                         int mp_size, std::string addr, std::string port)
  : model_names(model_names),
    n_models(n_models),
    engine_type(engine_type),
    mp_size(mp_size),
    addr(addr),
    port(port) {}
// Register the models with the server and warm each instance with a single
// asynchronous inference request.
void ModelLoader::run() {
  client.connect(addr, port);
  util::InputGenerator input_generator;
  inputs.resize(n_models);
  // Instances are split evenly across the given architectures, so instance
  // i uses the input shape of model_names[i/n_models_per_type].
  int n_models_per_type = n_models / model_names.size();
  for (int i = 0; i < n_models; i++) {
    input_generator.generate_input(model_names[i/n_models_per_type], 1, &inputs[i]);
  }
  client.upload_model(model_names, n_models, engine_type, mp_size);
  for (int i = 0; i < n_models; i++) {
    auto onSuccess = [this](serverapi::Response* rsp) {};  // warm-up only; ignore the response
    client.infer_async(inputs[i], i, onSuccess);
  }
  client.shutdown();
}
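// An end-to-end sketch of how ModelLoader and Workload fit together
// (hypothetical values; assumes the generated `inputs` member is accessible
// to the caller and that `engine_type` is a valid EngineType):
//
//   std::vector<std::string> names = {"bert_base"};
//   ModelLoader loader(names, 4, engine_type, 1, "127.0.0.1", "4321");
//   loader.run();                 // register and warm 4 model instances
//   Workload workload(4, 100, 1000, "127.0.0.1", "4321");
//   workload.run(loader.inputs);  // replay the synthetic trace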