-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAlgorithmSupportServer.old.py
More file actions
381 lines (363 loc) · 22.5 KB
/
AlgorithmSupportServer.old.py
File metadata and controls
381 lines (363 loc) · 22.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import grpc
import redis # 单节点 Redis 包
import rediscluster as rediscl
import torch
import pickle
import numpy as np
import pandas as pd
from concurrent import futures
from proto import AlgorithmSupport_pb2, AlgorithmSupport_pb2_grpc
from utils import redis_utils, preprocess
from algorithm import pred, lstmx2, tcn, darnn
from algorithm.decompose import cpu, regtree, hardware
from algorithm.rf import rf_test
from algorithm.metric.qos_brb.evaluate import main_opt_bayes
from algorithm.metric.dc_level.evaluate import sample_dc_levels
# Sleep interval (seconds) used by serve() to keep the main thread alive.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# Window length passed as ``time_len`` when building prediction sequences.
SEQ_LEN = 10

# Redis connection parameters, read from the environment (None when unset).
REDIS_ADDR = os.getenv("REDIS_IP")
REDIS_PORT = os.getenv("REDIS_PORT")
REDIS_PWD = os.getenv("REDIS_PASSWORD")
REDIS_DB = os.getenv("REDIS_DATABASE")

# Startup nodes of the Redis cluster: one host, six consecutive ports.
_REDIS_CLUSTER_HOST = '10.160.109.63'
redis_nodes = [{'host': _REDIS_CLUSTER_HOST, 'port': port} for port in range(30001, 30007)]

# Per-VM-type configuration used for normalisation, keyed by VM type id.
# Defined exactly once so the values cannot drift between copies.
VM_CONF = {
    '1': {"cpu.usage.percent": 1, "memory.used.percent": 1},
    '2': {"cpu.usage.percent": 2, "memory.used.percent": 2},
    '3': {"cpu.usage.percent": 4, "memory.used.percent": 4},
    '4': {"cpu.usage.percent": 8, "memory.used.percent": 8},
}

# Virtual machine feature names.
VM_COLS = [
    "cpu.usage.percent",
    "memory.used.percent",
    "disk.vda.disk_octets.write",
    "disk.vda.disk_octets.read",
    "interface.eth0.if_octets.tx",
    "interface.eth0.if_octets.rx",
]
# Pod feature names.
POD_COLS = [
    "cpu.percent",
    "memory.percent",
    "blkio.io_service_bytes_recursive-253-0.write",
    "blkio.io_service_bytes_recursive-253-0.read",
    "network.usage.tx_bytes",
    "network.usage.rx_bytes",
]
# Physical server feature names for prediction; the last entry is the label.
SERVER_COLS = [
    "cpu.active.percent",
    "memory.used.percent",
    "disk.sda.disk_octets.write",
    "disk.sda.disk_octets.read",
    "interface.eth0.if_octets.tx",
    "interface.eth0.if_octets.rx",
    "pdu.power",
]
SERVER_LABEL = "pdu.power"
# Pod feature names for prediction; the last entry is the label.
PODP_COLS = [
    "cpu.percent",
    "memory.percent",
    "blkio.io_service_bytes_recursive-253-0.write",
    "blkio.io_service_bytes_recursive-253-0.read",
    "network.usage.tx_bytes",
    "network.usage.rx_bytes",
    "power",
]
POD_LABEL = "power"

# Metric names for VMs and pods.
VM_CPU_METRIC = "cpu.usage.percent"
VM_MEM_METRIC = "memory.used.percent"
POD_CPU_METRIC = "cpu.percent"
POD_MEM_METRIC = "memory.percent"
TYPE_COL_NAME = "type"

# Physical machine configuration.
CPU_COUNT = 4  # 4 cores
MEM_CAP = 8  # 8G

# Overall normalisation ranges, as [minimums, maximums] per feature column.
MINMAX_RANGE = [[0, 0, 0, 0, 0, 0], [100 * CPU_COUNT, 100 * MEM_CAP, 1e9, 1e9, 1e9, 1e9]]
# Power must be normalised starting from 0, otherwise pod power prediction
# may produce empty values.
MINMAXP_RANGE = [[0, 0, 0, 0, 0, 0, 0], [100, 100, 1e9, 1e9, 1e9, 1e9, 75]]

