diff --git a/cmd/katalyst-agent/app/options/sysadvisor/qosaware/model/borwein/borwein.go b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/model/borwein/borwein.go index 9a2c66369..c7edcd439 100644 --- a/cmd/katalyst-agent/app/options/sysadvisor/qosaware/model/borwein/borwein.go +++ b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/model/borwein/borwein.go @@ -26,23 +26,27 @@ import ( ) type BorweinOptions struct { - InferenceServiceSocketAbsPath string - FeatureDescriptionFilePath string - NodeFeatureNames []string - ContainerFeatureNames []string + InferenceServiceSocketAbsPath string + ModelNameToInferenceSvcSockAbsPath map[string]string + FeatureDescriptionFilePath string + NodeFeatureNames []string + ContainerFeatureNames []string } func NewBorweinOptions() *BorweinOptions { return &BorweinOptions{ - NodeFeatureNames: []string{}, - ContainerFeatureNames: []string{}, + ModelNameToInferenceSvcSockAbsPath: map[string]string{}, + NodeFeatureNames: []string{}, + ContainerFeatureNames: []string{}, } } // AddFlags adds flags to the specified FlagSet. func (o *BorweinOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.InferenceServiceSocketAbsPath, "borwein-inference-svc-socket-path", o.InferenceServiceSocketAbsPath, - "socket path which borwein inference server listens at") + "socket path which borwein inference server listens at. it's deprecated, use borwein-inference-model-to-svc-socket-path intead") + fs.StringToStringVar(&o.ModelNameToInferenceSvcSockAbsPath, "borwein-inference-model-to-svc-socket-path", o.ModelNameToInferenceSvcSockAbsPath, + "model name to socket path which its borwein inference server listens at") fs.StringVar(&o.FeatureDescriptionFilePath, "feature-description-filepath", o.FeatureDescriptionFilePath, "file path to feature descriptions, the option has lower priority to borwein-node-feature-names and borwein-container-feature-names") fs.StringSliceVar(&o.NodeFeatureNames, "borwein-node-feature-names", o.NodeFeatureNames, @@ -60,6 +64,8 @@ func (o *BorweinOptions) ApplyTo(c *borwein.BorweinConfiguration) error { }{} c.InferenceServiceSocketAbsPath = o.InferenceServiceSocketAbsPath + c.ModelNameToInferenceSvcSockAbsPath = o.ModelNameToInferenceSvcSockAbsPath + if len(o.NodeFeatureNames)+len(o.ContainerFeatureNames) > 0 { c.NodeFeatureNames = o.NodeFeatureNames c.ContainerFeatureNames = o.ContainerFeatureNames diff --git a/go.mod b/go.mod index 28b8563e6..03a1c60bf 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/google/uuid v1.3.0 github.com/h2non/gock v1.2.0 github.com/klauspost/cpuid/v2 v2.2.6 - github.com/kubewharf/katalyst-api v0.5.1-0.20240911051124-d5c09c115a19 + github.com/kubewharf/katalyst-api v0.5.1-0.20240929080014-ae613a8935d8 github.com/montanaflynn/stats v0.7.1 github.com/opencontainers/runc v1.1.6 github.com/opencontainers/selinux v1.10.0 diff --git a/go.sum b/go.sum index 34cc65fd5..39907aa39 100644 --- a/go.sum +++ b/go.sum @@ -568,8 +568,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubewharf/katalyst-api v0.5.1-0.20240911051124-d5c09c115a19 h1:5kz2u+tLNzW94ByL9yfsfkvE4N8n/r6C1TKDiFWK8rw= -github.com/kubewharf/katalyst-api v0.5.1-0.20240911051124-d5c09c115a19/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k= +github.com/kubewharf/katalyst-api v0.5.1-0.20240929080014-ae613a8935d8 h1:rIwZVD4iia7pTiB6h1xR8muc4jx4GgHWzhVCPKlEhXw= +github.com/kubewharf/katalyst-api v0.5.1-0.20240929080014-ae613a8935d8/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k= github.com/kubewharf/kubelet v1.24.6-kubewharf.9 h1:jOTYZt7h/J7I8xQMKMUcJjKf5UFBv37jHWvNp5VRFGc= github.com/kubewharf/kubelet v1.24.6-kubewharf.9/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c= github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8= diff --git a/pkg/agent/sysadvisor/metacache/metacache_dummy.go b/pkg/agent/sysadvisor/metacache/metacache_dummy.go new file mode 100644 index 000000000..7f3502b9c --- /dev/null +++ b/pkg/agent/sysadvisor/metacache/metacache_dummy.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metacache + +import ( + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +func NewDummyMetaCacheImp() *MetaCacheImp { + return &MetaCacheImp{ + podEntries: make(types.PodEntries), + poolEntries: make(types.PoolEntries), + regionEntries: make(types.RegionEntries), + modelToResult: make(map[string]interface{}), + containerCreateTimestamp: make(map[string]int64), + emitter: metrics.DummyMetrics{}, + } +} diff --git a/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/borwein.go b/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/borwein.go index 7b6be2aaf..1dcdd177c 100644 --- a/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/borwein.go +++ b/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/borwein.go @@ -22,18 +22,20 @@ import ( "sync" "time" + "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" //nolint "github.com/golang/protobuf/proto" - apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher" borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts" borweininfsvc "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc" borweintypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/types" + borweinutils "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/config/generic" @@ -60,14 +62,16 @@ type BorweinModelResultFetcher struct { name string qosConfig *generic.QoSConfiguration - nodeFeatureNames []string // handled by GetNodeFeature - containerFeatureNames []string // handled by GetContainerFeature - inferenceServiceSocketAbsPath string + nodeFeatureNames []string // handled by GetNodeFeature + containerFeatureNames []string // handled by GetContainerFeature + inferenceServiceSocketAbsPath string + modelNameToInferenceSvcSockAbsPath map[string]string // map modelName to inference server sock path emitter metrics.MetricEmitter - infSvcClient borweininfsvc.InferenceServiceClient - clientLock sync.RWMutex + infSvcClient borweininfsvc.InferenceServiceClient + modelNameToInferenceSvcClient map[string]borweininfsvc.InferenceServiceClient // map modelName to its inference client + clientLock sync.RWMutex } const ( @@ -133,7 +137,7 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met metaWriter metacache.MetaWriter, metaServer *metaserver.MetaServer, ) error { bmrf.clientLock.RLock() - if bmrf.infSvcClient == nil { + if bmrf.infSvcClient == nil && len(bmrf.modelNameToInferenceSvcClient) == 0 { bmrf.clientLock.RUnlock() return fmt.Errorf("infSvcClient isn't initialized") } @@ -152,9 +156,9 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met return } - if containerInfo.QoSLevel == apiconsts.PodAnnotationQoSLevelSharedCores || containerInfo.IsDedicatedNumaExclusive() { - requestContainers = append(requestContainers, containerInfo.Clone()) - } + // try to inference for main containers of all QoS levels, + // and filter results when parsing resp + requestContainers = append(requestContainers, containerInfo.Clone()) return }) @@ -171,27 +175,56 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met } bmrf.clientLock.RLock() - resp, err := bmrf.infSvcClient.Inference(ctx, req) + var infSvcClients map[string]borweininfsvc.InferenceServiceClient + if len(bmrf.modelNameToInferenceSvcClient) > 0 { + infSvcClients = bmrf.modelNameToInferenceSvcClient + } else { + infSvcClients = map[string]borweininfsvc.InferenceServiceClient{ + borweinconsts.ModelNameBorwein: bmrf.infSvcClient, + } + } bmrf.clientLock.RUnlock() - if err != nil { - _ = bmrf.emitter.StoreInt64(metricInferenceFailed, 1, metrics.MetricTypeNameRaw) - return fmt.Errorf("Inference failed with error: %v", err) - } + errCh := make(chan error, len(infSvcClients)) + for modelName, client := range infSvcClients { + go func(modelName string, client borweininfsvc.InferenceServiceClient, errCh chan error) { + if client == nil { + errCh <- fmt.Errorf("nil client for model: %s", modelName) + return + } - borweinInferenceResults, err := bmrf.parseInferenceRespForPods(requestContainers, resp) - if err != nil { - _ = bmrf.emitter.StoreInt64(metricParseInferenceResponseFailed, 1, metrics.MetricTypeNameRaw) - return fmt.Errorf("parseInferenceRespForPods failed with error: %v", err) + resp, err := client.Inference(ctx, req) + if err != nil { + _ = bmrf.emitter.StoreInt64(metricInferenceFailed, 1, metrics.MetricTypeNameRaw) + errCh <- fmt.Errorf("Inference by model: %s failed with error: %v", modelName, err) + return + } + + borweinInferenceResults, err := bmrf.parseInferenceRespForPods(requestContainers, resp) + if err != nil { + _ = bmrf.emitter.StoreInt64(metricParseInferenceResponseFailed, 1, metrics.MetricTypeNameRaw) + errCh <- fmt.Errorf("parseInferenceRespForPods from model: %s failed with error: %v", modelName, err) + return + } + + err = metaWriter.SetInferenceResult(borweinutils.GetInferenceResultKey(modelName), borweinInferenceResults) + if err != nil { + _ = bmrf.emitter.StoreInt64(metricSetInferenceResultFailed, 1, metrics.MetricTypeNameRaw) + errCh <- fmt.Errorf("SetInferenceResult from model: %s failed with error: %v", modelName, err) + return + } + + errCh <- nil + return + }(modelName, client, errCh) } - err = metaWriter.SetInferenceResult(borweinconsts.ModelNameBorwein, borweinInferenceResults) - if err != nil { - _ = bmrf.emitter.StoreInt64(metricSetInferenceResultFailed, 1, metrics.MetricTypeNameRaw) - return fmt.Errorf("SetInferenceResult failed with error: %v", err) + errList := make([]error, 0, len(infSvcClients)) + for i := 0; i < len(infSvcClients); i++ { + errList = append(errList, <-errCh) } - return nil + return errors.NewAggregate(errList) } func (bmrf *BorweinModelResultFetcher) parseInferenceRespForPods(requestContainers []*types.ContainerInfo, @@ -216,19 +249,31 @@ func (bmrf *BorweinModelResultFetcher) parseInferenceRespForPods(requestContaine return nil, fmt.Errorf("invalid result for pod: %s, container: %s", podUID, containerName) } - inferenceResults := make([]*borweininfsvc.InferenceResult, len(cResults.InferenceResults)) + inferenceResults := make([]*borweininfsvc.InferenceResult, 0, len(cResults.InferenceResults)) + foundResult := false for idx, result := range cResults.InferenceResults { if result == nil { continue } - inferenceResults[idx] = proto.Clone(result).(*borweininfsvc.InferenceResult) + foundResult = true + + if result.ResultFlag == borweininfsvc.ResultFlag_ResultFlagSkip { + general.Infof("skip %d result for pod: %s, container: %s", idx, podUID, containerName) + continue + } + + inferenceResults = append(inferenceResults, proto.Clone(result).(*borweininfsvc.InferenceResult)) } - results.SetInferenceResults(podUID, containerName, inferenceResults...) - } + if foundResult { + respContainersCnt++ + } - respContainersCnt += len(results.Results[podUID]) + if len(inferenceResults) > 0 { + results.SetInferenceResults(podUID, containerName, inferenceResults...) + } + } } overloadCnt := 0.0 @@ -333,8 +378,44 @@ func (bmrf *BorweinModelResultFetcher) initInferenceSvcClientConn() (bool, error // todo: emit metrics when initializing client connection failed // never success - if bmrf.inferenceServiceSocketAbsPath == "" { - return false, fmt.Errorf("empty inferenceServiceSocketAbsPath") + if bmrf.inferenceServiceSocketAbsPath == "" && len(bmrf.modelNameToInferenceSvcSockAbsPath) == 0 { + return false, fmt.Errorf("empty inference service socks information") + } + + if len(bmrf.modelNameToInferenceSvcSockAbsPath) > 0 { + modelNameToConn := make(map[string]*grpc.ClientConn, len(bmrf.modelNameToInferenceSvcSockAbsPath)) + + allSuccess := true + for modelName, sockAbsPath := range bmrf.modelNameToInferenceSvcSockAbsPath { + infSvcConn, err := process.Dial(sockAbsPath, 5*time.Second) + if err != nil { + general.Errorf("get inference svc connection with socket: %s for model: %s failed with error", + sockAbsPath, modelName) + allSuccess = false + break + } + + modelNameToConn[modelName] = infSvcConn + } + + if !allSuccess { + for modelName, conn := range modelNameToConn { + err := conn.Close() + if err != nil { + general.Errorf("close connection for model: %s failed with error: %v", + modelName, err) + } + } + } else { + bmrf.clientLock.Lock() + bmrf.modelNameToInferenceSvcClient = make(map[string]borweininfsvc.InferenceServiceClient, len(modelNameToConn)) + for modelName, conn := range modelNameToConn { + bmrf.modelNameToInferenceSvcClient[modelName] = borweininfsvc.NewInferenceServiceClient(conn) + } + bmrf.clientLock.Unlock() + } + + return allSuccess, nil } infSvcConn, err := process.Dial(bmrf.inferenceServiceSocketAbsPath, 5*time.Second) @@ -366,12 +447,13 @@ func NewBorweinModelResultFetcher(fetcherName string, conf *config.Configuration emitter := emitterPool.GetDefaultMetricsEmitter().WithTags(BorweinModelResultFetcherName) bmrf := &BorweinModelResultFetcher{ - name: fetcherName, - emitter: emitter, - qosConfig: conf.QoSConfiguration, - nodeFeatureNames: conf.BorweinConfiguration.NodeFeatureNames, - containerFeatureNames: conf.BorweinConfiguration.ContainerFeatureNames, - inferenceServiceSocketAbsPath: conf.BorweinConfiguration.InferenceServiceSocketAbsPath, + name: fetcherName, + emitter: emitter, + qosConfig: conf.QoSConfiguration, + nodeFeatureNames: conf.BorweinConfiguration.NodeFeatureNames, + containerFeatureNames: conf.BorweinConfiguration.ContainerFeatureNames, + inferenceServiceSocketAbsPath: conf.BorweinConfiguration.InferenceServiceSocketAbsPath, + modelNameToInferenceSvcSockAbsPath: conf.BorweinConfiguration.ModelNameToInferenceSvcSockAbsPath, } // fetcher initializing doesn't block sys-adviosr main process diff --git a/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/trainingtpreg/training_throughput_regression.go b/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/trainingtpreg/training_throughput_regression.go new file mode 100644 index 000000000..226f4b046 --- /dev/null +++ b/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/trainingtpreg/training_throughput_regression.go @@ -0,0 +1,74 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trainingtpreg + +import ( + "encoding/json" + "fmt" + + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" + borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts" + borweininfsvc "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc" + borweintypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/types" + borweinutils "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils" + "github.com/kubewharf/katalyst-core/pkg/util/general" +) + +type TrainingThroughputRegression struct { + PredictValue float64 `json:"predict_value"` +} + +func GetTrainingTHRegPredictValue(metaReader metacache.MetaReader) (map[string]map[string]float64, int64, error) { + if metaReader == nil { + return nil, 0, fmt.Errorf("nil metaReader") + } + + inferenceResultKey := borweinutils.GetInferenceResultKey(borweinconsts.ModelNameBorweinTrainingThroughput) + results, err := metaReader.GetInferenceResult(inferenceResultKey) + if err != nil { + return nil, 0, fmt.Errorf("failed to get inference results for %s", inferenceResultKey) + } + + ret := make(map[string]map[string]float64) + var resultTimestamp int64 + switch typedResults := results.(type) { + case *borweintypes.BorweinInferenceResults: + resultTimestamp = typedResults.Timestamp + typedResults.RangeInferenceResults(func(podUID, containerName string, result *borweininfsvc.InferenceResult) { + if result == nil { + return + } + + specificResult := &TrainingThroughputRegression{} + err := json.Unmarshal([]byte(result.GenericOutput), specificResult) + if err != nil { + general.Errorf("invalid generic output: %s for %s", result.GenericOutput, inferenceResultKey) + return + } + + if ret[podUID] == nil { + ret[podUID] = make(map[string]float64) + } + + ret[podUID][containerName] = specificResult.PredictValue + }) + default: + return nil, 0, fmt.Errorf("invalid model result type: %T", typedResults) + } + + return ret, resultTimestamp, nil +} diff --git a/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/trainingtpreg/training_throughput_regression_test.go b/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/trainingtpreg/training_throughput_regression_test.go new file mode 100644 index 000000000..c054ad634 --- /dev/null +++ b/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/trainingtpreg/training_throughput_regression_test.go @@ -0,0 +1,99 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trainingtpreg + +import ( + "encoding/json" + "reflect" + "testing" + "time" + + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" + borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts" + borweininfsvc "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc" + borweintypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/types" + borweinutils "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils" +) + +func TestGetTrainingTHRegPredictValue(t *testing.T) { + t.Parallel() + timeNow := time.Now().Unix() + res := &TrainingThroughputRegression{ + PredictValue: 0.5, + } + bs, _ := json.Marshal(res) + reader := metacache.NewDummyMetaCacheImp() + reader.SetInferenceResult(borweinutils.GetInferenceResultKey(borweinconsts.ModelNameBorweinTrainingThroughput), &borweintypes.BorweinInferenceResults{ + Timestamp: timeNow, + Results: map[string]map[string][]*borweininfsvc.InferenceResult{ + "test": { + "test": []*borweininfsvc.InferenceResult{ + { + GenericOutput: string(bs), + }, + }, + }, + }, + }) + type args struct { + metaReader metacache.MetaReader + } + tests := []struct { + name string + args args + want map[string]map[string]float64 + want1 int64 + wantErr bool + }{ + { + name: "GetTrainingTHRegPredictValue failed", + want: nil, + want1: 0, + wantErr: true, + }, + { + name: "GetTrainingTHRegPredictValue failed", + args: args{ + metaReader: reader, + }, + want: map[string]map[string]float64{ + "test": { + "test": 0.5, + }, + }, + want1: timeNow, + wantErr: false, + }, + } + for _, tt := range tests { + curTT := tt + t.Run(curTT.name, func(t *testing.T) { + t.Parallel() + got, got1, err := GetTrainingTHRegPredictValue(curTT.args.metaReader) + if (err != nil) != curTT.wantErr { + t.Errorf("GetTrainingTHRegPredictValue() error = %v, wantErr %v", err, curTT.wantErr) + return + } + if !reflect.DeepEqual(got, curTT.want) { + t.Errorf("GetTrainingTHRegPredictValue() got = %v, want %v", got, curTT.want) + } + if got1 != curTT.want1 { + t.Errorf("GetTrainingTHRegPredictValue() got1 = %v, want %v", got1, curTT.want1) + } + }) + } +} diff --git a/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts/consts.go b/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts/consts.go index 73ab8759e..908c1accb 100644 --- a/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts/consts.go +++ b/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts/consts.go @@ -17,5 +17,6 @@ limitations under the License. package consts const ( - ModelNameBorwein = "borwein" + ModelNameBorwein = "borwein" + ModelNameBorweinTrainingThroughput = "borwein_training_throughput" ) diff --git a/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc/inference_svc.pb.go b/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc/inference_svc.pb.go index 1f63323f9..e8f1d3654 100644 --- a/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc/inference_svc.pb.go +++ b/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc/inference_svc.pb.go @@ -51,18 +51,21 @@ const ( InferenceType_ClassificationOverload InferenceType = 0 InferenceType_ClassificationUnderload InferenceType = 1 InferenceType_LatencyRegression InferenceType = 2 + InferenceType_Other InferenceType = 15 ) var InferenceType_name = map[int32]string{ - 0: "ClassificationOverload", - 1: "ClassificationUnderload", - 2: "LatencyRegression", + 0: "ClassificationOverload", + 1: "ClassificationUnderload", + 2: "LatencyRegression", + 15: "Other", } var InferenceType_value = map[string]int32{ "ClassificationOverload": 0, "ClassificationUnderload": 1, "LatencyRegression": 2, + "Other": 15, } func (x InferenceType) String() string { @@ -73,6 +76,34 @@ func (InferenceType) EnumDescriptor() ([]byte, []int) { return fileDescriptor_96721cf42274ca03, []int{0} } +type ResultFlag int32 + +const ( + ResultFlag_ResultFlagValid ResultFlag = 0 + ResultFlag_ResultFlagInvalid ResultFlag = 1 + ResultFlag_ResultFlagSkip ResultFlag = 2 +) + +var ResultFlag_name = map[int32]string{ + 0: "ResultFlagValid", + 1: "ResultFlagInvalid", + 2: "ResultFlagSkip", +} + +var ResultFlag_value = map[string]int32{ + "ResultFlagValid": 0, + "ResultFlagInvalid": 1, + "ResultFlagSkip": 2, +} + +func (x ResultFlag) String() string { + return proto.EnumName(ResultFlag_name, int32(x)) +} + +func (ResultFlag) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_96721cf42274ca03, []int{1} +} + type InferenceRequest struct { FeatureNames []string `protobuf:"bytes,1,rep,name=feature_names,json=featureNames,proto3" json:"feature_names,omitempty"` PodRequestEntries map[string]*ContainerRequestEntries `protobuf:"bytes,2,rep,name=pod_request_entries,json=podRequestEntries,proto3" json:"pod_request_entries,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` @@ -363,9 +394,16 @@ type InferenceResult struct { // threshold to judge overload or underload. should be float between 0 and 1. Percentile float32 `protobuf:"fixed32,4,opt,name=percentile,proto3" json:"percentile,omitempty"` // model_version to identify where does the output come from. - ModelVersion string `protobuf:"bytes,5,opt,name=model_version,json=modelVersion,proto3" json:"model_version,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_sizecache int32 `json:"-"` + ModelVersion string `protobuf:"bytes,5,opt,name=model_version,json=modelVersion,proto3" json:"model_version,omitempty"` + // generic_output provides specific inference result from corresponding model, + // it will be decoded by the specific client accesses the model. + GenericOutput string `protobuf:"bytes,6,opt,name=generic_output,json=genericOutput,proto3" json:"generic_output,omitempty"` + // since every entry in request should have a corresponding entry in resp, + // but not all entries need to be inferred. + // so we use result_flag to identify if this entry in resp should be handled. + ResultFlag ResultFlag `protobuf:"varint,7,opt,name=result_flag,json=resultFlag,proto3,enum=inferencesvc.ResultFlag" json:"result_flag,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *InferenceResult) Reset() { *m = InferenceResult{} } @@ -435,8 +473,23 @@ func (m *InferenceResult) GetModelVersion() string { return "" } +func (m *InferenceResult) GetGenericOutput() string { + if m != nil { + return m.GenericOutput + } + return "" +} + +func (m *InferenceResult) GetResultFlag() ResultFlag { + if m != nil { + return m.ResultFlag + } + return ResultFlag_ResultFlagValid +} + func init() { proto.RegisterEnum("inferencesvc.InferenceType", InferenceType_name, InferenceType_value) + proto.RegisterEnum("inferencesvc.ResultFlag", ResultFlag_name, ResultFlag_value) proto.RegisterType((*InferenceRequest)(nil), "inferencesvc.InferenceRequest") proto.RegisterMapType((map[string]*ContainerRequestEntries)(nil), "inferencesvc.InferenceRequest.PodRequestEntriesEntry") proto.RegisterType((*ContainerRequestEntries)(nil), "inferencesvc.ContainerRequestEntries") @@ -453,53 +506,59 @@ func init() { func init() { proto.RegisterFile("inference_svc.proto", fileDescriptor_96721cf42274ca03) } var fileDescriptor_96721cf42274ca03 = []byte{ - // 730 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x4d, 0x6f, 0xd3, 0x4a, - 0x14, 0xcd, 0x24, 0xaf, 0xd5, 0xcb, 0xb4, 0xe9, 0x4b, 0xa6, 0x8f, 0xd4, 0x24, 0xaa, 0x89, 0x82, - 0x80, 0x08, 0xa9, 0xb1, 0x08, 0x20, 0x10, 0xb0, 0xa1, 0xa5, 0x48, 0x20, 0x54, 0x90, 0x81, 0x2e, - 0x58, 0x60, 0x39, 0xce, 0x4d, 0x3a, 0x8a, 0xe3, 0x31, 0x9e, 0xb1, 0xab, 0x88, 0x0d, 0x2c, 0x58, - 0xb1, 0xe9, 0xcf, 0xea, 0x12, 0x76, 0x2c, 0x69, 0x58, 0xf3, 0x17, 0x10, 0xb2, 0x27, 0x1f, 0x76, - 0x88, 0x43, 0x57, 0x99, 0x7b, 0xce, 0xbd, 0xe7, 0xce, 0x9c, 0xb9, 0x19, 0xe3, 0x4d, 0xea, 0x74, - 0xc1, 0x03, 0xc7, 0x02, 0x83, 0x07, 0x56, 0xd3, 0xf5, 0x98, 0x60, 0x64, 0x7d, 0x0a, 0xf2, 0xc0, - 0xaa, 0xec, 0xf4, 0xa8, 0x38, 0xf2, 0xdb, 0x4d, 0x8b, 0x0d, 0xb4, 0x1e, 0xeb, 0x31, 0x2d, 0x4a, - 0x6a, 0xfb, 0xdd, 0x28, 0x8a, 0x82, 0x68, 0x25, 0x8b, 0xeb, 0x9f, 0xb3, 0xb8, 0xf8, 0x64, 0x52, - 0xaf, 0xc3, 0x3b, 0x1f, 0xb8, 0x20, 0x97, 0x71, 0xa1, 0x0b, 0xa6, 0xf0, 0x3d, 0x30, 0x1c, 0x73, - 0x00, 0x5c, 0x41, 0xb5, 0x5c, 0x23, 0xaf, 0xaf, 0x8f, 0xc1, 0x83, 0x10, 0x23, 0x80, 0x37, 0x5d, - 0xd6, 0x31, 0x3c, 0x59, 0x63, 0x80, 0x23, 0x3c, 0x0a, 0x5c, 0xc9, 0xd6, 0x72, 0x8d, 0xb5, 0xd6, - 0xed, 0x66, 0x7c, 0x53, 0xcd, 0xf9, 0x0e, 0xcd, 0x17, 0xac, 0x33, 0x5e, 0xee, 0xcb, 0xba, 0xf0, - 0x67, 0xa8, 0x97, 0xdc, 0x79, 0xbc, 0xd2, 0xc7, 0xe5, 0xc5, 0xc9, 0xa4, 0x88, 0x73, 0x7d, 0x18, - 0x2a, 0xa8, 0x86, 0x1a, 0x79, 0x3d, 0x5c, 0x92, 0xfb, 0x78, 0x25, 0x30, 0x6d, 0x1f, 0x94, 0x6c, - 0x0d, 0x35, 0xd6, 0x5a, 0x57, 0x92, 0x9b, 0xd8, 0x63, 0x8e, 0x30, 0xa9, 0x03, 0x5e, 0x52, 0x4c, - 0x97, 0x35, 0xf7, 0xb2, 0x77, 0x51, 0xfd, 0x17, 0xc2, 0x5b, 0x29, 0x69, 0xe4, 0x3d, 0x56, 0xac, - 0x09, 0x65, 0x4c, 0xec, 0x89, 0x4a, 0xa5, 0x3f, 0x6b, 0xad, 0x87, 0xe7, 0xea, 0x37, 0xc3, 0x1f, - 0x4b, 0x91, 0xc3, 0x48, 0x43, 0x1a, 0x50, 0xb6, 0x16, 0x92, 0x95, 0x2e, 0xae, 0x2e, 0x29, 0x5b, - 0x60, 0xc5, 0x8d, 0xa4, 0x15, 0xd5, 0xe4, 0xd6, 0x12, 0x12, 0x71, 0x03, 0xae, 0xe1, 0x42, 0x82, - 0x23, 0x65, 0xbc, 0x1a, 0x3b, 0x63, 0x5e, 0x1f, 0x47, 0xf5, 0x9f, 0x08, 0x97, 0x62, 0xb7, 0xca, - 0x5d, 0xe6, 0x70, 0x20, 0x14, 0xff, 0x2f, 0x67, 0x42, 0xc6, 0xd3, 0xa1, 0x90, 0xfe, 0xdc, 0x49, - 0x1d, 0x0a, 0x99, 0x2e, 0xa7, 0x42, 0xae, 0x13, 0x63, 0x41, 0xdc, 0x3f, 0x88, 0xca, 0x00, 0x6f, - 0xa5, 0xa4, 0x2f, 0x70, 0xe3, 0x41, 0xd2, 0x8d, 0xab, 0xa9, 0x17, 0x95, 0x50, 0x8b, 0x1b, 0x73, - 0x92, 0xc5, 0x4a, 0x5a, 0x1e, 0xf9, 0x84, 0x70, 0x75, 0x36, 0x1b, 0xb3, 0xff, 0xa8, 0x07, 0xdc, - 0xb7, 0xc5, 0xe4, 0xf8, 0xfb, 0xe7, 0xeb, 0x3a, 0x23, 0xe2, 0x06, 0x85, 0x3a, 0xd2, 0x8c, 0x8b, - 0x56, 0x1a, 0x5f, 0xb1, 0xb1, 0xba, 0xbc, 0x78, 0x81, 0x35, 0xb7, 0x92, 0xd6, 0xa8, 0xe9, 0x77, - 0x14, 0xaa, 0xc4, 0x2d, 0x79, 0x9b, 0x78, 0x39, 0x22, 0x9a, 0x3c, 0xc5, 0xa5, 0xb4, 0xe3, 0x6f, - 0x2f, 0x55, 0xd6, 0x8b, 0x74, 0x4e, 0xab, 0xfe, 0x15, 0xe1, 0xff, 0xe6, 0xb2, 0xc8, 0x36, 0xc6, - 0x94, 0x1b, 0x1d, 0xe8, 0x9a, 0xbe, 0x2d, 0xa2, 0x63, 0xfc, 0xab, 0xe7, 0x29, 0x7f, 0x24, 0x01, - 0xb2, 0x8b, 0x37, 0x66, 0xed, 0xc5, 0xd0, 0x95, 0xa7, 0xda, 0x98, 0x1f, 0xff, 0xa9, 0xea, 0xab, - 0xa1, 0x0b, 0x7a, 0x81, 0xc6, 0xc3, 0x70, 0xe2, 0x99, 0x2f, 0x5c, 0x5f, 0x28, 0xb9, 0x1a, 0x6a, - 0x64, 0xf5, 0x71, 0x44, 0x54, 0x8c, 0x5d, 0xf0, 0x2c, 0x70, 0x04, 0xb5, 0x41, 0xf9, 0x27, 0xe2, - 0x62, 0x48, 0xf8, 0x68, 0x0e, 0x58, 0x07, 0x6c, 0x23, 0x00, 0x8f, 0x53, 0xe6, 0x28, 0x2b, 0x91, - 0xc9, 0xeb, 0x11, 0x78, 0x28, 0xb1, 0xeb, 0x06, 0x2e, 0x24, 0x9a, 0x93, 0x0a, 0x2e, 0xef, 0xd9, - 0x26, 0xe7, 0xb4, 0x4b, 0x2d, 0x53, 0x50, 0xe6, 0x3c, 0x0f, 0xc0, 0xb3, 0x99, 0xd9, 0x29, 0x66, - 0x48, 0x15, 0x6f, 0x25, 0xb9, 0xd7, 0x4e, 0x67, 0x4c, 0x22, 0x72, 0x01, 0x97, 0x9e, 0x99, 0x02, - 0x1c, 0x6b, 0xa8, 0x43, 0xcf, 0x03, 0x1e, 0xca, 0x17, 0xb3, 0xad, 0x76, 0xec, 0x52, 0x5e, 0x82, - 0x17, 0x50, 0x0b, 0xc8, 0x01, 0xce, 0x4f, 0x31, 0xa2, 0x2e, 0x7f, 0x99, 0x2b, 0x97, 0xfe, 0xf2, - 0x27, 0xad, 0x67, 0x76, 0x3f, 0xa2, 0xd3, 0x33, 0x15, 0x7d, 0x3b, 0x53, 0x33, 0x1f, 0x46, 0x2a, - 0x3a, 0x1d, 0xa9, 0xe8, 0xcb, 0x48, 0x45, 0xdf, 0x47, 0x2a, 0x3a, 0xf9, 0xa1, 0x66, 0xde, 0x58, - 0xb1, 0x2f, 0x50, 0xdf, 0x6f, 0xc3, 0xf1, 0x91, 0xe9, 0x75, 0xb5, 0xbe, 0x29, 0x4c, 0x7b, 0xc8, - 0xc5, 0x8e, 0xc5, 0x3c, 0xd0, 0xdc, 0x7e, 0x4f, 0x33, 0x7b, 0xe0, 0x08, 0x8d, 0x0f, 0xb9, 0xd9, - 0x09, 0x28, 0x67, 0x9e, 0xe6, 0xda, 0x7e, 0x8f, 0x3a, 0xda, 0xb4, 0xbf, 0x16, 0x59, 0xc7, 0xb5, - 0x36, 0x3b, 0x86, 0x38, 0xce, 0x03, 0xab, 0xbd, 0x1a, 0x7d, 0xbe, 0x6e, 0xfe, 0x0e, 0x00, 0x00, - 0xff, 0xff, 0x3c, 0x07, 0xdd, 0x46, 0x12, 0x07, 0x00, 0x00, + // 822 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x4f, 0x73, 0xdb, 0x44, + 0x14, 0xcf, 0x2a, 0x24, 0xe0, 0x97, 0x38, 0xb1, 0x37, 0x34, 0x11, 0xce, 0x54, 0x78, 0xcc, 0x14, + 0x3c, 0x9d, 0xa9, 0x35, 0x04, 0x18, 0xfe, 0x5e, 0x68, 0x69, 0x67, 0xca, 0x74, 0x1a, 0x46, 0x85, + 0x1c, 0x38, 0xa0, 0x59, 0xcb, 0xcf, 0xca, 0x62, 0x45, 0x2b, 0x76, 0x57, 0xea, 0x78, 0xb8, 0xc0, + 0x81, 0x13, 0x97, 0x7e, 0x17, 0xbe, 0x44, 0x8f, 0x1c, 0x39, 0xd2, 0x70, 0xe6, 0x2b, 0x30, 0x8c, + 0xb4, 0x8e, 0x25, 0x19, 0x2b, 0xf4, 0xe4, 0xf7, 0x7e, 0xef, 0xbd, 0xdf, 0xdb, 0xf7, 0xdb, 0x67, + 0x2d, 0x1c, 0xf0, 0x78, 0x8a, 0x12, 0xe3, 0x00, 0x7d, 0x95, 0x05, 0xa3, 0x44, 0x0a, 0x2d, 0xe8, + 0xee, 0x12, 0x54, 0x59, 0xd0, 0xbb, 0x13, 0x72, 0x7d, 0x9e, 0x8e, 0x47, 0x81, 0xb8, 0x70, 0x43, + 0x11, 0x0a, 0xb7, 0x48, 0x1a, 0xa7, 0xd3, 0xc2, 0x2b, 0x9c, 0xc2, 0x32, 0xc5, 0x83, 0x5f, 0x2d, + 0xe8, 0x3c, 0xbc, 0xaa, 0xf7, 0xf0, 0x87, 0x14, 0x95, 0xa6, 0x6f, 0x41, 0x7b, 0x8a, 0x4c, 0xa7, + 0x12, 0xfd, 0x98, 0x5d, 0xa0, 0xb2, 0x49, 0x7f, 0x73, 0xd8, 0xf2, 0x76, 0x17, 0xe0, 0xe3, 0x1c, + 0xa3, 0x08, 0x07, 0x89, 0x98, 0xf8, 0xd2, 0xd4, 0xf8, 0x18, 0x6b, 0xc9, 0x51, 0xd9, 0x56, 0x7f, + 0x73, 0xb8, 0x73, 0xf2, 0xc1, 0xa8, 0x7a, 0xa8, 0xd1, 0x6a, 0x87, 0xd1, 0x57, 0x62, 0xb2, 0x30, + 0xef, 0x9b, 0xba, 0xfc, 0x67, 0xee, 0x75, 0x93, 0x55, 0xbc, 0x37, 0x83, 0xc3, 0xf5, 0xc9, 0xb4, + 0x03, 0x9b, 0x33, 0x9c, 0xdb, 0xa4, 0x4f, 0x86, 0x2d, 0x2f, 0x37, 0xe9, 0xa7, 0xb0, 0x95, 0xb1, + 0x28, 0x45, 0xdb, 0xea, 0x93, 0xe1, 0xce, 0xc9, 0xad, 0xfa, 0x21, 0xee, 0x89, 0x58, 0x33, 0x1e, + 0xa3, 0xac, 0x93, 0x79, 0xa6, 0xe6, 0x13, 0xeb, 0x23, 0x32, 0xf8, 0x87, 0xc0, 0x51, 0x43, 0x1a, + 0xfd, 0x11, 0xec, 0xe0, 0x2a, 0xe4, 0x5f, 0xc9, 0x53, 0x94, 0x1a, 0x7d, 0x76, 0x4e, 0x3e, 0x7f, + 0xa9, 0x7e, 0x25, 0xfe, 0xc0, 0x90, 0x9c, 0x15, 0x1c, 0x46, 0x80, 0xc3, 0x60, 0x6d, 0xb0, 0x37, + 0x85, 0xe3, 0x6b, 0xca, 0xd6, 0x48, 0xf1, 0x6e, 0x5d, 0x8a, 0xe3, 0xfa, 0xd1, 0x6a, 0x14, 0x55, + 0x01, 0xde, 0x81, 0x76, 0x2d, 0x46, 0x0f, 0x61, 0xbb, 0x32, 0x63, 0xcb, 0x5b, 0x78, 0x83, 0xbf, + 0x09, 0x74, 0x2b, 0xb7, 0xaa, 0x12, 0x11, 0x2b, 0xa4, 0x1c, 0x5e, 0x37, 0x3b, 0x61, 0xfc, 0xe5, + 0x52, 0x18, 0x7d, 0x3e, 0x6c, 0x5c, 0x0a, 0x93, 0x6e, 0xb6, 0xc2, 0xd8, 0xb5, 0xb5, 0xa0, 0xc9, + 0x7f, 0x02, 0xbd, 0x0b, 0x38, 0x6a, 0x48, 0x5f, 0xa3, 0xc6, 0x67, 0x75, 0x35, 0xde, 0x6e, 0xbc, + 0xa8, 0x1a, 0x5b, 0x55, 0x98, 0x67, 0x16, 0xd8, 0x4d, 0x79, 0xf4, 0x17, 0x02, 0xc7, 0xe5, 0x6e, + 0x94, 0xff, 0x51, 0x89, 0x2a, 0x8d, 0xf4, 0xd5, 0xf8, 0xf7, 0x5f, 0xae, 0x6b, 0x19, 0xa8, 0x0a, + 0x94, 0xf3, 0x18, 0x31, 0xde, 0x08, 0x9a, 0xe2, 0xbd, 0x08, 0x9c, 0xeb, 0x8b, 0xd7, 0x48, 0xf3, + 0x7e, 0x5d, 0x1a, 0xa7, 0xf9, 0x8e, 0x72, 0x96, 0xaa, 0x24, 0xdf, 0xd5, 0xbe, 0x1c, 0x45, 0x98, + 0x7e, 0x09, 0xdd, 0xa6, 0xf1, 0x6f, 0x5e, 0xcb, 0xec, 0x75, 0xf8, 0x0a, 0xd7, 0xe0, 0x37, 0x0b, + 0xf6, 0x57, 0xb2, 0xe8, 0x4d, 0x00, 0xae, 0xfc, 0x09, 0x4e, 0x59, 0x1a, 0xe9, 0x62, 0x8c, 0xd7, + 0xbc, 0x16, 0x57, 0x5f, 0x18, 0x80, 0xde, 0x85, 0xbd, 0xb2, 0xbd, 0x9e, 0x27, 0x66, 0xaa, 0xbd, + 0xd5, 0xf5, 0x5f, 0xb2, 0x7e, 0x3d, 0x4f, 0xd0, 0x6b, 0xf3, 0xaa, 0x9b, 0x6f, 0xbc, 0x48, 0x75, + 0x92, 0x6a, 0x7b, 0xb3, 0x4f, 0x86, 0x96, 0xb7, 0xf0, 0xa8, 0x03, 0x90, 0xa0, 0x0c, 0x30, 0xd6, + 0x3c, 0x42, 0xfb, 0x95, 0x22, 0x56, 0x41, 0xf2, 0x8f, 0xe6, 0x85, 0x98, 0x60, 0xe4, 0x67, 0x28, + 0x15, 0x17, 0xb1, 0xbd, 0x55, 0x88, 0xbc, 0x5b, 0x80, 0x67, 0x06, 0xa3, 0xb7, 0x60, 0x2f, 0xc4, + 0x18, 0x25, 0x0f, 0xfc, 0x45, 0x93, 0xed, 0x22, 0xab, 0xbd, 0x40, 0x4f, 0x4d, 0xaf, 0x8f, 0x61, + 0xc7, 0x88, 0xe7, 0x4f, 0x23, 0x16, 0xda, 0xaf, 0x16, 0x43, 0xd8, 0xf5, 0x21, 0x8c, 0x22, 0x0f, + 0x22, 0x16, 0x7a, 0x20, 0x97, 0xf6, 0xed, 0xef, 0xa1, 0x5d, 0x1b, 0x8f, 0xf6, 0xe0, 0xf0, 0x5e, + 0xc4, 0x94, 0xe2, 0x53, 0x1e, 0x30, 0xcd, 0x45, 0x7c, 0x9a, 0xa1, 0x8c, 0x04, 0x9b, 0x74, 0x36, + 0xe8, 0x31, 0x1c, 0xd5, 0x63, 0xdf, 0xc4, 0x93, 0x45, 0x90, 0xd0, 0x1b, 0xd0, 0x7d, 0xc4, 0x34, + 0xc6, 0xc1, 0xdc, 0xc3, 0x50, 0xa2, 0xca, 0x07, 0xe8, 0x58, 0xb4, 0x05, 0x5b, 0xa7, 0xfa, 0x1c, + 0x65, 0x67, 0xff, 0xf6, 0x23, 0x80, 0xf2, 0x14, 0xf4, 0x00, 0xf6, 0x4b, 0xef, 0x8c, 0x45, 0x3c, + 0xef, 0x70, 0x03, 0xba, 0x25, 0xf8, 0x30, 0xce, 0x0a, 0x98, 0x50, 0x0a, 0x7b, 0x25, 0xfc, 0x64, + 0xc6, 0x93, 0x8e, 0x75, 0x32, 0xae, 0xec, 0xd3, 0x13, 0x94, 0x19, 0x0f, 0x90, 0x3e, 0x86, 0xd6, + 0x12, 0xa3, 0xce, 0xf5, 0x8f, 0x4a, 0xef, 0xcd, 0xff, 0xf9, 0xbe, 0x0c, 0x36, 0xee, 0xfe, 0x4c, + 0x9e, 0xbf, 0x70, 0xc8, 0x1f, 0x2f, 0x9c, 0x8d, 0x9f, 0x2e, 0x1d, 0xf2, 0xfc, 0xd2, 0x21, 0xbf, + 0x5f, 0x3a, 0xe4, 0xcf, 0x4b, 0x87, 0x3c, 0xfb, 0xcb, 0xd9, 0xf8, 0x36, 0xa8, 0x3c, 0x9e, 0xb3, + 0x74, 0x8c, 0x4f, 0xcf, 0x99, 0x9c, 0xba, 0x33, 0xa6, 0x59, 0x34, 0x57, 0xfa, 0x4e, 0x20, 0x24, + 0xba, 0xc9, 0x2c, 0x74, 0x59, 0x88, 0xb1, 0x76, 0xd5, 0x5c, 0xb1, 0x49, 0xc6, 0x95, 0x90, 0x6e, + 0x12, 0xa5, 0x21, 0x8f, 0xdd, 0x65, 0x7f, 0xb7, 0xb8, 0x75, 0xe5, 0x8e, 0xc5, 0x53, 0xac, 0xe2, + 0x2a, 0x0b, 0xc6, 0xdb, 0xc5, 0xcb, 0xfb, 0xde, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xdd, 0x97, + 0xe8, 0x67, 0xcd, 0x07, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -876,6 +935,18 @@ func (m *InferenceResult) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.ResultFlag != 0 { + i = encodeVarintInferenceSvc(dAtA, i, uint64(m.ResultFlag)) + i-- + dAtA[i] = 0x38 + } + if len(m.GenericOutput) > 0 { + i -= len(m.GenericOutput) + copy(dAtA[i:], m.GenericOutput) + i = encodeVarintInferenceSvc(dAtA, i, uint64(len(m.GenericOutput))) + i-- + dAtA[i] = 0x32 + } if len(m.ModelVersion) > 0 { i -= len(m.ModelVersion) copy(dAtA[i:], m.ModelVersion) @@ -1070,6 +1141,13 @@ func (m *InferenceResult) Size() (n int) { if l > 0 { n += 1 + l + sovInferenceSvc(uint64(l)) } + l = len(m.GenericOutput) + if l > 0 { + n += 1 + l + sovInferenceSvc(uint64(l)) + } + if m.ResultFlag != 0 { + n += 1 + sovInferenceSvc(uint64(m.ResultFlag)) + } return n } @@ -1195,6 +1273,8 @@ func (this *InferenceResult) String() string { `Output:` + fmt.Sprintf("%v", this.Output) + `,`, `Percentile:` + fmt.Sprintf("%v", this.Percentile) + `,`, `ModelVersion:` + fmt.Sprintf("%v", this.ModelVersion) + `,`, + `GenericOutput:` + fmt.Sprintf("%v", this.GenericOutput) + `,`, + `ResultFlag:` + fmt.Sprintf("%v", this.ResultFlag) + `,`, `}`, }, "") return s @@ -2261,6 +2341,57 @@ func (m *InferenceResult) Unmarshal(dAtA []byte) error { } m.ModelVersion = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field GenericOutput", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInferenceSvc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthInferenceSvc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthInferenceSvc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.GenericOutput = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ResultFlag", wireType) + } + m.ResultFlag = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInferenceSvc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ResultFlag |= ResultFlag(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipInferenceSvc(dAtA[iNdEx:]) diff --git a/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc/inference_svc.proto b/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc/inference_svc.proto index 6b312f121..8495574c3 100644 --- a/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc/inference_svc.proto +++ b/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc/inference_svc.proto @@ -59,6 +59,13 @@ enum InferenceType { ClassificationOverload = 0; ClassificationUnderload = 1; LatencyRegression = 2; + Other = 15; +} + +enum ResultFlag { + ResultFlagValid = 0; + ResultFlagInvalid = 1; + ResultFlagSkip = 2; } message InferenceResult { @@ -74,6 +81,13 @@ message InferenceResult { float percentile = 4; // model_version to identify where does the output come from. string model_version = 5; + // generic_output provides specific inference result from corresponding model, + // it will be decoded by the specific client accesses the model. + string generic_output = 6; + // since every entry in request should have a corresponding entry in resp, + // but not all entries need to be inferred. + // so we use result_flag to identify if this entry in resp should be handled. + ResultFlag result_flag= 7; } service InferenceService { diff --git a/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils/utils.go b/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils/utils.go new file mode 100644 index 000000000..6bf12d0e1 --- /dev/null +++ b/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils/utils.go @@ -0,0 +1,32 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "fmt" + + borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts" +) + +func GetInferenceResultKey(modelName string) string { + // legacy model name compatible + if modelName == borweinconsts.ModelNameBorwein { + return modelName + } + + return fmt.Sprintf("%s/%s", borweinconsts.ModelNameBorwein, modelName) +} diff --git a/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node.go b/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node.go index ec3cafacf..0be820704 100644 --- a/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node.go +++ b/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node.go @@ -43,7 +43,8 @@ import ( // nodeRawMetricNameMapping maps the raw metricName (collected from agent.MetricsFetcher) // to the standard metricName (used by custom-metric-api-server) var nodeRawMetricNameMapping = map[string]string{ - consts.MetricLoad1MinSystem: apimetricnode.CustomMetricNodeCPULoad1Min, + consts.MetricLoad1MinSystem: apimetricnode.CustomMetricNodeCPULoad1Min, + consts.MetricCPUUsageRatioSystem: apimetricnode.CustomMetricNodeCPUUsageRatio, consts.MetricMemFreeSystem: apimetricnode.CustomMetricNodeMemoryFree, consts.MetricMemAvailableSystem: apimetricnode.CustomMetricNodeMemoryAvailable, diff --git a/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/model.go b/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/model.go index d0ff79d4d..31d56bc6b 100644 --- a/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/model.go +++ b/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/model.go @@ -22,9 +22,11 @@ import ( "k8s.io/klog/v2" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/trainingtpreg" borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts" borweininfsvc "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc" borweintypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/types" + borweinutils "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils" "github.com/kubewharf/katalyst-core/pkg/custom-metric/store/data" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/general" @@ -37,7 +39,7 @@ var modelOutputToEmit []string = []string{ // modelMetric emit pod_level model inference to kcmas. func (p *MetricSyncerPod) modelMetric() { for _, modelName := range modelOutputToEmit { - results, err := p.metaReader.GetInferenceResult(modelName) + results, err := p.metaReader.GetInferenceResult(borweinutils.GetInferenceResultKey(modelName)) if err != nil { klog.Errorf("failed to get inference results of model(%s)", modelName) continue @@ -92,5 +94,42 @@ func (p *MetricSyncerPod) modelMetric() { } } + for modelName, customizedEmitterFunc := range p.modelToCustomizedEmitterFunc { + general.Infof("calling customized emitter func for model: %s", modelName) + customizedEmitterFunc() + } + general.InfofV(4, "get model metric for pod") } + +func (p *MetricSyncerPod) emitBorweinTrainingThroughput() { + trainingThroughputData, resultTimestamp, err := trainingtpreg.GetTrainingTHRegPredictValue(p.metaReader) + if err != nil { + klog.Errorf("failed to get inference results of model(%s)", borweinconsts.ModelNameBorweinTrainingThroughput) + } + + for podUID, containerData := range trainingThroughputData { + pod, err := p.metaServer.GetPod(context.Background(), podUID) + if err != nil || !p.metricPod(pod) { + return + } + + tags := p.generateMetricTag(pod) + + for containerName, trainingThroughput := range containerData { + _ = p.dataEmitter.StoreFloat64(podTrainingThroughputInferenceResultBorwein, + trainingThroughput, + metrics.MetricTypeNameRaw, + append(tags, + metrics.MetricTag{ + Key: fmt.Sprintf("%s", data.CustomMetricLabelKeyTimestamp), + Val: fmt.Sprintf("%v", resultTimestamp), + }, + metrics.MetricTag{ + Key: fmt.Sprintf("%scontainer", data.CustomMetricLabelSelectorPrefixKey), + Val: containerName, + }, + )...) + } + } +} diff --git a/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod.go b/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod.go index 334ce9762..076199745 100644 --- a/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod.go +++ b/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod.go @@ -27,6 +27,7 @@ import ( apimetricpod "github.com/kubewharf/katalyst-api/pkg/metric/pod" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" + borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter/syncer" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter/types" "github.com/kubewharf/katalyst-core/pkg/config" @@ -45,15 +46,17 @@ import ( const ( podMetricLabelSelectorNodeName = "node_name" - podModelInferenceResultBorwein = "pod_borwein_inference_result" + podModelInferenceResultBorwein = "pod_borwein_inference_result" + podTrainingThroughputInferenceResultBorwein = "pod_borwein_training_throughput_inference_result" ) // podRawMetricNameMapping maps the raw metricName (collected from agent.MetricsFetcher) // to the standard metricName (used by custom-metric-api-server) var podRawMetricNameMapping = map[string]string{ - consts.MetricLoad1MinContainer: apimetricpod.CustomMetricPodCPULoad1Min, - consts.MetricCPUUsageContainer: apimetricpod.CustomMetricPodCPUUsage, - consts.MetricCPUCPIContainer: apimetricpod.CustomMetricPodCPUCPI, + consts.MetricLoad1MinContainer: apimetricpod.CustomMetricPodCPULoad1Min, + consts.MetricCPUUsageContainer: apimetricpod.CustomMetricPodCPUUsage, + consts.MetricCPUUsageRatioContainer: apimetricpod.CustomMetricPodCPUUsageRatio, + consts.MetricCPUCPIContainer: apimetricpod.CustomMetricPodCPUCPI, consts.MetricMemRssContainer: apimetricpod.CustomMetricPodMemoryRSS, consts.MetricMemUsageContainer: apimetricpod.CustomMetricPodMemoryUsage, @@ -82,6 +85,8 @@ type MetricSyncerPod struct { metaServer *metaserver.MetaServer metaReader metacache.MetaReader + + modelToCustomizedEmitterFunc map[string]func() } func NewMetricSyncerPod(conf *config.Configuration, _ interface{}, @@ -99,7 +104,7 @@ func NewMetricSyncerPod(conf *config.Configuration, _ interface{}, metricMapping := general.MergeMap(podRawMetricNameMapping, conf.MetricEmitterPodConfiguration.MetricMapping) - return &MetricSyncerPod{ + metricSyncerPod := &MetricSyncerPod{ metricMapping: metricMapping, emitterConf: conf.AgentConfiguration.MetricEmitterPluginConfiguration, @@ -110,7 +115,13 @@ func NewMetricSyncerPod(conf *config.Configuration, _ interface{}, dataEmitter: dataEmitter, metaServer: metaServer, metaReader: metaReader, - }, nil + } + + metricSyncerPod.modelToCustomizedEmitterFunc = map[string]func(){ + borweinconsts.ModelNameBorweinTrainingThroughput: metricSyncerPod.emitBorweinTrainingThroughput, + } + + return metricSyncerPod, nil } func (p *MetricSyncerPod) Name() string { diff --git a/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod_test.go b/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod_test.go index f8b93447f..180f77db5 100644 --- a/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod_test.go +++ b/pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod_test.go @@ -18,6 +18,7 @@ package pod import ( "context" + "encoding/json" "testing" "time" @@ -28,6 +29,11 @@ import ( "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher/borwein/trainingtpreg" + borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts" + borweininfsvc "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc" + borweintypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/types" + borweinutils "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" @@ -103,7 +109,25 @@ func TestPodAddAndRemoved(t *testing.T) { MetricsFetcher: metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}), }, } - si, err := NewMetricSyncerPod(conf, struct{}{}, metrics.DummyMetrics{}, metricspool.DummyMetricsEmitterPool{}, meta, &metacache.MetaCacheImp{}) + timeNow := time.Now().Unix() + res := &trainingtpreg.TrainingThroughputRegression{ + PredictValue: 0.5, + } + bs, _ := json.Marshal(res) + reader := metacache.NewDummyMetaCacheImp() + reader.SetInferenceResult(borweinutils.GetInferenceResultKey(borweinconsts.ModelNameBorweinTrainingThroughput), &borweintypes.BorweinInferenceResults{ + Timestamp: timeNow, + Results: map[string]map[string][]*borweininfsvc.InferenceResult{ + "000002": { + "c-1": []*borweininfsvc.InferenceResult{ + { + GenericOutput: string(bs), + }, + }, + }, + }, + }) + si, err := NewMetricSyncerPod(conf, struct{}{}, metrics.DummyMetrics{}, metricspool.DummyMetricsEmitterPool{}, meta, reader) assert.NoError(t, err) s := si.(*MetricSyncerPod) @@ -111,6 +135,7 @@ func TestPodAddAndRemoved(t *testing.T) { t.Logf("run with non-empty pod fetcher") s.syncChanel() + s.modelMetric() assert.Equal(t, len(s.rawNotifier), 1) metaEmpty := &metaserver.MetaServer{ @@ -125,5 +150,6 @@ func TestPodAddAndRemoved(t *testing.T) { t.Logf("reset pod fecther with empty") s.syncChanel() + s.modelMetric() assert.Equal(t, len(s.rawNotifier), 0) } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/modelctrl/borwein/borwein.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/modelctrl/borwein/borwein.go index 127ca4d23..7a47bfa80 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/modelctrl/borwein/borwein.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/modelctrl/borwein/borwein.go @@ -29,6 +29,7 @@ import ( borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts" borweininfsvc "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc" borweintypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/types" + borweinutils "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/metrics" @@ -115,7 +116,7 @@ func updateCPUSchedWaitIndicatorOffset(podSet types.PodSet, currentIndicatorOffs } return filteredResults, nil - }, borweinconsts.ModelNameBorwein) + }, borweinutils.GetInferenceResultKey(borweinconsts.ModelNameBorwein)) if err != nil { return 0, fmt.Errorf("GetFilteredInferenceResult failed with error: %v", err) } diff --git a/pkg/config/agent/sysadvisor/qosaware/model/borwein/borwein.go b/pkg/config/agent/sysadvisor/qosaware/model/borwein/borwein.go index aa9b872af..951d8fd0a 100644 --- a/pkg/config/agent/sysadvisor/qosaware/model/borwein/borwein.go +++ b/pkg/config/agent/sysadvisor/qosaware/model/borwein/borwein.go @@ -22,10 +22,11 @@ import ( ) type BorweinConfiguration struct { - BorweinParameters map[string]*borweintypes.BorweinParameter - NodeFeatureNames []string - ContainerFeatureNames []string - InferenceServiceSocketAbsPath string + BorweinParameters map[string]*borweintypes.BorweinParameter + NodeFeatureNames []string + ContainerFeatureNames []string + InferenceServiceSocketAbsPath string + ModelNameToInferenceSvcSockAbsPath map[string]string } func NewBorweinConfiguration() *BorweinConfiguration { diff --git a/pkg/consts/metric.go b/pkg/consts/metric.go index a9a569585..ebd4a1e58 100644 --- a/pkg/consts/metric.go +++ b/pkg/consts/metric.go @@ -30,20 +30,30 @@ const ( // System compute metrics const ( + MetricCPUUsageRatioSystem = "cpu.usage.ratio.system" + MetricLoad1MinSystem = "cpu.load.1min.system" MetricLoad5MinSystem = "cpu.load.5min.system" MetricLoad15MinSystem = "cpu.load.15min.system" + + MetricProcsRunningSystem = "procs.running.system" + + MetricCPIAvgSystem = "cpi.avg.system" ) // System memory metrics const ( - MetricMemTotalSystem = "mem.total.system" - MetricMemUsedSystem = "mem.used.system" - MetricMemFreeSystem = "mem.free.system" - MetricMemShmemSystem = "mem.shmem.system" - MetricMemBufferSystem = "mem.buffer.system" - MetricMemPageCacheSystem = "mem.pagecache.system" - MetricMemAvailableSystem = "mem.available.system" + MetricMemTotalSystem = "mem.total.system" + MetricMemUsedSystem = "mem.used.system" + MetricMemFreeSystem = "mem.free.system" + MetricMemShmemSystem = "mem.shmem.system" + MetricMemBufferSystem = "mem.buffer.system" + MetricMemPageCacheSystem = "mem.pagecache.system" + MetricMemAvailableSystem = "mem.available.system" + MetricMemActiveAnonSystem = "mem.active.anon.system" + MetricMemInactiveAnonSystem = "mem.inactive.anon.system" + MetricMemActiveFileSystem = "mem.active.file.system" + MetricMemInactiveFileSystem = "mem.inactive.file.system" MetricMemDirtySystem = "mem.dirty.system" MetricMemWritebackSystem = "mem.writeback.system" @@ -57,6 +67,11 @@ const ( MetricMemScaleFactorSystem = "mem.scale.factor.system" MetricMemUpdateTimeSystem = "mem.updatetime.system" + + MetricMemSockTCPSystem = "mem.sock.tcp.system" + MetricMemSockTCPLimitSystem = "mem.sock.tcp_limit.system" + MetricMemSockUDPSystem = "mem.sock.udp.system" + MetricMemSockUDPLimitSystem = "mem.sock.udp_limit.system" ) // System blkio metrics @@ -149,17 +164,19 @@ const ( // System cpu compute metrics const ( - MetricCPUSchedwait = "cpu.schedwait.cpu" - MetricCPUUsageRatio = "cpu.usage.ratio.cpu" - MetricCPUIOWaitRatio = "cpu.iowait.ratio.cpu" + MetricCPUSchedwait = "cpu.schedwait.cpu" + MetricCPUUsageRatio = "cpu.usage.ratio.cpu" + MetricCPUSysUsageRatio = "cpu.sys.usage.ratio.cpu" + MetricCPUIOWaitRatio = "cpu.iowait.ratio.cpu" ) // container cpu metrics const ( - MetricCPULimitContainer = "cpu.limit.container" - MetricCPUUsageContainer = "cpu.usage.container" - MetricCPUUsageUserContainer = "cpu.usage.user.container" - MetricCPUUsageSysContainer = "cpu.usage.sys.container" + MetricCPULimitContainer = "cpu.limit.container" + MetricCPUUsageContainer = "cpu.usage.container" + MetricCPUUsageUserContainer = "cpu.usage.user.container" + MetricCPUUsageSysContainer = "cpu.usage.sys.container" + MetricCPUUsageRatioContainer = "cpu.usage.ratio.container" MetricCPUShareContainer = "cpu.share.container" MetricCPUQuotaContainer = "cpu.quota.container" diff --git a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go index ed6d59ab3..ae54a8c0c 100644 --- a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go +++ b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go @@ -234,6 +234,10 @@ func (m *MalachiteMetricsProvisioner) processSystemComputeData(systemComputeData // todo, currently we only get a unified data for the whole system compute data updateTime := time.Unix(systemComputeData.UpdateTime, 0) + globalCPU := systemComputeData.GlobalCPU + m.metricStore.SetNodeMetric(consts.MetricCPUUsageRatioSystem, + utilmetric.MetricData{Value: globalCPU.CPUUsage / 100, Time: &updateTime}) + load := systemComputeData.Load m.metricStore.SetNodeMetric(consts.MetricLoad1MinSystem, utilmetric.MetricData{Value: load.One, Time: &updateTime}) @@ -241,6 +245,10 @@ func (m *MalachiteMetricsProvisioner) processSystemComputeData(systemComputeData utilmetric.MetricData{Value: load.Five, Time: &updateTime}) m.metricStore.SetNodeMetric(consts.MetricLoad15MinSystem, utilmetric.MetricData{Value: load.Fifteen, Time: &updateTime}) + + procsRunning := systemComputeData.ProcessStats.ProcessRunning + m.metricStore.SetNodeMetric(consts.MetricProcsRunningSystem, + utilmetric.MetricData{Value: float64(procsRunning), Time: &updateTime}) } func (m *MalachiteMetricsProvisioner) processSystemMemoryData(systemMemoryData *malachitetypes.SystemMemoryData) { @@ -274,6 +282,15 @@ func (m *MalachiteMetricsProvisioner) processSystemMemoryData(systemMemoryData * m.metricStore.SetNodeMetric(consts.MetricMemAvailableSystem, utilmetric.MetricData{Value: float64(mem.MemAvailable << 10), Time: &updateTime}) + m.metricStore.SetNodeMetric(consts.MetricMemActiveAnonSystem, + utilmetric.MetricData{Value: float64(mem.MemActiveAnon << 10), Time: &updateTime}) + m.metricStore.SetNodeMetric(consts.MetricMemInactiveAnonSystem, + utilmetric.MetricData{Value: float64(mem.MemInactiveAnon << 10), Time: &updateTime}) + m.metricStore.SetNodeMetric(consts.MetricMemActiveFileSystem, + utilmetric.MetricData{Value: float64(mem.MemActiveFile << 10), Time: &updateTime}) + m.metricStore.SetNodeMetric(consts.MetricMemInactiveFileSystem, + utilmetric.MetricData{Value: float64(mem.MemInactiveFile << 10), Time: &updateTime}) + m.metricStore.SetNodeMetric(consts.MetricMemDirtySystem, utilmetric.MetricData{Value: float64(mem.MemDirtyPageCache << 10), Time: &updateTime}) m.metricStore.SetNodeMetric(consts.MetricMemWritebackSystem, @@ -291,6 +308,15 @@ func (m *MalachiteMetricsProvisioner) processSystemMemoryData(systemMemoryData * m.metricStore.SetNodeMetric(consts.MetricMemScaleFactorSystem, utilmetric.MetricData{Value: float64(mem.VMWatermarkScaleFactor), Time: &updateTime}) + m.metricStore.SetNodeMetric(consts.MetricMemSockTCPSystem, + utilmetric.MetricData{Value: float64(mem.MemSockTcp), Time: &updateTime}) + m.metricStore.SetNodeMetric(consts.MetricMemSockTCPLimitSystem, + utilmetric.MetricData{Value: float64(mem.MemSockTcpLimit), Time: &updateTime}) + m.metricStore.SetNodeMetric(consts.MetricMemSockUDPSystem, + utilmetric.MetricData{Value: float64(mem.MemSockUdp), Time: &updateTime}) + m.metricStore.SetNodeMetric(consts.MetricMemSockUDPLimitSystem, + utilmetric.MetricData{Value: float64(mem.MemSockUdpLimit), Time: &updateTime}) + // timestamp m.metricStore.SetNodeMetric(consts.MetricMemUpdateTimeSystem, utilmetric.MetricData{Value: float64(systemMemoryData.UpdateTime), Time: &updateTime}) @@ -472,6 +498,7 @@ func (m *MalachiteMetricsProvisioner) processSystemCPUComputeData(systemComputeD // todo, currently we only get a unified data for the whole system compute data updateTime := time.Unix(systemComputeData.UpdateTime, 0) + var cpiTotal, cpiCount float64 for _, cpu := range systemComputeData.CPU { cpuID, err := strconv.Atoi(cpu.Name[3:]) if err != nil { @@ -487,9 +514,21 @@ func (m *MalachiteMetricsProvisioner) processSystemCPUComputeData(systemComputeD utilmetric.MetricData{Value: cpu.CPUSchedWait * 1000, Time: &updateTime}) m.metricStore.SetCPUMetric(cpuID, consts.MetricCPUIOWaitRatio, utilmetric.MetricData{Value: cpu.CPUIowaitRatio, Time: &updateTime}) + + if cpu.CpiData != nil { + cpiTotal += cpu.CpiData.Cpi + cpiCount += 1 + } } m.metricStore.SetNodeMetric(consts.MetricCPUUsageRatio, utilmetric.MetricData{Value: systemComputeData.GlobalCPU.CPUUsage / 100.0, Time: &updateTime}) + m.metricStore.SetNodeMetric(consts.MetricCPUSysUsageRatio, + utilmetric.MetricData{Value: systemComputeData.GlobalCPU.CPUSysUsage / 100.0, Time: &updateTime}) + + if cpiCount > 0 { + m.metricStore.SetNodeMetric(consts.MetricCPIAvgSystem, + utilmetric.MetricData{Value: cpiTotal / cpiCount, Time: &updateTime}) + } } func (m *MalachiteMetricsProvisioner) processCgroupCPUData(cgroupPath string, cgStats *malachitetypes.MalachiteCgroupInfo) { @@ -707,6 +746,8 @@ func (m *MalachiteMetricsProvisioner) processContainerCPUData(podUID, containerN utilmetric.MetricData{Value: cpu.CPUUserUsageRatio, Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageSysContainer, utilmetric.MetricData{Value: cpu.CPUSysUsageRatio, Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageRatioContainer, + utilmetric.MetricData{Value: cpu.CPUUsageRatio / (float64(cpu.CfsQuotaUs) / float64(cpu.CfsPeriodUs)), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUShareContainer, utilmetric.MetricData{Value: float64(cpu.CPUShares), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUQuotaContainer, @@ -776,6 +817,8 @@ func (m *MalachiteMetricsProvisioner) processContainerCPUData(podUID, containerN utilmetric.MetricData{Value: cpu.CPUUserUsageRatio, Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageSysContainer, utilmetric.MetricData{Value: cpu.CPUSysUsageRatio, Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageRatioContainer, + utilmetric.MetricData{Value: cpu.CPUUsageRatio / (float64(cpu.Max) / float64(cpu.MaxPeriod)), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrRunnableContainer, utilmetric.MetricData{Value: float64(cpu.TaskNrRunning), Time: &updateTime}) diff --git a/pkg/metaserver/agent/metric/provisioner/malachite/types/system.go b/pkg/metaserver/agent/metric/provisioner/malachite/types/system.go index c40be0abe..cad02f0de 100644 --- a/pkg/metaserver/agent/metric/provisioner/malachite/types/system.go +++ b/pkg/metaserver/agent/metric/provisioner/malachite/types/system.go @@ -88,10 +88,16 @@ type MalachiteSystemComputeResponse struct { } type SystemComputeData struct { - Load Load `json:"load"` - CPU []CPU `json:"cpu"` - GlobalCPU CPU `json:"global_cpu"` - UpdateTime int64 `json:"update_time"` + Load Load `json:"load"` + CPU []CPU `json:"cpu"` + GlobalCPU CPU `json:"global_cpu"` + ProcessStats ProcessStats `json:"process_stats"` + UpdateTime int64 `json:"update_time"` +} + +type ProcessStats struct { + ProcessRunning uint64 `json:"procs_running"` + ProcessBlocked uint64 `json:"procs_blocked"` } type Load struct { @@ -103,6 +109,7 @@ type Load struct { type CPU struct { Name string `json:"name"` CPUUsage float64 `json:"cpu_usage"` + CPUSysUsage float64 `json:"cpu_sys_usage"` CPUIowaitRatio float64 `json:"cpu_iowait_ratio"` CPUSchedWait float64 `json:"cpu_sched_wait"` CpiData *CpiData `json:"cpi_data"` @@ -141,8 +148,16 @@ type System struct { MemSwapTotal uint64 `json:"mem_swap_total"` MemSwapFree uint64 `json:"mem_swap_free"` MemUtil float64 `json:"mem_util"` + MemActiveAnon uint64 `json:"mem_active_anon"` + MemInactiveAnon uint64 `json:"mem_inactive_anon"` + MemActiveFile uint64 `json:"mem_active_file"` + MemInactiveFile uint64 `json:"mem_inactive_file"` VMWatermarkScaleFactor uint64 `json:"vm_watermark_scale_factor"` VmstatPgstealKswapd uint64 `json:"vmstat_pgsteal_kswapd"` + MemSockTcp uint64 `json:"mem_sock_tcp"` + MemSockUdp uint64 `json:"mem_sock_udp"` + MemSockTcpLimit uint64 `json:"mem_sock_tcp_limit"` + MemSockUdpLimit uint64 `json:"mem_sock_udp_limit"` } type CPUList struct {