#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""gRPC service exposing power-decomposition, power-prediction, QoS and
data-center evaluation algorithms.

Metrics are read from a Redis cluster; models listed in ``MODEL_PATH`` are
loaded once by :func:`serve` into ``MODEL_DICT``.
"""
import os
import pickle
import time
from concurrent import futures

import grpc
import numpy as np
import pandas as pd
import redis  # single-node Redis client; not used by the active code path
import rediscluster as rediscl
import torch

from proto import AlgorithmSupport_pb2, AlgorithmSupport_pb2_grpc
from utils import redis_utils, preprocess
from algorithm import pred, lstmx2, tcn, darnn
from algorithm.decompose import cpu, regtree, hardware
from algorithm.rf import rf_test
from algorithm.metric.qos_brb.evaluate import main_opt_bayes
from algorithm.metric.dc_level.evaluate import sample_dc_levels

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# Sequence length passed as ``time_len`` to preprocess.pred_concat.
SEQ_LEN = 10

# Redis-related parameters (single-node settings; only printed at start-up).
REDIS_ADDR = os.getenv("REDIS_IP")
REDIS_PORT = os.getenv("REDIS_PORT")
REDIS_PWD = os.getenv("REDIS_PASSWORD")
REDIS_DB = os.getenv("REDIS_DATABASE")

# Startup nodes of the Redis cluster used for all metric queries.
redis_nodes = [{'host': '10.160.109.63', 'port': 30001},
               {'host': '10.160.109.63', 'port': 30002},
               {'host': '10.160.109.63', 'port': 30003},
               {'host': '10.160.109.63', 'port': 30004},
               {'host': '10.160.109.63', 'port': 30005},
               {'host': '10.160.109.63', 'port': 30006}]
# NOTE(review): a credential is hard-coded in source; set REDIS_CLUSTER_PASSWORD
# in the environment (or a secret store) and drop the default.
REDIS_CLUSTER_PWD = os.getenv("REDIS_CLUSTER_PASSWORD", "lidata429")

# Per-VM-type multipliers for the CPU / memory metrics, used for normalization.
VM_CONF = {'1': {"cpu.usage.percent": 1, "memory.used.percent": 1},
           '2': {"cpu.usage.percent": 2, "memory.used.percent": 2},
           '3': {"cpu.usage.percent": 4, "memory.used.percent": 4},
           '4': {"cpu.usage.percent": 8, "memory.used.percent": 8}}

# VM feature columns.
VM_COLS = ["cpu.usage.percent", "memory.used.percent",
           "disk.vda.disk_octets.write", "disk.vda.disk_octets.read",
           "interface.eth0.if_octets.tx", "interface.eth0.if_octets.rx"]
# Pod feature columns.
POD_COLS = ["cpu.percent", "memory.percent",
            "blkio.io_service_bytes_recursive-253-0.write",
            "blkio.io_service_bytes_recursive-253-0.read",
            "network.usage.tx_bytes", "network.usage.rx_bytes"]
# Physical-server feature columns for power prediction (last one is the label).
SERVER_COLS = ["cpu.active.percent", "memory.used.percent",
               "disk.sda.disk_octets.write", "disk.sda.disk_octets.read",
               "interface.eth0.if_octets.tx", "interface.eth0.if_octets.rx",
               "pdu.power"]
SERVER_LABEL = "pdu.power"
# Pod feature columns for power prediction (last one is the label).
PODP_COLS = ["cpu.percent", "memory.percent",
             "blkio.io_service_bytes_recursive-253-0.write",
             "blkio.io_service_bytes_recursive-253-0.read",
             "network.usage.tx_bytes", "network.usage.rx_bytes", "power"]
POD_LABEL = "power"

# CPU / memory metric names for VMs and pods.
VM_CPU_METRIC = "cpu.usage.percent"
VM_MEM_METRIC = "memory.used.percent"
POD_CPU_METRIC = "cpu.percent"
POD_MEM_METRIC = "memory.percent"
TYPE_COL_NAME = "type"

# Physical machine configuration.
CPU_COUNT = 4  # 4 cores
MEM_CAP = 8  # 8 GB

# Overall min-max normalization range ([mins], [maxes]) for VM_COLS / POD_COLS.
MINMAX_RANGE = [[0, 0, 0, 0, 0, 0],
                [100 * CPU_COUNT, 100 * MEM_CAP, 1e9, 1e9, 1e9, 1e9]]
# Min-max range for SERVER_COLS / PODP_COLS (last column is power).
# Power must be normalized starting from 0; otherwise container power
# prediction may produce null values.
MINMAXP_RANGE = [[0, 0, 0, 0, 0, 0, 0],
                 [100, 100, 1e9, 1e9, 1e9, 1e9, 75]]

# Model name -> (loader, path). Loaded by serve() into MODEL_DICT.
MODEL_PATH = {"regtree": ("pickle", "model/decompose/regtree-printWb.pkl"),
              "xgboost": ("pickle", "model/decompose/xgboost.pkl"),
              "direct": ("pickle", "model/rf-pod.pkl"),
              "rf": ("pickle", "model/rf.pkl"),
              "lstmx2": ("torch", "model/lstmx2.pkl"),
              "tcn": ("torch", "model/tcn.pkl"),
              "darnn": ("torch", "model/darnn.pkl")}
MODEL_DICT = {}

# Data for the QoS (BRB) evaluation, read once at import time.
QOS_BRB_PARH = "algorithm/metric/qos_brb/compute01.xlsx"
QOS_BRB_DATA = pd.read_excel(QOS_BRB_PARH, header=None)

# Service types whose handlers read metrics from Redis.
_DECOMPOSE_SERVICES = ("pod_e", "vm", "hardware")
_PREDICT_SERVICES = ("server", "pod_p")


def _connect_redis():
    """Open a client for the Redis cluster listed in ``redis_nodes``."""
    return rediscl.RedisCluster(startup_nodes=redis_nodes, password=REDIS_CLUSTER_PWD)


def _response(result=""):
    """Wrap ``result`` (rendered with ``%s``) in an AlgorithmSupportResponse."""
    return AlgorithmSupport_pb2.AlgorithmSupportResponse(result="%s" % (result,))


class AlgorithmSupportService(AlgorithmSupport_pb2_grpc.AlgorithmSupportServiceServicer):
    """Dispatch AlgorithmSupport RPCs by ``request.serviceType``.

    * ``pod_e`` / ``vm`` / ``hardware`` -- power decomposition
    * ``server`` / ``pod_p``            -- power prediction
    * ``qos`` (algorithm ``brb``)       -- QoS evaluation
    * ``dc`` (algorithm ``membership``) -- data-center level evaluation

    Unsupported combinations and handled failures yield an empty result.
    """

    def AlgorithmSupport(self, request, context):
        """Handle one RPC and return an AlgorithmSupportResponse."""
        service_type = request.serviceType

        if service_type in _DECOMPOSE_SERVICES or service_type in _PREDICT_SERVICES:
            # Only the Redis-backed services need a cluster connection.
            try:
                r = _connect_redis()
            except Exception as e:
                print("redis Connection Error!", e)
                # Without a client every Redis-backed handler would fail.
                return _response()
            if service_type in _PREDICT_SERVICES:
                return self._predict_power(request, r)
            response = self._decompose(request, r)
            return response if response is not None else _response()

        if service_type == "qos" and request.algorithm == "brb":
            return self._evaluate_qos_brb(request)
        if service_type == "dc" and request.algorithm == "membership":
            return self._evaluate_dc_membership(request)
        return _response()

    # ------------------------------------------------------------------ #
    # Power decomposition (pod_e / vm / hardware)
    # ------------------------------------------------------------------ #
    def _decompose(self, request, r):
        """Run power decomposition; return a response, or None if unhandled."""
        start, end = request.startTimestamp, request.endTimestamp
        # ``.get``: VM decomposition by CPU usage ("cpu") has no preloaded model.
        model = MODEL_DICT.get(request.algorithm)
        cpu_only = request.serviceType == "vm" and request.algorithm == "cpu"
        if model is None and not cpu_only:
            print("Unknown decomposition algorithm: %s" % (request.algorithm,))
            return None

        # Pod power estimated directly from the pod's own metrics.
        if request.serviceType == "pod_e" and request.algorithm == "direct":
            return self._decompose_pod_direct(request, r)

        # Pod decomposition first needs the physical host of the container;
        # otherwise the requested entity already is the physical machine.
        if request.serviceType == "pod_e":
            host_key = "{team2}.hostOf" + request.entityID
            print(host_key)
            server_id = redis_utils.one2one_get(r, host_key)
            print(server_id)
        else:
            server_id = request.entityID

        # Redis requests for the host over the requested time range.
        redis_req_list = redis_utils.redis_ts_split(start, end, server_id)
        print(redis_req_list)

        # VMs running on the host within the requested time range.
        vm_list = preprocess.req_vmlist(redis_req_list, r, start, end)
        print(vm_list)

        vm_df_dict = {}
        for vm_name in vm_list:
            vm_df = preprocess.req2df(redis_utils.redis_ts_split(start, end, vm_name),
                                      r, start, end)
            # VM type is temporarily hard-coded to '2'.
            vm_df[TYPE_COL_NAME] = '2'
            vm_df_dict[vm_name] = vm_df

        pd.set_option('display.max_columns', None)
        print(vm_df_dict)

        # Measured host power over the range.
        server_df = preprocess.req2df(redis_req_list, r, start, end)
        pdu_df = server_df.loc[:, [SERVER_LABEL]]

        # VM decomposition proportional to CPU usage (no model involved).
        if cpu_only:
            vm_power_dict = cpu.decompose(vm_df_dict, VM_CPU_METRIC, pdu_df)
            return _response(vm_power_dict)

        enlarge_kwargs = dict(cpu_metric=None, mem_metric=VM_MEM_METRIC,
                              minmax_columns=VM_COLS, minmax_range=MINMAX_RANGE)
        # Per-VM-type feature sums, scaled according to the VM type.
        vm_type_sum_df = preprocess.vm_type_sum(vm_df_dict.values(), VM_CONF,
                                                multi_gb_names=["index", "type"])
        vm_type_sum_df = preprocess.vm_type_enlarge(vm_type_sum_df, VM_CONF, **enlarge_kwargs)
        # Scale + normalize every individual VM as well.
        for vm_name in vm_list:
            vm_df_dict[vm_name] = preprocess.vm_type_enlarge(
                vm_df_dict[vm_name].reset_index().set_index(TYPE_COL_NAME),
                VM_CONF, **enlarge_kwargs)
        # Merge the VM types into one frame (type expanded into the features).
        vm_train_df, _vm_cpu_sum_df = preprocess.vm_type_concat(
            vm_type_sum_df, "index", VM_CONF.keys(), start, end,
            resort_columns=["index"] + VM_COLS, cpu_column=None)

        if request.serviceType == "vm":
            if request.algorithm == "regtree":
                return self._decompose_vm_regtree(request, model, vm_df_dict,
                                                  vm_train_df, pdu_df)
        elif request.serviceType == "pod_e":
            if request.algorithm == "regtree":
                return self._decompose_pod_regtree(request, r, model, vm_df_dict,
                                                   vm_train_df, pdu_df)
        elif request.serviceType == "hardware":
            return self._decompose_hardware(model, vm_train_df, pdu_df)
        return None

    @staticmethod
    def _decompose_pod_direct(request, r):
        """Estimate pod power per timestamp from the pod's metrics with the "direct" RF model."""
        start, end = request.startTimestamp, request.endTimestamp
        redis_req_list = redis_utils.redis_ts_split(start, end, request.entityID)
        pod_df = preprocess.req2df(redis_req_list, r, start, end)
        ts_list = pod_df.index.values
        # Missing-value handling: interpolate, then forward/backward, then default.
        pod_df = preprocess.fillna_decompose(pod_df, methods="interpolate")
        pod_df = preprocess.fillna_decompose(pod_df, methods="fb")
        pod_df = preprocess.fillna_decompose(pod_df)
        pod_cols = ["docker." + request.entityID + "." + col for col in POD_COLS]
        print(pod_cols)
        pod_power_list = rf_test(pod_df.loc[:, pod_cols].values, model=MODEL_DICT["direct"])
        return _response(dict(zip(ts_list, pod_power_list)))

    @staticmethod
    def _decompose_vm_regtree(request, model, vm_df_dict, vm_train_df, pdu_df):
        """Split host power across VMs with the regression-tree model."""
        multi = request.startTimestamp != request.endTimestamp
        vm_frames = list(vm_df_dict.values())
        # VM type taken from the (index) type column of each frame.
        vm_types = [df.reset_index()[TYPE_COL_NAME].values[0] for df in vm_frames]
        # Columns reordered to VM_COLS and converted to ndarrays.
        vm_np_list = [df.loc[:, VM_COLS].values for df in vm_frames]
        # Each VM's share of the total CPU usage.
        vm_cpu_usage_dict = cpu.decompose(vm_df_dict, VM_CPU_METRIC, pdu_df, only_cpu=True)
        cpu_total = sum(vm_cpu_usage_dict.values())
        vm_cpu_ratio_list = [usage / cpu_total for usage in vm_cpu_usage_dict.values()]
        vm_power_list = regtree.predict_wrapper(model, vm_train_df.values, vm_np_list, vm_types,
                                                vm_cpu_ratio_list, len(VM_COLS), multi=multi)
        vm_power_dict = dict(zip(vm_df_dict.keys(), vm_power_list))
        # The host's measured power is reported under the requested entity id.
        vm_power_dict[request.entityID] = (np.squeeze(pdu_df.values).tolist() if multi
                                           else pdu_df.values[0].tolist())
        return _response(vm_power_dict)

    @staticmethod
    def _decompose_pod_regtree(request, r, model, vm_df_dict, vm_train_df, pdu_df):
        """Estimate a pod's power per timestamp with the regression-tree model."""
        start, end = request.startTimestamp, request.endTimestamp
        pod_df = preprocess.req2df(redis_utils.redis_ts_split(start, end, request.entityID),
                                   r, start, end)
        ts_list = pod_df.index.values
        # Strip the container-name prefix from the feature columns.
        pod_df = preprocess.rmname_from_cols(pod_df, "docker." + request.entityID + '.')
        # Missing-value handling.
        pod_df = preprocess.fillna_decompose(pod_df, methods="interpolate")
        pod_df = preprocess.fillna_decompose(pod_df, methods="fb")
        pod_df = preprocess.fillna_decompose(pod_df, methods="zero")
        # The hosting VM's type is fixed to '2' here (hard-coded).
        pod_df[TYPE_COL_NAME] = '2'
        # Scale + normalize.
        pod_df = preprocess.vm_type_enlarge(pod_df.set_index(TYPE_COL_NAME), VM_CONF,
                                            cpu_metric=None, mem_metric=POD_MEM_METRIC,
                                            minmax_columns=POD_COLS, minmax_range=MINMAX_RANGE)
        print(pod_df)
        # Pod CPU usage as a fraction of the summed VM CPU usage.
        vm_cpu_usage_dict = cpu.decompose(vm_df_dict, VM_CPU_METRIC, pdu_df, only_cpu=True)
        cpu_total = sum(vm_cpu_usage_dict.values())
        pod_cpu_ratios = pod_df[POD_CPU_METRIC].values / cpu_total
        # Pod columns in the same order the VM model expects.
        pod_np = pod_df.loc[:, POD_COLS].values
        pod_power_list = regtree.predict_wrapper(model, vm_train_df.values, pod_np,
                                                 pod_df.index.values, pod_cpu_ratios,
                                                 len(POD_COLS), multi=True)
        return _response(dict(zip(ts_list, pod_power_list)))

    @staticmethod
    def _decompose_hardware(model, vm_train_df, pdu_df):
        """Split power per hardware component for every row of ``vm_train_df``."""
        power_dict = {"other_power": [], "CPU_power": [], "Disk_power": [],
                      "Memory_power": [], "Net_power": []}
        for row in range(vm_train_df.shape[0]):
            # Model prediction on the unmodified row ...
            pre = hardware.xgboost_test(vm_train_df.iloc[row:row + 1], model)
            # ... then hardware_power decomposes it; presumably it appends the
            # per-component values into power_dict (verify in hardware module).
            hardware.hardware_power(vm_train_df, pre, row, feature_columns=vm_train_df.columns,
                                    power_dict=power_dict, model=model)
        print(pdu_df.shape)
        # Measured host power, always reported as a list.
        host_power = np.squeeze(pdu_df.values).tolist()
        power_dict["host"] = host_power if pdu_df.shape[0] > 1 else [host_power]
        return _response(power_dict)

    # ------------------------------------------------------------------ #
    # Power prediction (server / pod_p)
    # ------------------------------------------------------------------ #
    @staticmethod
    def _predict_power(request, r):
        """Predict power; result is "<last measured power>,<prediction>" or empty on error."""
        start, end = request.startTimestamp, request.endTimestamp
        is_server = request.serviceType == "server"
        if is_server:
            minmax_cols, label = SERVER_COLS, SERVER_LABEL
        else:
            minmax_cols, label = PODP_COLS, POD_LABEL
        vm_type = '2'  # VM type is hard-coded to 2

        try:
            redis_req_list = redis_utils.redis_ts_split(start, end, request.entityID)
            print(redis_req_list)
            server_df = preprocess.req2df(redis_req_list, r, start, end)
            pdu_df = server_df.loc[:, [label]]
            print("Metric df shape:", server_df.shape, "Power df shape:", pdu_df.shape)
        except Exception as e:
            print("Redis Related Error: %s" % (e))
            return _response()

        try:
            # Missing values; keep the returned frame, as the decomposition path does.
            server_df = preprocess.fillna_decompose(server_df, methods="interpolate")
            if request.serviceType == "pod_p":
                # Pod data may be missing at the start / end of the range.
                server_df = preprocess.fillna_decompose(server_df, methods="fb")
            # Outlier handling.
            preprocess.outlier_decompose(server_df)
            # Normalization.
            if is_server:
                minmax_np = preprocess.minmax(server_df, minmax_cols, MINMAXP_RANGE)
            else:
                minmax_range = MINMAXP_RANGE * np.array(
                    [CPU_COUNT / VM_CONF[vm_type]["cpu.usage.percent"],
                     MEM_CAP / VM_CONF[vm_type]["memory.used.percent"], 1, 1, 1, 1, 1])
                # Strip the container-name prefix from the feature columns.
                server_df = preprocess.rmname_from_cols(server_df,
                                                        "docker." + request.entityID + '.')
                minmax_np = preprocess.minmax(server_df, minmax_cols, minmax_range)
            print("Metric shape after minmax:", minmax_np.shape)
            minmax_np, _y_np = preprocess.pred_concat(minmax_np, pdu_df.values, time_len=SEQ_LEN)
            print("Metric shape after concat:", minmax_np.shape)
        except Exception as e:
            print("Preprocessing Related Error: %s" % (e))
            return _response()

        algorithm = request.algorithm
        try:
            print(algorithm)
            time_start = time.time()
            if algorithm in ("RF", "rf"):
                y_pred = pred.rf_test(minmax_np, model=MODEL_DICT["rf"])
            elif algorithm in ("ARIMA", "arima"):
                y_pred = pred.arima_forecast(pdu_df.values)
            elif algorithm in ("LSTMx2", "lstmx2"):
                y_pred = lstmx2.predict(minmax_np, model=MODEL_DICT["lstmx2"])
            elif algorithm in ("TCN", "tcn"):
                y_pred = tcn.predict(minmax_np, model=MODEL_DICT["tcn"])
            elif algorithm in ("DARNN", "darnn"):
                y_pred = darnn.predict(minmax_np, model=MODEL_DICT["darnn"])
            else:
                # Previously left y_pred unbound and crashed on the final return.
                print("Algorithm Related Error: unsupported algorithm %s" % (algorithm,))
                return _response()
            print("ç®æ³è¿è¡æ¶é¿:", time.time() - time_start)
        except Exception as e:
            print("Algorithm Related Error: %s" % (e))
            return _response()
        return _response('%s,%s' % (pdu_df.values[-1][0], y_pred))

    # ------------------------------------------------------------------ #
    # Metric evaluation (qos / dc)
    # ------------------------------------------------------------------ #
    @staticmethod
    def _evaluate_qos_brb(request):
        """QoS evaluation with the BRB algorithm over rows of QOS_BRB_DATA."""
        # Timestamps are mapped onto sheet rows modulo 300.
        # NOTE(review): if end % 300 < start % 300 the slice below is empty.
        start_row = int(request.startTimestamp) % 300
        end_row = int(request.endTimestamp) % 300
        ret = main_opt_bayes(QOS_BRB_DATA.iloc[start_row:end_row, :], start_row, end_row)
        return _response({"qos": ret})

    @staticmethod
    def _evaluate_dc_membership(request):
        """Data-center level evaluation with the membership algorithm."""
        ret = sample_dc_levels(int(request.startTimestamp), int(request.endTimestamp))
        print(len(ret))
        return _response({"dc": ret})


def serve():
    """Preload all models, start the gRPC server on port 50050 and block until Ctrl-C."""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=20))
    AlgorithmSupport_pb2_grpc.add_AlgorithmSupportServiceServicer_to_server(
        AlgorithmSupportService(), server)
    # Never write the password itself to the logs.
    print(REDIS_ADDR, REDIS_PORT, REDIS_DB, "******" if REDIS_PWD else REDIS_PWD)

    # Preload every model once; request handlers look them up in MODEL_DICT.
    # NOTE: pickle.load / torch.load can execute arbitrary code -- only load
    # trusted model files.
    for name, (loader, path) in MODEL_PATH.items():
        print(name, path)
        if loader == "pickle":
            with open(path, "rb") as fa:
                MODEL_DICT[name] = pickle.load(fa)
        else:
            MODEL_DICT[name] = torch.load(path)

    server.add_insecure_port('[::]:50050')
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()