Files
A-Tune/modules/server/profile/profile.go
openeuler-ci-bot a574db9514 !574 A-Tune识别应用场景及动态推荐调优参数的实现-修改分类识别go客户端和服务端代码
From: @SWWBF 
Reviewed-by: @gaoruoshu 
Signed-off-by: @gaoruoshu
2023-10-21 02:21:15 +00:00

1629 lines
44 KiB
Go

/*
* Copyright (c) 2019 Huawei Technologies Co., Ltd.
* A-Tune is licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
* PURPOSE.
* See the Mulan PSL v2 for more details.
* Create: 2019-10-29
*/
package main
import (
"bufio"
"bytes"
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
HTTP "net/http"
"os"
"path"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/go-ini/ini"
"github.com/mitchellh/mapstructure"
"github.com/urfave/cli"
"google.golang.org/grpc"
PB "gitee.com/openeuler/A-Tune/api/profile"
_ "gitee.com/openeuler/A-Tune/common/checker"
"gitee.com/openeuler/A-Tune/common/config"
"gitee.com/openeuler/A-Tune/common/http"
"gitee.com/openeuler/A-Tune/common/log"
"gitee.com/openeuler/A-Tune/common/models"
"gitee.com/openeuler/A-Tune/common/profile"
"gitee.com/openeuler/A-Tune/common/registry"
"gitee.com/openeuler/A-Tune/common/schedule"
SVC "gitee.com/openeuler/A-Tune/common/service"
"gitee.com/openeuler/A-Tune/common/sqlstore"
"gitee.com/openeuler/A-Tune/common/tuning"
"gitee.com/openeuler/A-Tune/common/utils"
)
// Monitor : one monitor entry of the body sent to the collection service.
// Instances are carried in CollectorPost.Monitors and serialized to JSON with
// the keys "module", "purpose" and "field".
type Monitor struct {
Module string `json:"module"`
Purpose string `json:"purpose"`
Field string `json:"field"`
}
// CollectorPost : the body sent to the collection service.
// It is serialized to JSON; Monitors lists the monitor entries to sample and
// SampleNum is the number of samples requested. Pipe, File and DataType are
// passed through under the keys "pipe", "file" and "data_type".
type CollectorPost struct {
Monitors []Monitor `json:"monitors"`
SampleNum int `json:"sample_num"`
Pipe string `json:"pipe"`
File string `json:"file"`
DataType string `json:"data_type"`
}
// RespCollectorPost : the response of the collection service.
// Path and Data are decoded from the JSON keys "path" and "data".
type RespCollectorPost struct {
Path string `json:"path"`
Data map[string]interface{} `json:"data"`
}
// ClassifyPostBody : the body sent to the classify service.
// ModelPath and Model are omitted from the JSON payload when empty.
type ClassifyPostBody struct {
Data string `json:"data"`
ModelPath string `json:"modelpath,omitempty"`
Model string `json:"model,omitempty"`
}
// RespClassify : the response of the classify model.
// Bottleneck is decoded from the JSON key "bottleneck_binary".
// NOTE(review): presumably a bit mask of detected bottlenecks — confirm
// against the classification service.
type RespClassify struct {
Bottleneck int `json:"bottleneck_binary"`
ResourceLimit string `json:"resource_limit"`
WorkloadType string `json:"workload_type"`
Percentage float32 `json:"percentage"`
}
// ProfileServer : the type implementing the profile grpc server.
// It embeds utils.MutexLock, whose TryLock/Unlock are used by Analysis and
// Tuning to allow only one such session at a time. Raw holds the parsed
// atuned.cnf configuration loaded by NewProfileServer.
type ProfileServer struct {
utils.MutexLock
ConfPath string
ScriptPath string
Raw *ini.File
}
// init registers the profile service ("opt.profile") with the service
// registry when the package is loaded. If registration fails the error is
// printed to stdout and the success log line is skipped.
func init() {
	service := &SVC.ProfileService{
		Name:    "opt.profile",
		Desc:    "opt profile module",
		NewInst: NewProfileServer,
	}

	err := SVC.AddService(service)
	if err != nil {
		fmt.Printf("Failed to load service project : %s\n", err)
		return
	}

	log.Info("load profile service successfully\n")
}
// NewProfileServer creates a new instance of the profile grpc server.
//
// It loads atuned.cnf from config.DefaultConfPath and stores the parsed file
// in the Raw field of the returned *ProfileServer. An error is returned when
// the file cannot be checked, does not exist, or cannot be parsed. The ctx and
// opts arguments are not used by this function.
func NewProfileServer(ctx *cli.Context, opts ...interface{}) (interface{}, error) {
	confFile := path.Join(config.DefaultConfPath, "atuned.cnf")

	found, err := utils.PathExist(confFile)
	switch {
	case err != nil:
		return nil, err
	case !found:
		return nil, fmt.Errorf("could not find default config file")
	}

	raw, err := ini.Load(confFile)
	if err != nil {
		return nil, fmt.Errorf("failed to parse %s, %v", confFile, err)
	}

	server := new(ProfileServer)
	server.Raw = raw
	return server, nil
}
// RegisterServer method registers this ProfileServer as the ProfileMgr
// implementation on the given grpc server. It always returns nil.
func (s *ProfileServer) RegisterServer(server *grpc.Server) error {
PB.RegisterProfileMgrServer(server, s)
return nil
}
// Healthy method, implement SvrService interface.
// No check is performed: opts is ignored and nil is always returned.
func (s *ProfileServer) Healthy(opts ...interface{}) error {
return nil
}
// Post sends the classify body to the classification service and decodes the
// JSON reply into a RespClassify.
//
// An error is returned when the request fails, when the service answers with
// a status other than 200, or when the reply cannot be read or decoded.
func (p *ClassifyPostBody) Post() (*RespClassify, error) {
	response, err := http.Post(config.GetURL(config.ClassificationURI), p)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	if response.StatusCode != 200 {
		return nil, fmt.Errorf("online learning failed")
	}

	payload, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return nil, err
	}

	var result RespClassify
	if err := json.Unmarshal(payload, &result); err != nil {
		return nil, err
	}
	return &result, nil
}
// Post sends the collection request to the collector service and decodes the
// JSON reply into a RespCollectorPost.
//
// An error is returned when the request fails, when the service answers with
// a status other than 200, or when the reply cannot be read or decoded.
func (c *CollectorPost) Post() (*RespCollectorPost, error) {
	response, err := http.Post(config.GetURL(config.CollectorURI), c)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	if response.StatusCode != 200 {
		return nil, fmt.Errorf("collect data failed")
	}

	payload, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return nil, err
	}

	var result RespCollectorPost
	if err := json.Unmarshal(payload, &result); err != nil {
		return nil, err
	}
	return &result, nil
}
// Post uploads the file at path to the transfer service as a multipart form
// and returns the value reported back by the service.
//
// serviceType is sent as the "service" form field and also selects the
// sub-directory of /etc/atuned/ used for the "savepath" field. paramName is
// the name of the form field that carries the file content.
//
// One leading and two trailing bytes are stripped from the response body
// before it is returned (presumably the quotes of a JSON string plus a
// trailing newline — TODO confirm against the transfer service). A response
// too short to carry that wrapping is reported as an error instead of
// panicking on the slice expression.
func Post(serviceType, paramName, path string) (string, error) {
	url := config.GetURL(config.TransferURI)

	file, err := os.Open(path)
	if err != nil {
		return "", fmt.Errorf("Path Error")
	}
	defer file.Close()

	form := &bytes.Buffer{}
	writer := multipart.NewWriter(form)
	part, err := writer.CreateFormFile(paramName, filepath.Base(path))
	if err != nil {
		return "", fmt.Errorf("writer error")
	}
	if _, err = io.Copy(part, file); err != nil {
		return "", fmt.Errorf("copy error")
	}

	extraParams := map[string]string{
		"service":  serviceType,
		"savepath": "/etc/atuned/" + serviceType + "/" + filepath.Base(path),
	}
	for key, val := range extraParams {
		if err = writer.WriteField(key, val); err != nil {
			return "", fmt.Errorf("write field %s error: %v", key, err)
		}
	}
	// Close writes the terminating boundary; the form is incomplete without it.
	if err = writer.Close(); err != nil {
		return "", fmt.Errorf("writer close error")
	}

	request, err := HTTP.NewRequest("POST", url, form)
	if err != nil {
		return "", fmt.Errorf("newRequest failed")
	}
	request.Header.Set("Content-Type", writer.FormDataContentType())

	client, err := http.NewhttpClient(url)
	if err != nil {
		return "", err
	}
	resp, err := client.Do(request)
	if err != nil {
		return "", fmt.Errorf("do request error: %v", err)
	}
	defer resp.Body.Close()

	reply := &bytes.Buffer{}
	if _, err = reply.ReadFrom(resp.Body); err != nil {
		return "", fmt.Errorf("body read form error")
	}
	res := reply.String()
	// The slice below needs at least three bytes (1 leading + 2 trailing).
	if len(res) < 3 {
		return "", fmt.Errorf("unexpected transfer response: %q", res)
	}
	return res[1 : len(res)-2], nil
}
// Profile activates the profile(s) named in profileInfo manually.
//
// profileInfo.Name is a comma separated list of profile names. The loaded
// profile is applied through RollbackActive, and every AckCheck it emits is
// forwarded to the client stream by a helper goroutine that stops when the
// function returns. On success the function sleeps for one second before
// returning (presumably to let the forwarder deliver the last message —
// TODO confirm).
func (s *ProfileServer) Profile(profileInfo *PB.ProfileInfo, stream PB.ProfileMgr_ProfileServer) error {
	requested := profileInfo.GetName()
	loaded, errMsg := profile.Load(strings.Split(requested, ","))
	if errMsg != "" {
		fmt.Println("Failed to load profile:", requested)
		return fmt.Errorf("load profile %s failed: %s", requested, errMsg)
	}

	acks := make(chan *PB.AckCheck)
	ctx, cancel := context.WithCancel(context.Background())
	// Deferred calls run in reverse order: cancel first, then close.
	defer close(acks)
	defer cancel()

	forward := func(done <-chan struct{}) {
		for {
			select {
			case ack := <-acks:
				_ = stream.Send(ack)
			case <-done:
				return
			}
		}
	}
	go forward(ctx.Done())

	err := loaded.RollbackActive(acks)
	if err != nil {
		return err
	}

	time.Sleep(1 * time.Second)
	return nil
}
// ListWorkload streams the name of every supported profile to the client and
// marks the one that is currently active.
//
// The active name is taken from the first profile log entry and, when a
// matching class exists, replaced by that class' profile type. Profiles are
// discovered by walking config.DefaultProfilePath: entries named "include"
// are skipped, only "*.conf" files are reported, and the reported name is the
// path relative to the profile root with "/" replaced by "-" and the
// extension removed.
//
// Errors reported by the walk itself (for example a missing or unreadable
// profile directory) are returned to the caller; in that case the FileInfo
// handed to the callback may be nil and must not be dereferenced.
func (s *ProfileServer) ListWorkload(profileInfo *PB.ProfileInfo, stream PB.ProfileMgr_ListWorkloadServer) error {
	log.Debug("Begin to inquire all workloads\n")
	profileLogs, err := sqlstore.GetProfileLogs()
	if err != nil {
		return err
	}

	var activeName string
	if len(profileLogs) > 0 {
		activeName = profileLogs[0].ProfileID
	}
	if activeName != "" {
		classProfile := &sqlstore.GetClass{Class: activeName}
		if err = sqlstore.GetClasses(classProfile); err != nil {
			return fmt.Errorf("inquery workload type table failed %v", err)
		}
		if len(classProfile.Result) > 0 {
			activeName = classProfile.Result[0].ProfileType
		}
	}
	log.Debugf("active name is %s", activeName)

	return filepath.Walk(config.DefaultProfilePath, func(absPath string, info os.FileInfo, err error) error {
		// Walk passes the lstat/readdir failure here; info can be nil.
		if err != nil {
			return err
		}
		if info.Name() == "include" {
			return filepath.SkipDir
		}
		if info.IsDir() || !strings.HasSuffix(info.Name(), ".conf") {
			return nil
		}

		absFilename := absPath[len(config.DefaultProfilePath)+1:]
		filenameOnly := strings.TrimSuffix(strings.ReplaceAll(absFilename, "/", "-"),
			path.Ext(info.Name()))
		_ = stream.Send(&PB.ListMessage{
			ProfileNames: filenameOnly,
			Active:       strconv.FormatBool(filenameOnly == activeName),
		})
		return nil
	})
}
// CheckInitProfile runs the registered checker services (system init
// information such as BIOS version, memory balance, ...) and streams their
// results to the client.
//
// All checker services are initialised first; an Init failure aborts the call.
// Afterwards every service that implements registry.CheckService and is not
// disabled is executed. A failing check is logged and does not stop the
// remaining checks.
func (s *ProfileServer) CheckInitProfile(profileInfo *PB.ProfileInfo,
	stream PB.ProfileMgr_CheckInitProfileServer) error {
	acks := make(chan *PB.AckCheck)
	defer close(acks)
	go func() {
		for ack := range acks {
			_ = stream.Send(ack)
		}
	}()

	checkers := registry.GetCheckerServices()
	for _, checker := range checkers {
		log.Infof("initializing checker service: %s", checker.Name)
		if err := checker.Instance.Init(); err != nil {
			return fmt.Errorf("service init failed: %v", err)
		}
	}

	// running checker service
	for _, checker := range checkers {
		runner, ok := checker.Instance.(registry.CheckService)
		if !ok || registry.IsCheckDisabled(checker.Instance) {
			continue
		}
		if err := runner.Check(acks); err != nil {
			log.Errorf("service %s running failed, reason: %v", checker.Name, err)
		}
	}
	return nil
}
// handleCsv merges the CSV file at curPath into the accumulated CSV file at
// storePath and then moves the result back to curPath.
//
// All records of curPath except the first one (treated as the header row) are
// appended to storePath, after which storePath is renamed to curPath. When
// storePath is empty nothing is done and nil is returned. storePath must
// already exist; it is opened in append mode without O_CREATE.
//
// A curPath that contains no records at all is handled like one that contains
// only a header: nothing is appended and the rename still takes place.
func handleCsv(curPath string, storePath string) error {
	if storePath == "" {
		return nil
	}

	fs, err := os.Open(curPath)
	if err != nil {
		return fmt.Errorf("can not get file, err is %+v", err)
	}
	defer fs.Close()

	content, err := csv.NewReader(fs).ReadAll()
	if err != nil {
		return fmt.Errorf("can not get file, err is %+v", err)
	}

	tempFile, err := os.OpenFile(storePath, os.O_RDWR|os.O_APPEND, 0640)
	if err != nil {
		return fmt.Errorf("can not get file, err is %+v", err)
	}
	defer tempFile.Close()

	// Skip the header row; slicing an empty result would panic.
	if len(content) > 1 {
		if err := csv.NewWriter(tempFile).WriteAll(content[1:]); err != nil {
			return fmt.Errorf("can not write to file, err is %+v", err)
		}
	}

	if err := os.Rename(storePath, curPath); err != nil {
		return fmt.Errorf("can not rename the path, err is %+v", err)
	}
	return nil
}
// collectStatus records the collection state of each analysis request, keyed
// by the id carried in the request message. Analysis writes "run" when a
// request with flag "start" arrives and "end" when one with flag "end"
// arrives, and deletes the entry once the collection loop observes "end".
// NOTE(review): the map is accessed without any visible locking — confirm
// whether Analysis can be invoked concurrently.
var collectStatus = make(map[string]string)
// Analysis collects system runtime data, classifies the running workload and,
// unless only characterization was requested, applies the matching static
// profile followed by rule based dynamic tuning.
//
// Request handling:
//   - flag "end" only marks the collection identified by message.Id as
//     finished and returns;
//   - flag "start" marks it as running and keeps collecting until an "end"
//     request for the same id has been seen;
//   - an empty flag performs a single collection round.
//
// Unless message.Characterization is set the server lock is taken, so only one
// analysis/tuning session runs at a time. Progress lines written by the
// collector to a named pipe are forwarded to the client and stored through
// models.InitTransfer. After classification the workload type is looked up in
// the database; if it is already active the call returns early. When
// message.Bottleneck is set the detected bottlenecks are reported and the
// corresponding "include-*-intensive" profiles are added to the profile list.
//
// Errors from loading the profile are returned to the caller; an error from
// applying the static profile is logged and the remaining steps still run.
//
// NOTE(review): subProcess and collectionId are shared between this function
// and the pipe reader goroutine without synchronisation — confirm whether
// that is acceptable.
func (s *ProfileServer) Analysis(message *PB.AnalysisMessage, stream PB.ProfileMgr_AnalysisServer) error {
	id := message.GetId()
	if message.GetFlag() == "end" {
		collectStatus[id] = "end"
		return nil
	}
	if message.GetFlag() == "start" {
		collectStatus[id] = "run"
	}

	if !message.Characterization {
		if !s.TryLock() {
			return fmt.Errorf("dynamic optimizer search or analysis has been in running")
		}
		defer s.Unlock()
	}

	_ = stream.Send(&PB.AckCheck{Name: "1. Analysis system runtime information: CPU Memory IO and Network..."})

	npipe, err := utils.CreateNamedPipe()
	if err != nil {
		return fmt.Errorf("create named pipe failed")
	}
	defer os.Remove(npipe)

	subProcess := true
	collectionId := -1

	// Pipe reader: forwards every line the collector writes to the pipe and
	// records it through models.InitTransfer. The pipe is reopened after each
	// end-of-file until the main flow has cleared subProcess.
	go func() {
		for {
			file, err := os.OpenFile(npipe, os.O_RDONLY, os.ModeNamedPipe)
			if err != nil {
				log.Errorf("failed to open pipe, err: %v", err)
				return
			}

			reader := bufio.NewReader(file)
			scanner := bufio.NewScanner(reader)
			for scanner.Scan() {
				line := strings.TrimRight(scanner.Text(), "\n")
				pairs := strings.Split(line, " ")
				screen := ""
				for _, ele := range pairs {
					// Only "key:value" elements are displayed; the value part is shown.
					if kv := strings.Split(ele, ":"); len(kv) == 2 {
						screen += kv[1] + " "
					}
				}
				_ = stream.Send(&PB.AckCheck{Name: screen, Status: utils.INFO})

				retId, err := models.InitTransfer("csv", "running", line, "", collectionId)
				if err != nil {
					_ = stream.Send(&PB.AckCheck{Name: err.Error()})
					log.Errorf("collect system data error: transfer data %v", err)
					_ = file.Close()
					return
				}
				collectionId = retId
			}
			// Release the descriptor before the pipe is opened again.
			_ = file.Close()

			if !subProcess {
				_, err := models.InitTransfer("csv", "finished", "", "", collectionId)
				if err != nil {
					_ = stream.Send(&PB.AckCheck{Name: err.Error()})
					log.Errorf("collect system data error: transfer data %v", err)
					return
				}
				break
			}
		}
	}()

	var respCollectPost *RespCollectorPost
	// tempFile is never assigned a path here, so handleCsv is a no-op.
	tempFile := ""
	for {
		respCollectPost, err = s.collection(npipe, message.GetTime())
		if err != nil {
			_ = stream.Send(&PB.AckCheck{Name: err.Error()})
			log.Errorf("collection system data error: %v", err)
			return err
		}

		err = handleCsv(respCollectPost.Path, tempFile)
		if err != nil {
			return err
		}

		if collectStatus[id] == "end" {
			delete(collectStatus, id)
			break
		}
		if message.GetFlag() == "" {
			break
		}
	}
	subProcess = false

	bottleneck, workloadType, resourceLimit, err := s.classify(respCollectPost.Path, message.GetModel())
	if err != nil {
		_ = stream.Send(&PB.AckCheck{Name: err.Error()})
		return err
	}

	_, err = models.InitTransfer("csv", "finished", "", workloadType, collectionId)
	if err != nil {
		_ = stream.Send(&PB.AckCheck{Name: err.Error()})
		return err
	}

	//3. judge the workload type is exist in the database
	classProfile := &sqlstore.GetClass{Class: workloadType}
	if err = sqlstore.GetClasses(classProfile); err != nil {
		log.Errorf("inquery workload type table failed %v", err)
		return fmt.Errorf("inquery workload type table failed %v", err)
	}
	if len(classProfile.Result) == 0 {
		log.Errorf("%s is not exist in the table", workloadType)
		return fmt.Errorf("%s is not exist in the table", workloadType)
	}

	// the workload type is already actived
	if classProfile.Result[0].Active {
		log.Infof("analysis result %s is the same with current active workload type", workloadType)
		return nil
	}

	//4. inquery the support app of the workload type
	classApps := &sqlstore.GetClassApp{Class: workloadType}
	err = sqlstore.GetClassApps(classApps)
	if err != nil {
		log.Errorf("inquery support app depend on class error: %v", err)
		return err
	}
	if len(classApps.Result) == 0 {
		return fmt.Errorf("class %s is not exist in the tables", workloadType)
	}

	apps := classApps.Result[0].Apps
	_ = stream.Send(&PB.AckCheck{Name: fmt.Sprintf("\n 2. Current System Workload Characterization is %s", apps)})

	logFile, err := utils.GetLogFilePath(config.DefaultTempPath)
	if err != nil {
		return fmt.Errorf("get log file path failed: %v", err)
	}
	file, err := os.OpenFile(logFile, os.O_APPEND|os.O_WRONLY, 0640)
	if err != nil {
		return fmt.Errorf("open file failed: %v", err)
	}
	defer file.Close()
	_, err = io.WriteString(file, apps+"\n")
	if err != nil {
		log.Errorf("write workload type to log failed: %v", err)
	}

	log.Infof("workload %s support app: %s", workloadType, apps)
	log.Infof("workload %s resource limit: %s, cluster result resource limit: %s",
		workloadType, apps, resourceLimit)

	step := 3
	// bottleneck is tested bit by bit: 16 CPU, 8 memory, 4 network quality,
	// 2 network I/O, 1 disk I/O.
	cpuExist := bottleneck&16 > 0
	memExist := bottleneck&8 > 0
	netQualityExist := bottleneck&4 > 0
	netIOExist := bottleneck&2 > 0
	diskIOExist := bottleneck&1 > 0

	if message.Bottleneck {
		if bottleneck == 0 {
			_ = stream.Send(&PB.AckCheck{Name: fmt.Sprintf("\n %d. Current System does not detected the five bottlenecks of Computing, Memory, Network, Network I/O and disk I/O", step)})
			log.Infof("Current System does not detected the five bottlenecks of Computing, Memory, Network, Network I/O and disk I/O")
		} else {
			bottleneckTypes := []string{"Computing", "Memory", "Network", "Network I/O", "disk I/O"}
			bottlenecks := []string{}
			boolValues := []bool{cpuExist, memExist, netQualityExist, netIOExist, diskIOExist}
			for i, exist := range boolValues {
				if exist {
					bottlenecks = append(bottlenecks, bottleneckTypes[i])
				}
			}
			bottlenecksStr := strings.Join(bottlenecks, ", ")
			_ = stream.Send(&PB.AckCheck{Name: fmt.Sprintf("\n %d. Current System bottlenecks: %s", step, bottlenecksStr)})
			log.Infof("Current System bottlenecks: %s", bottlenecksStr)
		}
		step++
	}

	if message.Characterization {
		return nil
	}

	_ = stream.Send(&PB.AckCheck{Name: fmt.Sprintf("\n %d. Build the best resource model...", step)})
	step++

	//5. get the profile type depend on the workload type
	profileType := classProfile.Result[0].ProfileType
	profileNames := strings.Split(profileType, ",")
	// NOTE(review): strings.Split with a non-empty separator always returns at
	// least one element, so this guard does not trigger for an empty
	// profileType — confirm whether an empty type should be rejected here.
	if len(profileNames) == 0 {
		log.Errorf("No profile or invalid profiles were specified.")
		return fmt.Errorf("no profile or invalid profiles were specified")
	}

	if message.Bottleneck {
		bottleneckProfiles := []struct {
			bottleneck bool
			profile    string
		}{
			{cpuExist, "include-compute-intensive"},
			{memExist, "include-memory-intensive"},
			{netQualityExist, "include-network-intensive"},
			{netIOExist, "include-network-intensive"},
			{diskIOExist, "include-io-intensive"},
		}
		for _, c := range bottleneckProfiles {
			if !c.bottleneck {
				continue
			}
			exists := false
			for _, existingProfile := range profileNames {
				if existingProfile == c.profile {
					exists = true
					break
				}
			}
			if !exists {
				profileNames = append(profileNames, c.profile)
			}
		}
	}

	//6. get the profile info depend on the profile type
	log.Infof("the resource model of the profile type is %s", strings.Join(profileNames, ", "))
	_ = stream.Send(&PB.AckCheck{Name: fmt.Sprintf("\n %d. Match profile: %s", step, strings.Join(profileNames, ", "))})
	step++

	// A non-empty message from Load means the profile could not be loaded;
	// the Profile method treats it the same way.
	pro, errMsg := profile.Load(profileNames)
	if errMsg != "" {
		log.Errorf("load profile %s failed: %s", strings.Join(profileNames, ","), errMsg)
		return fmt.Errorf("load profile %s failed: %s", strings.Join(profileNames, ","), errMsg)
	}
	pro.SetWorkloadType(workloadType)
	pro.SetCollectionId(collectionId)

	_ = stream.Send(&PB.AckCheck{Name: fmt.Sprintf("\n %d. begin to set static profile", step)})
	step++
	log.Infof("begin to set static profile")

	//static profile setting
	ch := make(chan *PB.AckCheck)
	// Closing the channel on return lets the forwarding goroutine terminate.
	defer close(ch)
	go func() {
		for value := range ch {
			_ = stream.Send(value)
		}
	}()
	if err := pro.RollbackActive(ch); err != nil {
		// Best effort: the remaining steps are still executed.
		log.Errorf("set static profile failed: %v", err)
	}

	logPath, err := utils.GetLogFilePath(config.DefaultTempPath)
	if err != nil {
		return fmt.Errorf("get log file path failed: %v", err)
	}
	defer os.Remove(logPath)

	_, err = Post("classification", "file", logPath)
	if err != nil {
		return fmt.Errorf("Failed transfer file log file to server: %v", err)
	}

	rules := &sqlstore.GetRuleTuned{Class: workloadType}
	if err := sqlstore.GetRuleTuneds(rules); err != nil {
		return err
	}
	if len(rules.Result) < 1 {
		_ = stream.Send(&PB.AckCheck{Name: "Completed optimization, please restart application!"})
		log.Info("no rules to tuned")
		return nil
	}

	log.Info("begin to dynamic tuning depending on rules")
	_ = stream.Send(&PB.AckCheck{Name: fmt.Sprintf("\n %d. begin to set dynamic profile", step)})
	if err := tuning.RuleTuned(workloadType); err != nil {
		return err
	}

	_ = stream.Send(&PB.AckCheck{Name: "Completed optimization, please restart application!"})
	return nil
}
// Tuning method drives a tuning session over a bidirectional stream.
//
// The server lock is taken for the whole session, so the call fails when
// another search or analysis is already running. Each TuningMessage received
// from the client is dispatched on its State and handled by the matching
// tuning.Optimizer method; messages produced by the optimizer are written to
// ch and forwarded to the client by a helper goroutine. The optimizer task is
// deleted when the method returns.
//
// The session ends when the client closes the stream (io.EOF), when a handler
// returns an error, after a Restore request, or when the cycle counter drops
// below zero after a stop signal.
func (s *ProfileServer) Tuning(stream PB.ProfileMgr_TuningServer) error {
if !s.TryLock() {
return fmt.Errorf("dynamic optimizer search or analysis has been in running")
}
defer s.Unlock()
// ch carries optimizer output; the goroutine below relays it to the client
// until ch is closed on return.
ch := make(chan *PB.TuningMessage)
defer close(ch)
go func() {
for value := range ch {
_ = stream.Send(value)
}
}()
var optimizer = tuning.Optimizer{}
defer func() {
if err := optimizer.DeleteTask(); err != nil {
log.Errorf("delete optimizer task failed, error: %v", err)
}
}()
// stopCh is handed to the optimizer; a value read from it at the top of the
// loop ends the current cycle.
stopCh := make(chan int, 1)
defer close(stopCh)
// cycles is set from the JobInit request (GetFeatureFilterCycle) and is
// decremented on every stop signal.
var cycles int32 = 0
var message string
// step numbers the progress messages shown to the client.
var step int32 = 1
for {
// Non-blocking check for a stop signal.
select {
case stop := <-stopCh:
if cycles > 0 {
// A stop value of 2 forces this to be the last remaining cycle.
if stop == 2 {
cycles = 1
}
_ = stream.Send(&PB.TuningMessage{State: PB.TuningMessage_JobRestart})
} else {
_ = stream.Send(&PB.TuningMessage{State: PB.TuningMessage_Ending})
}
cycles--
default:
}
// All cycles are done once the counter is negative.
if cycles < 0 {
break
}
reply, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
state := reply.GetState()
switch state {
case PB.TuningMessage_SyncConfig:
optimizer.Content = reply.GetContent()
err = optimizer.SyncTunedNode(ch)
if err != nil {
return err
}
case PB.TuningMessage_GetInitialConfig:
optimizer.Content = reply.GetContent()
err = optimizer.GetNodeInitialConfig(ch)
if err != nil {
return err
}
case PB.TuningMessage_JobRestart:
log.Infof("restart cycles is: %d", cycles)
optimizer.Content = reply.GetContent()
// Remaining cycles start another feature selection round; otherwise the
// actual tuning is started.
if cycles > 0 {
optimizer.EvalStatistics = make([]float64, 0)
message = fmt.Sprintf("%d、Starting the next cycle of parameter selection......", step)
step += 1
ch <- &PB.TuningMessage{State: PB.TuningMessage_Display, Content: []byte(message)}
if err = optimizer.InitFeatureSel(ch, stopCh); err != nil {
return err
}
} else {
message = fmt.Sprintf("%d、Start to tuning the system......", step)
step += 1
ch <- &PB.TuningMessage{State: PB.TuningMessage_Display, Content: []byte(message)}
if err = optimizer.InitTuned(ch, stopCh); err != nil {
return err
}
}
case PB.TuningMessage_JobInit:
project := reply.GetName()
// Without a project name the running workload is detected first and used
// as the project.
if len(strings.TrimSpace(project)) == 0 {
// NOTE(review): err was already checked right after stream.Recv, so it is
// nil here and this branch appears to be unreachable.
if err != nil {
return err
}
message = fmt.Sprintf("%d.Begin to Analysis the system......", step)
step += 1
ch <- &PB.TuningMessage{State: PB.TuningMessage_Display, Content: []byte(message)}
project, err = s.Getworkload()
if err != nil {
return err
}
message = fmt.Sprintf("%d.Current running application is: %s", step, project)
step += 1
ch <- &PB.TuningMessage{State: PB.TuningMessage_Display, Content: []byte(message)}
}
message = fmt.Sprintf("%d.Loading its corresponding tuning project: %s", step, project)
step += 1
ch <- &PB.TuningMessage{State: PB.TuningMessage_Display, Content: []byte(message)}
if err = tuning.CheckServerPrj(project, &optimizer); err != nil {
return err
}
// Copy the session parameters of the request into the optimizer.
optimizer.Engine = reply.GetEngine()
optimizer.Content = reply.GetContent()
optimizer.Restart = reply.GetRestart()
optimizer.RandomStarts = reply.GetRandomStarts()
optimizer.HistoryPath=reply.GetHistoryPath()
optimizer.FeatureFilterEngine = reply.GetFeatureFilterEngine()
optimizer.FeatureFilterIters = reply.GetFeatureFilterIters()
optimizer.SplitCount = reply.GetSplitCount()
// The project id is the current time in milliseconds.
optimizer.PrjId = strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
cycles = reply.GetFeatureFilterCycle()
optimizer.FeatureFilterCount = reply.GetFeatureFilterCount()
optimizer.EvalFluctuation = reply.GetEvalFluctuation()
optimizer.FeatureSelector = reply.GetFeatureSelector()
ch <- &PB.TuningMessage{State: PB.TuningMessage_JobCreate}
case PB.TuningMessage_JobCreate:
optimizer.EvalBase = reply.GetTuningLog().GetBaseEval()
optimizer.Evaluations = reply.GetTuningLog().GetSumEval()
optimizer.TuningParams = make(utils.SortedPair, 0)
// With no feature filter cycles requested tuning starts directly;
// otherwise parameter selection runs first.
if cycles == 0 {
if optimizer.Restart {
message = fmt.Sprintf("%d.Continue to tuning the system......", step)
} else {
message = fmt.Sprintf("%d.Start to tuning the system......", step)
}
ch <- &PB.TuningMessage{State: PB.TuningMessage_Display, Content: []byte(message)}
step += 1
if err = optimizer.InitTuned(ch, stopCh); err != nil {
return err
}
} else {
optimizer.EvalStatistics = make([]float64, 0)
message = fmt.Sprintf("%d.Starting to select the important parameters......", step)
ch <- &PB.TuningMessage{State: PB.TuningMessage_Display, Content: []byte(message)}
step += 1
if err = optimizer.InitFeatureSel(ch, stopCh); err != nil {
return err
}
}
case PB.TuningMessage_Restore:
// Restore ends the session as soon as the project configuration has been
// restored.
project := reply.GetName()
log.Infof("begin to restore project: %s", project)
if err := tuning.CheckServerPrj(project, &optimizer); err != nil {
return err
}
if err := optimizer.RestoreConfigTuned(ch); err != nil {
return err
}
log.Infof("restore project %s success", project)
return nil
case PB.TuningMessage_BenchMark:
optimizer.Content = reply.GetContent()
optimizer.Evaluations = reply.GetTuningLog().GetSumEval()
err := optimizer.DynamicTuned(ch, stopCh)
if err != nil {
return err
}
}
}
return nil
}
// UpgradeProfile replaces the profile database with the file named in
// profileInfo.Name and reloads it.
//
// The call is rejected unless it originates from a local address. The current
// database is first copied to a time-stamped "<unix seconds>.db" file under
// config.DefaultTempPath (created when missing), then overwritten with the
// new file and reloaded. A failure in any of these three steps is reported to
// the client as an AckCheck with status FAILD while the RPC itself returns
// nil.
func (s *ProfileServer) UpgradeProfile(profileInfo *PB.ProfileInfo, stream PB.ProfileMgr_UpgradeProfileServer) error {
	local, err := SVC.CheckRpcIsLocalAddr(stream.Context())
	switch {
	case err != nil:
		return err
	case !local:
		return fmt.Errorf("the upgrade command can not be remotely operated")
	}

	log.Debug("Begin to upgrade profiles\n")
	activeDb := path.Join(config.DatabasePath, config.DatabaseName)
	incomingDb := profileInfo.GetName()

	tempDirExists, err := utils.PathExist(config.DefaultTempPath)
	if err != nil {
		return err
	}
	if !tempDirExists {
		if err = os.MkdirAll(config.DefaultTempPath, 0750); err != nil {
			return err
		}
	}

	backupName := strconv.FormatInt(time.Now().Unix(), 10) + ".db"
	backupPath := path.Join(config.DefaultTempPath, backupName)

	// The steps run in order; the first failure is reported and ends the call.
	steps := []func() error{
		func() error { return utils.CopyFile(backupPath, activeDb) },
		func() error { return utils.CopyFile(activeDb, incomingDb) },
		func() error { return sqlstore.Reload(activeDb) },
	}
	for _, run := range steps {
		if err := run(); err != nil {
			_ = stream.Send(&PB.AckCheck{Name: err.Error(), Status: utils.FAILD})
			return nil
		}
	}

	_ = stream.Send(&PB.AckCheck{Name: "upgrade success", Status: utils.SUCCESS})
	return nil
}
// InfoProfile streams the content of the profile named in profileInfo.Name.
//
// config.DefaultProfilePath is walked and every regular entry is mapped to a
// profile name: the path relative to the profile root with "/" replaced by
// "-" and the extension removed. Each file whose name matches is read and
// sent to the client prefixed with "\n*** <name>:\n".
//
// An error is returned when the walk fails (for example a missing or
// unreadable profile directory; the FileInfo handed to the callback may then
// be nil), when a matching file cannot be read, or when no profile matches.
func (s *ProfileServer) InfoProfile(profileInfo *PB.ProfileInfo, stream PB.ProfileMgr_InfoProfileServer) error {
	var content string
	profileName := profileInfo.GetName()

	err := filepath.Walk(config.DefaultProfilePath, func(absPath string, info os.FileInfo, err error) error {
		// Walk passes the lstat/readdir failure here; info can be nil.
		if err != nil {
			return err
		}
		if info.IsDir() {
			return nil
		}

		absFilename := absPath[len(config.DefaultProfilePath)+1:]
		filenameOnly := strings.TrimSuffix(strings.ReplaceAll(absFilename, "/", "-"),
			path.Ext(info.Name()))
		if filenameOnly != profileName {
			return nil
		}

		data, err := ioutil.ReadFile(absPath)
		if err != nil {
			return err
		}
		content = "\n*** " + profileName + ":\n" + string(data)
		_ = stream.Send(&PB.ProfileInfo{Name: content})
		return nil
	})
	if err != nil {
		return err
	}

	if content == "" {
		log.Errorf("profile %s is not exist", profileName)
		return fmt.Errorf("profile %s is not exist", profileName)
	}
	return nil
}
// CheckActiveProfile checks whether the currently active profile is in
// effect and streams the check results to the client.
//
// The active name is taken from the first profile log entry; when a class
// with that name exists its profile type is used instead. An error is
// returned when no active name is found, when the class lookup fails, when
// the profile cannot be loaded, or when the check itself fails.
func (s *ProfileServer) CheckActiveProfile(profileInfo *PB.ProfileInfo,
	stream PB.ProfileMgr_CheckActiveProfileServer) error {
	log.Debug("Begin to check active profiles\n")

	history, err := sqlstore.GetProfileLogs()
	if err != nil {
		return err
	}

	activeName := ""
	if len(history) > 0 {
		activeName = history[0].ProfileID
	}
	if activeName == "" {
		return fmt.Errorf("no active profile or more than 1 active profile")
	}

	lookup := &sqlstore.GetClass{Class: activeName}
	if err = sqlstore.GetClasses(lookup); err != nil {
		return fmt.Errorf("inquery workload type table failed %v", err)
	}
	if len(lookup.Result) > 0 {
		activeName = lookup.Result[0].ProfileType
	}
	log.Debugf("active name is %s", activeName)

	active, ok := profile.LoadFromProfile(activeName)
	if !ok {
		log.WithField("profile", activeName).Errorf("Load profile %s failed", activeName)
		return fmt.Errorf("load profile %s Failed", activeName)
	}

	// Relay every check result to the client until the channel is closed.
	acks := make(chan *PB.AckCheck)
	defer close(acks)
	go func() {
		for ack := range acks {
			_ = stream.Send(ack)
		}
	}()

	if err := active.Check(acks); err != nil {
		return err
	}
	return nil
}
// ProfileRollback restores the system to its initial state by undoing every
// recorded profile activation.
//
// Profile logs are processed from the highest ID to the lowest. For each
// entry the stored context is loaded and resumed (messages produced while
// resuming are forwarded to the client), the log entry and its backup
// directory are removed, and the entry's profile is marked active. After all
// entries have been processed the active flag is cleared.
//
// When there is nothing to roll back a single informational message is sent.
// Failures of the database or filesystem steps, including the active profile
// updates, are returned to the caller so that a partial rollback is not
// reported as success.
func (s *ProfileServer) ProfileRollback(profileInfo *PB.ProfileInfo, stream PB.ProfileMgr_ProfileRollbackServer) error {
	profileLogs, err := sqlstore.GetProfileLogs()
	if err != nil {
		return err
	}
	if len(profileLogs) < 1 {
		_ = stream.Send(&PB.AckCheck{Name: "no profile need to rollback"})
		return nil
	}

	sort.Slice(profileLogs, func(i, j int) bool {
		return profileLogs[i].ID > profileLogs[j].ID
	})

	//static profile setting
	ch := make(chan *PB.AckCheck)
	// Closing the channel on return lets the forwarding goroutine terminate.
	defer close(ch)
	go func() {
		for value := range ch {
			_ = stream.Send(value)
		}
	}()

	for _, pro := range profileLogs {
		log.Infof("begin to restore profile id: %d", pro.ID)
		history := profile.HistoryProfile{}
		// Loading and resuming are best effort; the log entry is removed regardless.
		_ = history.Load(pro.Context)
		_ = history.Resume(ch)

		// delete profile log after restored
		if err := sqlstore.DelProfileLogByID(pro.ID); err != nil {
			return err
		}

		//delete backup dir
		if err := os.RemoveAll(pro.BackupPath); err != nil {
			return err
		}

		// update active profile after restored
		if err := sqlstore.ActiveProfile(pro.ProfileID); err != nil {
			return err
		}
	}

	return sqlstore.InActiveProfile()
}
// Collection asks the collector service to sample system data for a workload
// and store it as a CSV file.
//
// The call is rejected unless it originates from a local address. The
// workload, output directory, application type, block device and network
// interface of the request are validated, and the interval must be greater
// than zero because it is used as a divisor for the sample count. The
// application type must exist in the class table.
//
// Data is written to
// <config.DefaultCollectionPath>/<output dir>/<workload>-<YYYYMMDD-hhmmss>.csv;
// the directory is created when it does not exist. In the metric definitions
// read from the collection table the placeholders {disk}, {network} and
// {interval} are replaced by the request values; unknown placeholders are
// logged and left untouched.
func (s *ProfileServer) Collection(message *PB.CollectFlag, stream PB.ProfileMgr_CollectionServer) error {
	isLocalAddr, err := SVC.CheckRpcIsLocalAddr(stream.Context())
	if err != nil {
		return err
	}
	if !isLocalAddr {
		return fmt.Errorf("the collection command can not be remotely operated")
	}

	if valid := utils.IsInputStringValid(message.GetWorkload()); !valid {
		return fmt.Errorf("input:%s is invalid", message.GetWorkload())
	}
	if valid, _ := regexp.MatchString("^[a-zA-Z]+(_[a-zA-Z0-9]+)*$", message.GetOutputDir()); !valid {
		return fmt.Errorf("output dir:%s is invalid", message.GetOutputDir())
	}
	if valid := utils.IsInputStringValid(message.GetType()); !valid {
		return fmt.Errorf("input:%s is invalid", message.GetType())
	}
	if valid := utils.IsInputStringValid(message.GetBlock()); !valid {
		return fmt.Errorf("input:%s is invalid", message.GetBlock())
	}
	if valid := utils.IsInputStringValid(message.GetNetwork()); !valid {
		return fmt.Errorf("input:%s is invalid", message.GetNetwork())
	}
	// The interval is the divisor of the sample count below.
	if message.GetInterval() <= 0 {
		return fmt.Errorf("interval:%d is invalid", message.GetInterval())
	}

	classProfile := &sqlstore.GetClass{Class: message.GetType()}
	if err = sqlstore.GetClasses(classProfile); err != nil {
		return err
	}
	if len(classProfile.Result) == 0 {
		return fmt.Errorf("app type %s is not exist, use define command first", message.GetType())
	}

	profileType := classProfile.Result[0].ProfileType
	include, err := profile.GetProfileInclude(profileType)
	if err != nil {
		return err
	}

	// Check the directory that is actually written to, not the bare name.
	outputDir := path.Join(config.DefaultCollectionPath, message.GetOutputDir())
	exist, err := utils.PathExist(outputDir)
	if err != nil {
		return err
	}
	if !exist {
		if err = os.MkdirAll(outputDir, 0755); err != nil {
			return err
		}
	}

	if err = utils.InterfaceByName(message.GetNetwork()); err != nil {
		return err
	}
	if err = utils.DiskByName(message.GetBlock()); err != nil {
		return err
	}

	collections, err := sqlstore.GetCollections()
	if err != nil {
		log.Errorf("inquery collection tables error: %v", err)
		return err
	}

	// Matches "{name}" placeholders; compiled once for all collections.
	placeholder := regexp.MustCompile(`\{([^}]+)\}`)
	monitors := make([]Monitor, 0, len(collections))
	for _, collection := range collections {
		for _, match := range placeholder.FindAllStringSubmatch(collection.Metrics, -1) {
			if len(match) < 2 {
				continue
			}

			var value string
			switch match[1] {
			case "disk":
				value = message.GetBlock()
			case "network":
				value = message.GetNetwork()
			case "interval":
				value = strconv.FormatInt(message.GetInterval(), 10)
			default:
				log.Warnf("%s is not recognized", match[1])
				continue
			}
			// match[0] is the whole "{name}" token; replace it literally.
			collection.Metrics = strings.ReplaceAll(collection.Metrics, match[0], value)
		}

		monitor := Monitor{Module: collection.Module, Purpose: collection.Purpose, Field: collection.Metrics}
		monitors = append(monitors, monitor)
	}

	collectorBody := new(CollectorPost)
	collectorBody.SampleNum = int(message.GetDuration() / message.GetInterval())
	collectorBody.Monitors = monitors

	// Reference layout: year 2006, month 01, day 02, then 15:04:05.
	nowTime := time.Now().Format("20060102-150405")
	fileName := fmt.Sprintf("%s-%s.csv", message.GetWorkload(), nowTime)
	collectorBody.File = path.Join(outputDir, fileName)

	if include == "" {
		include = "default"
	}
	collectorBody.DataType = fmt.Sprintf("%s:%s", include, message.GetType())

	_ = stream.Send(&PB.AckCheck{Name: "start to collect data"})
	_, err = collectorBody.Post()
	if err != nil {
		_ = stream.Send(&PB.AckCheck{Name: err.Error()})
		return err
	}

	_ = stream.Send(&PB.AckCheck{Name: fmt.Sprintf("generate %s successfully", collectorBody.File)})
	return nil
}
// Training trains a model from self collected data.
//
// The call is rejected unless it originates from a local address. The data
// path of the request is compressed, the archive is uploaded through Post
// (and removed locally on return), and a training request is sent with the
// uploaded path, the requested model name and the "models" directory under
// config.DefaultAnalysisPath. The outcome is reported to the client as a
// success or failure message; compression and upload errors are also sent to
// the client before being returned.
func (s *ProfileServer) Training(message *PB.TrainMessage, stream PB.ProfileMgr_TrainingServer) error {
	local, err := SVC.CheckRpcIsLocalAddr(stream.Context())
	switch {
	case err != nil:
		return err
	case !local:
		return fmt.Errorf("the train command can not be remotely operated")
	}

	dataPath, modelName := message.GetDataPath(), message.GetModelName()

	archive, err := utils.CreateCompressFile(dataPath)
	if err != nil {
		log.Debugf("Failed to compress %s: %v", dataPath, err)
		_ = stream.Send(&PB.AckCheck{Name: err.Error()})
		return err
	}
	defer os.Remove(archive)

	uploaded, err := Post("training", "file", archive)
	if err != nil {
		log.Debugf("Failed to transfer file: %v", err)
		_ = stream.Send(&PB.AckCheck{Name: err.Error()})
		return err
	}

	request := &models.Training{
		DataPath:  uploaded,
		ModelName: modelName,
		ModelPath: path.Join(config.DefaultAnalysisPath, "models"),
	}
	success, err := request.Post()
	if err != nil {
		return err
	}

	outcome := "training the self collect data failed"
	if success {
		outcome = "training the self collect data success"
	}
	_ = stream.Send(&PB.AckCheck{Name: outcome})
	return nil
}
// Detecting asks the detection model for the misclassified data of the given
// application and detect path, and streams the outcome to the client.
//
// On success the textual result has its first character and its last two
// characters removed, is split on "@", and every item is sent as a separate
// message. A result too short to carry that wrapper yields no items instead
// of panicking. A request error is returned to the caller; an unsuccessful
// detection is only reported on the stream.
func (s *ProfileServer) Detecting(message *PB.DetectMessage, stream PB.ProfileMgr_DetectingServer) error {
	AppName := message.GetAppName()
	DetectPath := message.GetDetectPath()

	_ = stream.Send(&PB.AckCheck{Name: "Detecting the misclassified data"})

	detectBody := new(models.Detecting)
	detectBody.AppName = AppName
	detectBody.DetectPath = DetectPath
	success, detecterr, result := detectBody.Get()
	if detecterr != nil {
		return detecterr
	}
	if !success {
		_ = stream.Send(&PB.AckCheck{Name: "Detecting misclassified data failed"})
		return nil
	}

	_ = stream.Send(&PB.AckCheck{Name: "Detecting misclassified data success\n"})

	// One leading and two trailing characters are stripped before splitting.
	// Slicing a shorter string would panic with "slice bounds out of range",
	// so such a result is treated as carrying no items.
	const wrapperLen = 3
	if len(result) < wrapperLen {
		return nil
	}
	for _, item := range strings.Split(result[1:len(result)-2], "@") {
		_ = stream.Send(&PB.AckCheck{Name: item})
	}
	return nil
}
// Define registers a user-defined workload type and stores its profile.
//
// The profile name is "<service type>-<application name>-<scenario name>".
// Missing ClassApps and ClassProfile records for that name are inserted
// through sqlstore first. If a profile with that name already exists, this is
// reported in Ack.Status and nothing is written; otherwise the content is
// written to
// <DefaultProfilePath>/<service type>/<application name>/<scenario name>.conf.
// Only local callers are accepted.
func (s *ProfileServer) Define(ctx context.Context, message *PB.DefineMessage) (*PB.Ack, error) {
	local, err := SVC.CheckRpcIsLocalAddr(ctx)
	if err != nil {
		return &PB.Ack{}, err
	}
	if !local {
		return &PB.Ack{}, fmt.Errorf("the define command can not be remotely operated")
	}

	service := message.GetServiceType()
	app := message.GetApplicationName()
	scenario := message.GetScenarioName()
	name := fmt.Sprintf("%s-%s-%s", service, app, scenario)

	// Make sure the workload type is known before touching the profile.
	known, err := sqlstore.ExistWorkloadType(name)
	if err != nil {
		return &PB.Ack{}, err
	}
	if !known {
		entry := &sqlstore.ClassApps{Class: name, Apps: name, Deletable: true}
		if err = sqlstore.InsertClassApps(entry); err != nil {
			return &PB.Ack{}, err
		}
	}

	registered, err := sqlstore.ExistProfileName(name)
	if err != nil {
		return &PB.Ack{}, err
	}
	if !registered {
		entry := &sqlstore.ClassProfile{Class: name, ProfileType: name, Active: false}
		if err = sqlstore.InsertClassProfile(entry); err != nil {
			return &PB.Ack{}, err
		}
	}

	// An existing profile is reported through the status, not as an error.
	present, err := profile.ExistProfile(name)
	if err != nil {
		return &PB.Ack{}, err
	}
	if present {
		return &PB.Ack{Status: fmt.Sprintf("%s is already exist", name)}, nil
	}

	dir := path.Join(config.DefaultProfilePath, service, app)
	if err = utils.CreateDir(dir, utils.FilePerm); err != nil {
		return &PB.Ack{}, err
	}

	target := path.Join(dir, fmt.Sprintf("%s.conf", scenario))
	body := string(message.GetContent())
	if err = utils.WriteFile(target, body, utils.FilePerm, os.O_WRONLY|os.O_CREATE); err != nil {
		log.Error(err)
		return &PB.Ack{}, err
	}

	return &PB.Ack{Status: "OK"}, nil
}
// Delete removes a user-defined workload type.
//
// The ClassApps record is looked up by the profile name and must be a single
// entry flagged Deletable; otherwise the refusal is reported in Ack.Status.
// After the ClassApps record is deleted, the ClassProfile record (when
// present) and the profile itself are removed on a best-effort basis: their
// failures are logged and do not fail the call. Only local callers are
// accepted.
func (s *ProfileServer) Delete(ctx context.Context, message *PB.ProfileInfo) (*PB.Ack, error) {
	local, err := SVC.CheckRpcIsLocalAddr(ctx)
	if err != nil {
		return &PB.Ack{}, err
	}
	if !local {
		return &PB.Ack{}, fmt.Errorf("the undefine command can not be remotely operated")
	}

	name := message.GetName()

	query := &sqlstore.GetClassApp{Class: name}
	if err = sqlstore.GetClassApps(query); err != nil {
		return &PB.Ack{}, err
	}
	removable := len(query.Result) == 1 && query.Result[0].Deletable
	if !removable {
		return &PB.Ack{Status: "only self defined type can be deleted"}, nil
	}

	if err = sqlstore.DeleteClassApps(name); err != nil {
		return &PB.Ack{}, err
	}

	registered, err := sqlstore.ExistProfileName(name)
	if err != nil {
		return &PB.Ack{}, err
	}
	if registered {
		if delErr := sqlstore.DeleteClassProfile(name); delErr != nil {
			log.Errorf("delete item from class_profile table failed %v ", delErr)
		}
	}

	if delErr := profile.DeleteProfile(name); delErr != nil {
		log.Errorf("delete item from profile table failed %v", delErr)
	}

	return &PB.Ack{Status: "OK"}, nil
}
// Update replaces the content of an existing profile.
//
// The profile named in the message must already exist; otherwise an error is
// returned and nothing is changed. Only local callers are accepted. On
// success Ack.Status is "OK".
func (s *ProfileServer) Update(ctx context.Context, message *PB.ProfileInfo) (*PB.Ack, error) {
	isLocalAddr, err := SVC.CheckRpcIsLocalAddr(ctx)
	if err != nil {
		return &PB.Ack{}, err
	}
	if !isLocalAddr {
		return &PB.Ack{}, fmt.Errorf("the update command can not be remotely operated")
	}

	profileName := message.GetName()
	content := string(message.GetContent())

	profileExist, err := profile.ExistProfile(profileName)
	if err != nil {
		return &PB.Ack{}, err
	}
	if !profileExist {
		// This branch handles a missing profile; the previous message claimed
		// the profile "is exist", which is the opposite of the condition.
		return &PB.Ack{}, fmt.Errorf("profile name %s does not exist", profileName)
	}

	if err = profile.UpdateProfile(profileName, content); err != nil {
		return &PB.Ack{}, err
	}
	return &PB.Ack{Status: "OK"}, nil
}
// Schedule applies the requested scheduling strategy (cpu/irq/numa ...) to
// the applications named in the message and streams progress to the client.
//
// Progress messages produced by the scheduler arrive on an internal channel
// and are forwarded to the stream by a helper goroutine. The channel is
// closed and the helper is drained before the handler sends its own final
// message, so the stream is never written from two goroutines at once and the
// helper cannot outlive the handler.
func (s *ProfileServer) Schedule(message *PB.ScheduleMessage,
	stream PB.ProfileMgr_ScheduleServer) error {
	pids := message.GetApp()
	strategy := message.GetStrategy()
	scheduler := schedule.GetScheduler()

	ch := make(chan *PB.AckCheck)
	forwarded := make(chan struct{})
	go func() {
		defer close(forwarded)
		for value := range ch {
			_ = stream.Send(value)
		}
	}()

	err := scheduler.Schedule(pids, strategy, true, ch)

	// Stop the forwarder and wait until its last Send has completed: gRPC
	// does not allow concurrent sends on one stream.
	close(ch)
	<-forwarded

	if err != nil {
		_ = stream.Send(&PB.AckCheck{Name: err.Error(), Status: utils.FAILD})
		return err
	}

	_ = stream.Send(&PB.AckCheck{Name: "schedule finished"})
	return nil
}
// collection asks the collector service to sample the system.
//
// The monitor list is built from the collections stored in the database.
// Every "{key}" placeholder in a collection's metrics string is replaced by
// the value of that key from the "system" section of s.Raw or, failing that,
// from its "server" section; an unknown key is an error.
//
// npipe, when non-empty, is passed on as the collector's pipe. time, when
// non-empty, is a decimal sample count that overrides the "sample_num" key of
// the "server" section (which defaults to 20 when unset or 0).
func (s *ProfileServer) collection(npipe string, time string) (*RespCollectorPost, error) {
	// 1. get the dimension structure of the system data to be collected
	collections, err := sqlstore.GetCollections()
	if err != nil {
		log.Errorf("inquery collection tables error: %v", err)
		return nil, err
	}

	// 1.1 send the collect data command to the monitor service
	// The placeholder pattern is loop-invariant, so compile it only once.
	placeholder := regexp.MustCompile(`\{([^}]+)\}`)
	monitors := make([]Monitor, 0, len(collections))
	for _, collection := range collections {
		for _, match := range placeholder.FindAllStringSubmatch(collection.Metrics, -1) {
			if len(match) < 2 {
				continue
			}
			key := match[1]

			var value string
			switch {
			case s.Raw.Section("system").Haskey(key):
				value = s.Raw.Section("system").Key(key).Value()
			case s.Raw.Section("server").Haskey(key):
				value = s.Raw.Section("server").Key(key).Value()
			default:
				return nil, fmt.Errorf("%s is not exist in the system or server section", key)
			}

			// Substitute literally. Building a regexp from the key would
			// treat its metacharacters as pattern syntax (and could panic in
			// MustCompile), and a regexp replacement would expand "$" in the
			// configured value.
			collection.Metrics = strings.Replace(collection.Metrics, "{"+key+"}", value, -1)
		}

		monitor := Monitor{Module: collection.Module, Purpose: collection.Purpose, Field: collection.Metrics}
		monitors = append(monitors, monitor)
	}

	sampleNum := s.Raw.Section("server").Key("sample_num").MustInt(20)
	if sampleNum == 0 {
		sampleNum = 20
	}
	if time != "" {
		sampleNum, err = strconv.Atoi(time)
		if err != nil {
			return nil, err
		}
	}

	collectorBody := new(CollectorPost)
	collectorBody.SampleNum = sampleNum
	collectorBody.Monitors = monitors
	collectorBody.File = path.Join(config.DefaultTempPath, "test.csv")
	if npipe != "" {
		collectorBody.Pipe = npipe
	}

	log.Infof("tuning collector body is %v", collectorBody)

	respCollectPost, err := collectorBody.Post()
	if err != nil {
		return nil, err
	}
	return respCollectPost, nil
}
// classify sends collected data to the classification model.
//
// dataPath is the file holding the collected data; it is renamed through
// utils.ChangeFileName, and the renamed file is removed when classify
// returns. customeModel, when non-empty, selects a specific model; models are
// looked up under <DefaultAnalysisPath>/models.
//
// It returns the bottleneck, the workload type and the resource limit
// reported by the model, in that order. On error the zero values are
// returned together with the error.
func (s *ProfileServer) classify(dataPath string, customeModel string) (int, string, string, error) {
	// 2. send the collected data to the model for completion type identification
	var resourceLimit string
	var workloadType string
	var bottleneck int

	localPath, timestamp, err := utils.ChangeFileName(dataPath)
	if err != nil {
		// Report the cause; localPath carries no useful information here.
		log.Errorf("Failed to change file name of %s: %v", dataPath, err)
		return bottleneck, workloadType, resourceLimit, err
	}
	defer os.Remove(localPath)

	// filepath.Split keeps the trailing separator on the directory part, so
	// plain concatenation yields a file next to localPath.
	absPath, _ := filepath.Split(localPath)
	logPath := absPath + "test-" + timestamp + ".log"
	log.Infof("log file path: %s", logPath)

	// Only the existence of the log file is needed in this function
	// (NOTE(review): presumably it is written by another component — confirm),
	// so release the descriptor right away instead of leaking it.
	logFile, err := os.OpenFile(logPath, os.O_WRONLY|os.O_CREATE, 0640)
	if err != nil {
		log.Errorf("Failed to create log file: %v", err)
		return bottleneck, workloadType, resourceLimit, err
	}
	if closeErr := logFile.Close(); closeErr != nil {
		log.Errorf("Failed to close log file %s: %v", logPath, closeErr)
	}

	dataPath, err = Post("classification", "file", localPath)
	if err != nil {
		log.Errorf("Failed transfer file to server: %v", err)
		return bottleneck, workloadType, resourceLimit, err
	}

	body := new(ClassifyPostBody)
	body.Data = dataPath
	body.ModelPath = path.Join(config.DefaultAnalysisPath, "models")
	if customeModel != "" {
		body.Model = customeModel
	}

	respPostIns, err := body.Post()
	if err != nil {
		return bottleneck, workloadType, resourceLimit, err
	}

	log.Infof("workload: %s, cluster result resource limit: %s",
		respPostIns.WorkloadType, respPostIns.ResourceLimit)
	resourceLimit = respPostIns.ResourceLimit
	workloadType = respPostIns.WorkloadType
	bottleneck = respPostIns.Bottleneck
	return bottleneck, workloadType, resourceLimit, nil
}
// Getworkload collects system data with no pipe and no sample-count
// override, classifies it with no custom model, and returns the detected
// workload type. An empty workload type is reported as an error.
func (s *ProfileServer) Getworkload() (string, error) {
	collected, err := s.collection("", "")
	if err != nil {
		return "", err
	}

	_, detected, _, err := s.classify(collected.Path, "")
	if err != nil {
		return "", err
	}

	if detected == "" {
		return "", fmt.Errorf("workload is empty")
	}
	return detected, nil
}
// Generate collects system data, runs the tuning rule engine over it and
// writes the resulting tuning project to
// <DefaultTuningPath>/<project name>.yaml, streaming progress to the client.
//
// Every message for the client, whether produced here or through the channel
// handed to the tuning file, goes through one forwarding goroutine. The
// stream is therefore never written from two goroutines at once. The
// forwarder is drained before the handler returns, so it cannot touch the
// stream after the RPC has finished.
func (s *ProfileServer) Generate(message *PB.ProfileInfo, stream PB.ProfileMgr_GenerateServer) error {
	ch := make(chan *PB.AckCheck)
	forwarded := make(chan struct{})
	go func() {
		defer close(forwarded)
		for value := range ch {
			_ = stream.Send(value)
		}
	}()
	defer func() {
		// Stop the forwarder and wait for its last Send on every return path.
		close(ch)
		<-forwarded
	}()

	// notify queues a progress message behind anything already sent on ch.
	notify := func(name string) {
		ch <- &PB.AckCheck{Name: name}
	}

	notify("1.Start to analysis the system bottleneck")

	var npipe string
	respCollectPost, err := s.collection(npipe, "")
	if err != nil {
		return err
	}

	log.Infof("collect data response body is: %+v", respCollectPost)
	collectData := respCollectPost.Data

	projectName := message.GetName()

	notify("2.Finding potential tuning parameters")

	var tuningData tuning.TuningData
	err = mapstructure.Decode(collectData, &tuningData)
	if err != nil {
		return err
	}

	log.Infof("decode to structure is: %+v", tuningData)
	ruleFile := path.Join(config.DefaultRulePath, config.TuningRuleFile)
	engine := tuning.NewRuleEngine(ruleFile)
	if engine == nil {
		return fmt.Errorf("create rules engine failed")
	}

	tuningFile := tuning.NewTuningFile(projectName, ch)
	err = tuningFile.Load()
	if err != nil {
		return err
	}

	// Expose the collected data and the tuning file to the rules.
	err = engine.AddContext("TuningData", &tuningData)
	if err != nil {
		return err
	}
	err = engine.AddContext("TuningFile", tuningFile)
	if err != nil {
		return err
	}

	err = engine.Execute()
	if err != nil {
		return err
	}

	if len(tuningFile.PrjSrv.Object) == 0 {
		notify("   No tuning parameters founed")
		return nil
	}

	dstFile := path.Join(config.DefaultTuningPath, fmt.Sprintf("%s.yaml", projectName))
	log.Infof("generate tuning file: %s", dstFile)
	err = tuningFile.Save(dstFile)
	if err != nil {
		return err
	}

	notify(fmt.Sprintf("3. Generate tuning project: %s\n    project name: %s", dstFile, projectName))
	return nil
}