Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi-models in borwein #701

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,27 @@ import (
)

type BorweinOptions struct {
InferenceServiceSocketAbsPath string
FeatureDescriptionFilePath string
NodeFeatureNames []string
ContainerFeatureNames []string
InferenceServiceSocketAbsPath string
ModelNameToInferenceSvcSockAbsPath map[string]string
FeatureDescriptionFilePath string
NodeFeatureNames []string
ContainerFeatureNames []string
}

func NewBorweinOptions() *BorweinOptions {
return &BorweinOptions{
NodeFeatureNames: []string{},
ContainerFeatureNames: []string{},
ModelNameToInferenceSvcSockAbsPath: map[string]string{},
NodeFeatureNames: []string{},
ContainerFeatureNames: []string{},
}
}

// AddFlags adds flags to the specified FlagSet.
func (o *BorweinOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.InferenceServiceSocketAbsPath, "borwein-inference-svc-socket-path", o.InferenceServiceSocketAbsPath,
"socket path which borwein inference server listens at")
"socket path which borwein inference server listens at. it's deprecated, use borwein-inference-model-to-svc-socket-path intead")
fs.StringToStringVar(&o.ModelNameToInferenceSvcSockAbsPath, "borwein-inference-model-to-svc-socket-path", o.ModelNameToInferenceSvcSockAbsPath,
"model name to socket path which its borwein inference server listens at")
fs.StringVar(&o.FeatureDescriptionFilePath, "feature-description-filepath", o.FeatureDescriptionFilePath,
"file path to feature descriptions, the option has lower priority to borwein-node-feature-names and borwein-container-feature-names")
fs.StringSliceVar(&o.NodeFeatureNames, "borwein-node-feature-names", o.NodeFeatureNames,
Expand All @@ -60,6 +64,8 @@ func (o *BorweinOptions) ApplyTo(c *borwein.BorweinConfiguration) error {
}{}

c.InferenceServiceSocketAbsPath = o.InferenceServiceSocketAbsPath
c.ModelNameToInferenceSvcSockAbsPath = o.ModelNameToInferenceSvcSockAbsPath

if len(o.NodeFeatureNames)+len(o.ContainerFeatureNames) > 0 {
c.NodeFeatureNames = o.NodeFeatureNames
c.ContainerFeatureNames = o.ContainerFeatureNames
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/google/uuid v1.3.0
github.com/h2non/gock v1.2.0
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.5.1-0.20240911051124-d5c09c115a19
github.com/kubewharf/katalyst-api v0.5.1-0.20240929080014-ae613a8935d8
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
github.com/opencontainers/selinux v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.5.1-0.20240911051124-d5c09c115a19 h1:5kz2u+tLNzW94ByL9yfsfkvE4N8n/r6C1TKDiFWK8rw=
github.com/kubewharf/katalyst-api v0.5.1-0.20240911051124-d5c09c115a19/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/katalyst-api v0.5.1-0.20240929080014-ae613a8935d8 h1:rIwZVD4iia7pTiB6h1xR8muc4jx4GgHWzhVCPKlEhXw=
github.com/kubewharf/katalyst-api v0.5.1-0.20240929080014-ae613a8935d8/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9 h1:jOTYZt7h/J7I8xQMKMUcJjKf5UFBv37jHWvNp5VRFGc=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
33 changes: 33 additions & 0 deletions pkg/agent/sysadvisor/metacache/metacache_dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metacache

import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

func NewDummyMetaCacheImp() *MetaCacheImp {
return &MetaCacheImp{
podEntries: make(types.PodEntries),
poolEntries: make(types.PoolEntries),
regionEntries: make(types.RegionEntries),
modelToResult: make(map[string]interface{}),
containerCreateTimestamp: make(map[string]int64),
emitter: metrics.DummyMetrics{},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ import (
"sync"
"time"

"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

//nolint
"github.com/golang/protobuf/proto"

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/modelresultfetcher"
borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts"
borweininfsvc "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/inferencesvc"
borweintypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/types"
borweinutils "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/utils"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
Expand All @@ -60,14 +62,16 @@ type BorweinModelResultFetcher struct {
name string
qosConfig *generic.QoSConfiguration

nodeFeatureNames []string // handled by GetNodeFeature
containerFeatureNames []string // handled by GetContainerFeature
inferenceServiceSocketAbsPath string
nodeFeatureNames []string // handled by GetNodeFeature
containerFeatureNames []string // handled by GetContainerFeature
inferenceServiceSocketAbsPath string
modelNameToInferenceSvcSockAbsPath map[string]string // map modelName to inference server sock path

emitter metrics.MetricEmitter

infSvcClient borweininfsvc.InferenceServiceClient
clientLock sync.RWMutex
infSvcClient borweininfsvc.InferenceServiceClient
modelNameToInferenceSvcClient map[string]borweininfsvc.InferenceServiceClient // map modelName to its inference client
clientLock sync.RWMutex
}

const (
Expand Down Expand Up @@ -133,7 +137,7 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met
metaWriter metacache.MetaWriter, metaServer *metaserver.MetaServer,
) error {
bmrf.clientLock.RLock()
if bmrf.infSvcClient == nil {
if bmrf.infSvcClient == nil && len(bmrf.modelNameToInferenceSvcClient) == 0 {
bmrf.clientLock.RUnlock()
return fmt.Errorf("infSvcClient isn't initialized")
}
Expand All @@ -152,9 +156,9 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met
return
}

if containerInfo.QoSLevel == apiconsts.PodAnnotationQoSLevelSharedCores || containerInfo.IsDedicatedNumaExclusive() {
requestContainers = append(requestContainers, containerInfo.Clone())
}
// try to inference for main containers of all QoS levels,
// and filter results when parsing resp
requestContainers = append(requestContainers, containerInfo.Clone())

return
})
Expand All @@ -171,27 +175,56 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met
}

bmrf.clientLock.RLock()
resp, err := bmrf.infSvcClient.Inference(ctx, req)
var infSvcClients map[string]borweininfsvc.InferenceServiceClient
if len(bmrf.modelNameToInferenceSvcClient) > 0 {
infSvcClients = bmrf.modelNameToInferenceSvcClient
} else {
infSvcClients = map[string]borweininfsvc.InferenceServiceClient{
borweinconsts.ModelNameBorwein: bmrf.infSvcClient,
}
}
bmrf.clientLock.RUnlock()

if err != nil {
_ = bmrf.emitter.StoreInt64(metricInferenceFailed, 1, metrics.MetricTypeNameRaw)
return fmt.Errorf("Inference failed with error: %v", err)
}
errCh := make(chan error, len(infSvcClients))
for modelName, client := range infSvcClients {
go func(modelName string, client borweininfsvc.InferenceServiceClient, errCh chan error) {
if client == nil {
errCh <- fmt.Errorf("nil client for model: %s", modelName)
return
}

borweinInferenceResults, err := bmrf.parseInferenceRespForPods(requestContainers, resp)
if err != nil {
_ = bmrf.emitter.StoreInt64(metricParseInferenceResponseFailed, 1, metrics.MetricTypeNameRaw)
return fmt.Errorf("parseInferenceRespForPods failed with error: %v", err)
resp, err := client.Inference(ctx, req)
if err != nil {
_ = bmrf.emitter.StoreInt64(metricInferenceFailed, 1, metrics.MetricTypeNameRaw)
errCh <- fmt.Errorf("Inference by model: %s failed with error: %v", modelName, err)
return
}

borweinInferenceResults, err := bmrf.parseInferenceRespForPods(requestContainers, resp)
if err != nil {
_ = bmrf.emitter.StoreInt64(metricParseInferenceResponseFailed, 1, metrics.MetricTypeNameRaw)
errCh <- fmt.Errorf("parseInferenceRespForPods from model: %s failed with error: %v", modelName, err)
return
}

err = metaWriter.SetInferenceResult(borweinutils.GetInferenceResultKey(modelName), borweinInferenceResults)
if err != nil {
_ = bmrf.emitter.StoreInt64(metricSetInferenceResultFailed, 1, metrics.MetricTypeNameRaw)
errCh <- fmt.Errorf("SetInferenceResult from model: %s failed with error: %v", modelName, err)
return
}

errCh <- nil
return
}(modelName, client, errCh)
}

err = metaWriter.SetInferenceResult(borweinconsts.ModelNameBorwein, borweinInferenceResults)
if err != nil {
_ = bmrf.emitter.StoreInt64(metricSetInferenceResultFailed, 1, metrics.MetricTypeNameRaw)
return fmt.Errorf("SetInferenceResult failed with error: %v", err)
errList := make([]error, 0, len(infSvcClients))
for i := 0; i < len(infSvcClients); i++ {
errList = append(errList, <-errCh)
}

return nil
return errors.NewAggregate(errList)
}

func (bmrf *BorweinModelResultFetcher) parseInferenceRespForPods(requestContainers []*types.ContainerInfo,
Expand All @@ -216,19 +249,31 @@ func (bmrf *BorweinModelResultFetcher) parseInferenceRespForPods(requestContaine
return nil, fmt.Errorf("invalid result for pod: %s, container: %s", podUID, containerName)
}

inferenceResults := make([]*borweininfsvc.InferenceResult, len(cResults.InferenceResults))
inferenceResults := make([]*borweininfsvc.InferenceResult, 0, len(cResults.InferenceResults))
foundResult := false
for idx, result := range cResults.InferenceResults {
if result == nil {
continue
}

inferenceResults[idx] = proto.Clone(result).(*borweininfsvc.InferenceResult)
foundResult = true

if result.ResultFlag == borweininfsvc.ResultFlag_ResultFlagSkip {
general.Infof("skip %d result for pod: %s, container: %s", idx, podUID, containerName)
continue
}

inferenceResults = append(inferenceResults, proto.Clone(result).(*borweininfsvc.InferenceResult))
}

results.SetInferenceResults(podUID, containerName, inferenceResults...)
}
if foundResult {
respContainersCnt++
}

respContainersCnt += len(results.Results[podUID])
if len(inferenceResults) > 0 {
results.SetInferenceResults(podUID, containerName, inferenceResults...)
}
}
}

overloadCnt := 0.0
Expand Down Expand Up @@ -333,8 +378,44 @@ func (bmrf *BorweinModelResultFetcher) initInferenceSvcClientConn() (bool, error
// todo: emit metrics when initializing client connection failed

// never success
if bmrf.inferenceServiceSocketAbsPath == "" {
return false, fmt.Errorf("empty inferenceServiceSocketAbsPath")
if bmrf.inferenceServiceSocketAbsPath == "" && len(bmrf.modelNameToInferenceSvcSockAbsPath) == 0 {
return false, fmt.Errorf("empty inference service socks information")
}

if len(bmrf.modelNameToInferenceSvcSockAbsPath) > 0 {
modelNameToConn := make(map[string]*grpc.ClientConn, len(bmrf.modelNameToInferenceSvcSockAbsPath))

allSuccess := true
for modelName, sockAbsPath := range bmrf.modelNameToInferenceSvcSockAbsPath {
infSvcConn, err := process.Dial(sockAbsPath, 5*time.Second)
if err != nil {
general.Errorf("get inference svc connection with socket: %s for model: %s failed with error",
sockAbsPath, modelName)
allSuccess = false
break
}

modelNameToConn[modelName] = infSvcConn
}

if !allSuccess {
for modelName, conn := range modelNameToConn {
err := conn.Close()
if err != nil {
general.Errorf("close connection for model: %s failed with error: %v",
modelName, err)
}
}
} else {
bmrf.clientLock.Lock()
bmrf.modelNameToInferenceSvcClient = make(map[string]borweininfsvc.InferenceServiceClient, len(modelNameToConn))
for modelName, conn := range modelNameToConn {
bmrf.modelNameToInferenceSvcClient[modelName] = borweininfsvc.NewInferenceServiceClient(conn)
}
bmrf.clientLock.Unlock()
}

return allSuccess, nil
}

infSvcConn, err := process.Dial(bmrf.inferenceServiceSocketAbsPath, 5*time.Second)
Expand Down Expand Up @@ -366,12 +447,13 @@ func NewBorweinModelResultFetcher(fetcherName string, conf *config.Configuration
emitter := emitterPool.GetDefaultMetricsEmitter().WithTags(BorweinModelResultFetcherName)

bmrf := &BorweinModelResultFetcher{
name: fetcherName,
emitter: emitter,
qosConfig: conf.QoSConfiguration,
nodeFeatureNames: conf.BorweinConfiguration.NodeFeatureNames,
containerFeatureNames: conf.BorweinConfiguration.ContainerFeatureNames,
inferenceServiceSocketAbsPath: conf.BorweinConfiguration.InferenceServiceSocketAbsPath,
name: fetcherName,
emitter: emitter,
qosConfig: conf.QoSConfiguration,
nodeFeatureNames: conf.BorweinConfiguration.NodeFeatureNames,
containerFeatureNames: conf.BorweinConfiguration.ContainerFeatureNames,
inferenceServiceSocketAbsPath: conf.BorweinConfiguration.InferenceServiceSocketAbsPath,
modelNameToInferenceSvcSockAbsPath: conf.BorweinConfiguration.ModelNameToInferenceSvcSockAbsPath,
}

// fetcher initializing doesn't block sys-adviosr main process
Expand Down
Loading
Loading