# Model name -> (loader kind, file path); consumed by serve() at start-up.
MODEL_PATH = {
    "regtree": ("pickle", "model/decompose/regtree-printWb.pkl"),
    "xgboost": ("pickle", "model/decompose/xgboost.pkl"),
    "direct": ("pickle", "model/rf-pod.pkl"),
    "rf": ("pickle", "model/rf.pkl"),
    "lstmx2": ("torch", "model/lstmx2.pkl"),
    "tcn": ("torch", "model/tcn.pkl"),
    "darnn": ("torch", "model/darnn.pkl"),
}
# Model name -> loaded model object; filled in by serve().
MODEL_DICT = {}
# QoS metric evaluation: the BRB input workbook is read once, at import time.
# header=None is passed so the first sheet row is treated as data, not as
# column names.
# NOTE(review): "PARH" looks like a typo for "PATH"; the name is kept because
# it is a module-level identifier.
QOS_BRB_PARH = "algorithm/metric/qos_brb/compute01.xlsx"
QOS_BRB_DATA = pd.read_excel(QOS_BRB_PARH, header=None)
class AlgorithmSupportService(AlgorithmSupport_pb2_grpc.AlgorithmSupportServiceServicer):
    """gRPC servicer dispatching power decomposition, prediction and metric requests."""

    def AlgorithmSupport(self, request, context):
        """Handle one ``AlgorithmSupport`` RPC.

        The request fields read here are ``serviceType``, ``algorithm``,
        ``entityID``, ``startTimestamp`` and ``endTimestamp``.  Dispatch is by
        ``serviceType``:

        * ``"pod_e"``, ``"vm"``, ``"hardware"`` -- power decomposition.
        * ``"server"``, ``"pod_p"`` -- power prediction.
        * ``"qos"`` -- QoS evaluation when ``algorithm == "brb"``.
        * ``"dc"`` -- data-centre level evaluation when
          ``algorithm == "membership"``.

        Args:
            request: The incoming request message.
            context: The gRPC call context (not used).

        Returns:
            An ``AlgorithmSupportResponse``.  ``result`` holds the string form
            of a dict for decomposition and metric requests,
            ``"<last observed power>,<prediction>"`` for prediction requests,
            and ``""`` when an error was caught or no branch produced a result.
        """
        # NOTE(review): a new cluster client is created on every request and
        # the password is hard-coded here; consider a shared client and
        # reading the secret from configuration.
        try:
            r = rediscl.RedisCluster(startup_nodes=redis_nodes, password='lidata429')
        except Exception as e:
            print("redis Connection Error!", e)
            # Keep the name bound: Redis-backed branches below bail out
            # cleanly, while qos/dc (which never touch Redis) keep working.
            r = None

        # ------------------------------------------------------------------
        # Power decomposition
        # ------------------------------------------------------------------
        if request.serviceType in ("pod_e", "vm", "hardware"):
            if r is None:
                return AlgorithmSupport_pb2.AlgorithmSupportResponse(result='')
            # The "cpu" algorithm has no pre-loaded model, so indexing
            # MODEL_DICT directly would raise KeyError before the CPU-share
            # branch below could ever run.
            model = MODEL_DICT.get(request.algorithm)

            # Direct pod power estimation from the pod's own state.
            if request.serviceType == "pod_e" and request.algorithm == "direct":
                redis_req_list = redis_utils.redis_ts_split(request.startTimestamp, request.endTimestamp,
                                                            request.entityID)
                pod_df = preprocess.req2df(redis_req_list, r, request.startTimestamp, request.endTimestamp)
                ts_list = pod_df.index.values
                pod_df = preprocess.fillna_decompose(pod_df, methods="interpolate")
                pod_df = preprocess.fillna_decompose(pod_df, methods="fb")
                pod_df = preprocess.fillna_decompose(pod_df)
                # Stored pod columns carry a "docker.<entityID>." prefix.
                pod_cols = ["docker." + request.entityID + "." + col for col in POD_COLS]
                print(pod_cols)
                pod_power_list = rf_test(pod_df.loc[:, pod_cols].values, model=MODEL_DICT["direct"])
                return AlgorithmSupport_pb2.AlgorithmSupportResponse(
                    result='%s' % (dict(zip(ts_list, pod_power_list))))

            if request.serviceType == "pod_e":
                # Pod decomposition first needs the physical machine hosting the pod.
                print("{team2}.hostOf" + request.entityID)
                server_id = redis_utils.one2one_get(r, "{team2}.hostOf" + request.entityID)
                print(server_id)
            else:
                # Otherwise the requested entity is itself the physical machine.
                server_id = request.entityID

            # Split the requested time range into the physical machine's Redis requests.
            redis_req_list = redis_utils.redis_ts_split(request.startTimestamp, request.endTimestamp, server_id)
            print(redis_req_list)
            # VMs present for the requested timestamps.
            vm_list = preprocess.req_vmlist(redis_req_list, r, request.startTimestamp, request.endTimestamp)
            print(vm_list)

            # One dataframe per VM.
            vm_df_dict = dict()
            for vm_name in vm_list:
                vm_df_dict[vm_name] = preprocess.req2df(
                    redis_utils.redis_ts_split(request.startTimestamp, request.endTimestamp, vm_name),
                    r, request.startTimestamp, request.endTimestamp)
                # Fill in the VM type for now (remove once the data has this column).
                vm_df_dict[vm_name][TYPE_COL_NAME] = '2'
            # Show every dataframe column when printing.
            pd.set_option('display.max_columns', None)
            print(vm_df_dict)

            # Measured power of the physical machine.
            server_df = preprocess.req2df(redis_req_list, r, request.startTimestamp, request.endTimestamp)
            pdu_df = server_df.loc[:, [SERVER_LABEL]]

            # VM decomposition using the CPU-usage-share model.
            if request.serviceType == "vm" and request.algorithm == "cpu":
                vm_power_dict = cpu.decompose(vm_df_dict, VM_CPU_METRIC, pdu_df)
                return AlgorithmSupport_pb2.AlgorithmSupportResponse(result='%s' % (vm_power_dict))

            # All other cases need the per-VM-type feature sums.
            vm_type_sum_df = preprocess.vm_type_sum(vm_df_dict.values(), VM_CONF, multi_gb_names=["index", "type"])
            # Scale feature values according to VM type.
            vm_type_sum_df = preprocess.vm_type_enlarge(vm_type_sum_df, VM_CONF, cpu_metric=None,
                                                        mem_metric=VM_MEM_METRIC,
                                                        minmax_columns=VM_COLS, minmax_range=MINMAX_RANGE)
            # Scale and normalise each individual VM.
            for vm_name in vm_list:
                vm_df_dict[vm_name] = preprocess.vm_type_enlarge(
                    vm_df_dict[vm_name].reset_index().set_index(TYPE_COL_NAME),
                    VM_CONF, cpu_metric=None, mem_metric=VM_MEM_METRIC,
                    minmax_columns=VM_COLS, minmax_range=MINMAX_RANGE)
            # Merge by VM type into a single dataframe (type expanded into the features).
            vm_train_df, _ = preprocess.vm_type_concat(vm_type_sum_df, "index", VM_CONF.keys(),
                                                       request.startTimestamp, request.endTimestamp,
                                                       resort_columns=["index"] + VM_COLS,
                                                       cpu_column=None)

            # VM power decomposition.
            if request.serviceType == "vm":
                if request.algorithm == "regtree":
                    multi = request.startTimestamp != request.endTimestamp
                    # VM type of each VM, taken from the type column.
                    vm_types = [df.reset_index()[TYPE_COL_NAME].values[0] for df in vm_df_dict.values()]
                    # Feature columns in model order, as ndarrays.
                    vm_np_list = [df.loc[:, VM_COLS].values for df in vm_df_dict.values()]
                    # Each VM's CPU usage and its share of the total.
                    vm_cpu_usage_dict = cpu.decompose(vm_df_dict, VM_CPU_METRIC, pdu_df, only_cpu=True)
                    cpu_total = sum(vm_cpu_usage_dict.values())
                    vm_cpu_ratio_list = [usage / cpu_total for usage in vm_cpu_usage_dict.values()]
                    # Power of each VM.
                    vm_power_list = regtree.predict_wrapper(model, vm_train_df.values, vm_np_list, vm_types,
                                                            vm_cpu_ratio_list,
                                                            len(VM_COLS), multi=multi)
                    vm_power_dict = dict(zip(vm_df_dict.keys(), vm_power_list))
                    # Also report the physical machine's power.
                    vm_power_dict[request.entityID] = (np.squeeze(pdu_df.values).tolist() if multi
                                                       else pdu_df.values[0].tolist())
                    return AlgorithmSupport_pb2.AlgorithmSupportResponse(result="%s" % (vm_power_dict))
            # Pod power decomposition.
            elif request.serviceType == "pod_e":
                if request.algorithm == "regtree":
                    # Pod data from Redis.
                    pod_df = preprocess.req2df(
                        redis_utils.redis_ts_split(request.startTimestamp, request.endTimestamp, request.entityID),
                        r, request.startTimestamp, request.endTimestamp)
                    ts_list = pod_df.index.values
                    # Strip the container name from the feature column names.
                    pod_df = preprocess.rmname_from_cols(pod_df, "docker." + request.entityID + '.')
                    # Missing-value handling.
                    pod_df = preprocess.fillna_decompose(pod_df, methods="interpolate")
                    pod_df = preprocess.fillna_decompose(pod_df, methods="fb")
                    pod_df = preprocess.fillna_decompose(pod_df, methods="zero")
                    # The VM type of the hosting VM is fixed to 2 here, so it is hard-coded.
                    pod_df[TYPE_COL_NAME] = '2'
                    # Scale and normalise.
                    pod_df = preprocess.vm_type_enlarge(pod_df.set_index(TYPE_COL_NAME), VM_CONF, cpu_metric=None,
                                                        mem_metric=POD_MEM_METRIC,
                                                        minmax_columns=POD_COLS, minmax_range=MINMAX_RANGE)
                    print(pod_df)
                    # Total CPU usage over all VMs, and this pod's share of it.
                    vm_cpu_usage_dict = cpu.decompose(vm_df_dict, VM_CPU_METRIC, pdu_df, only_cpu=True)
                    cpu_total = sum(vm_cpu_usage_dict.values())
                    pod_cpu_ratios = pod_df[POD_CPU_METRIC].values / cpu_total
                    # Pod columns in the order the VM model expects.
                    pod_np = pod_df.loc[:, POD_COLS].values
                    pod_power_list = regtree.predict_wrapper(model, vm_train_df.values, pod_np,
                                                             pod_df.index.values,
                                                             pod_cpu_ratios,
                                                             len(POD_COLS), multi=True)
                    return AlgorithmSupport_pb2.AlgorithmSupportResponse(
                        result='%s' % (dict(zip(ts_list, pod_power_list))))
            # Hardware-level decomposition module (by Ji Wanpeng).
            elif request.serviceType == "hardware":
                if model is None:
                    print("Algorithm Related Error: no pre-loaded model for %s" % (request.algorithm))
                    return AlgorithmSupport_pb2.AlgorithmSupportResponse(result='')
                # Multi-point decomposition: one pass per row of the feature frame.
                power_dict = {"other_power": [], "CPU_power": [], "Disk_power": [], "Memory_power": [],
                              "Net_power": []}
                for timestampi in range(vm_train_df.shape[0]):
                    # Model prediction on the unmodified row.
                    pre = hardware.xgboost_test(vm_train_df.iloc[timestampi: timestampi + 1], model)
                    # hardware_power is handed power_dict to accumulate the per-component results.
                    hardware.hardware_power(vm_train_df, pre, timestampi, feature_columns=vm_train_df.columns,
                                            power_dict=power_dict, model=model)
                # Add the physical machine's measured power; always a list.
                print(pdu_df.shape)
                power_dict["host"] = np.squeeze(pdu_df.values).tolist() if pdu_df.shape[0] > 1 else [
                    np.squeeze(pdu_df.values).tolist()]
                return AlgorithmSupport_pb2.AlgorithmSupportResponse(result='%s' % (power_dict))

        # ------------------------------------------------------------------
        # Power prediction
        # ------------------------------------------------------------------
        if request.serviceType == "server" or request.serviceType == "pod_p":
            try:
                if r is None:
                    raise ConnectionError("Redis cluster connection unavailable")
                # Column set and label depend on whether a server or a pod is predicted.
                if request.serviceType == "server":
                    minmax_cols = SERVER_COLS
                    label = SERVER_LABEL
                else:
                    minmax_cols = PODP_COLS
                    label = POD_LABEL
                vm_type = '2'  # VM type hard-coded to 2
                # Split the request by start/end and the database storage interval.
                redis_req_list = redis_utils.redis_ts_split(request.startTimestamp, request.endTimestamp,
                                                            request.entityID)
                print(redis_req_list)
                # Fetch the data as a dataframe and take the power column from it.
                server_df = preprocess.req2df(redis_req_list, r, request.startTimestamp, request.endTimestamp)
                pdu_df = server_df.loc[:, [label]]
                print("Metric df shape:", server_df.shape, "Power df shape:", pdu_df.shape)
            except Exception as e:
                print("Redis Related Error: %s" % (e))
                return AlgorithmSupport_pb2.AlgorithmSupportResponse(result='')

            try:
                # NOTE(review): the return values of these preprocessing calls
                # are discarded here, whereas the decomposition path assigns
                # them back -- confirm the helpers modify server_df in place.
                # Missing-value filling.
                preprocess.fillna_decompose(server_df, methods="interpolate")
                # Guard against pod data missing at the start or the end.
                if request.serviceType == "pod_p":
                    preprocess.fillna_decompose(server_df, methods="fb")
                # Outlier handling.
                preprocess.outlier_decompose(server_df)
                # Normalisation.
                if request.serviceType == "server":
                    minmax_np = preprocess.minmax(server_df, minmax_cols, MINMAXP_RANGE)
                else:
                    minmax_range = MINMAXP_RANGE * np.array([CPU_COUNT / VM_CONF[vm_type]["cpu.usage.percent"],
                                                             MEM_CAP / VM_CONF[vm_type]["memory.used.percent"],
                                                             1, 1, 1, 1, 1])
                    # Strip the container name from the feature column names.
                    server_df = preprocess.rmname_from_cols(server_df, "docker." + request.entityID + '.')
                    minmax_np = preprocess.minmax(server_df, minmax_cols, minmax_range)
                print("Metric shape after minmax:", minmax_np.shape)
                minmax_np, y_np = preprocess.pred_concat(minmax_np, pdu_df.values, time_len=SEQ_LEN)
                print("Metric shape after concat:", minmax_np.shape)
            except Exception as e:
                print("Preprocessing Related Error: %s" % (e))
                return AlgorithmSupport_pb2.AlgorithmSupportResponse(result='')

            try:
                print(request.algorithm)
                time_start = time.time()
                if request.algorithm in ("RF", "rf"):
                    y_pred = pred.rf_test(minmax_np, model=MODEL_DICT["rf"])
                elif request.algorithm in ("ARIMA", "arima"):
                    y_pred = pred.arima_forecast(pdu_df.values)
                elif request.algorithm in ("LSTMx2", "lstmx2"):
                    y_pred = lstmx2.predict(minmax_np, model=MODEL_DICT["lstmx2"])
                elif request.algorithm in ("TCN", "tcn"):
                    y_pred = tcn.predict(minmax_np, model=MODEL_DICT["tcn"])
                elif request.algorithm in ("DARNN", "darnn"):
                    y_pred = darnn.predict(minmax_np, model=MODEL_DICT["darnn"])
                else:
                    # Without this, y_pred would be unbound at the return below.
                    raise ValueError("unsupported prediction algorithm %r" % (request.algorithm,))
                print("算法运行时长:", time.time() - time_start)
            except Exception as e:
                print("Algorithm Related Error: %s" % (e))
                return AlgorithmSupport_pb2.AlgorithmSupportResponse(result='')
            return AlgorithmSupport_pb2.AlgorithmSupportResponse(result='%s,%s' % (pdu_df.values[-1][0], y_pred))

        # ------------------------------------------------------------------
        # QoS metric evaluation
        # ------------------------------------------------------------------
        if request.serviceType == "qos":
            if request.algorithm == "brb":
                # Both timestamps are reduced modulo 300 to index the preloaded sheet.
                ret = main_opt_bayes(
                    QOS_BRB_DATA.iloc[int(request.startTimestamp) % 300:int(request.endTimestamp) % 300, :],
                    int(request.startTimestamp) % 300, int(request.endTimestamp) % 300)
                return AlgorithmSupport_pb2.AlgorithmSupportResponse(result="%s" % {"qos": ret})

        # ------------------------------------------------------------------
        # Data-centre metric evaluation
        # ------------------------------------------------------------------
        if request.serviceType == "dc":
            # Maximum-membership algorithm.
            if request.algorithm == "membership":
                ret = sample_dc_levels(int(request.startTimestamp), int(request.endTimestamp))
                print(len(ret))
                return AlgorithmSupport_pb2.AlgorithmSupportResponse(result="%s" % {"dc": ret})

        # No branch produced a result.
        return AlgorithmSupport_pb2.AlgorithmSupportResponse(result="")
# @profile
def serve():
    """Pre-load every model, start the gRPC server and block until interrupted.

    Each entry of ``MODEL_PATH`` is loaded into ``MODEL_DICT`` under the same
    key, with ``pickle`` or ``torch.load`` depending on the entry's loader
    kind.  The server listens on port 50050 (insecure) with a 20-worker
    thread pool and is stopped immediately on ``KeyboardInterrupt``.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=20))
    AlgorithmSupport_pb2_grpc.add_AlgorithmSupportServiceServicer_to_server(AlgorithmSupportService(), server)
    # Log the Redis settings for diagnostics, but never the password itself.
    print(REDIS_ADDR, REDIS_PORT, REDIS_DB, "***" if REDIS_PWD else None)
    for name, (loader, path) in MODEL_PATH.items():
        print(name, path)
        if loader == "pickle":
            # NOTE: pickle executes code on load; these must stay trusted, local files.
            with open(path, "rb") as fa:
                MODEL_DICT[name] = pickle.load(fa)
        else:
            MODEL_DICT[name] = torch.load(path)
    server.add_insecure_port('[::]:50050')
    server.start()
    # server.start() does not block, so keep the main thread alive here.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    serve()