From c784dd7f69e97fd134e2adbc6d319a96ee4aec5c Mon Sep 17 00:00:00 2001 From: "wangzhe.21" Date: Mon, 13 Nov 2023 19:23:46 +0800 Subject: [PATCH] report topology provider policy in agent --- cmd/base/context_fake.go | 3 + cmd/katalyst-scheduler/app/server.go | 5 +- cmd/katalyst-scheduler/main.go | 2 + go.mod | 1 + go.sum | 4 +- .../overcommitment_aware.go | 2 +- .../overcommitmentaware/reporter/reporter.go | 123 +++- .../reporter/reporter_test.go | 67 +- pkg/controller/overcommit/node/handler.go | 74 +-- pkg/controller/overcommit/node/node.go | 261 ++++++-- pkg/controller/overcommit/node/node_test.go | 306 ++++++++- pkg/scheduler/cache/nodeinfo.go | 1 + pkg/scheduler/eventhandlers/cnr_handler.go | 8 + pkg/scheduler/eventhandlers/handler.go | 52 ++ pkg/scheduler/eventhandlers/pod_handler.go | 8 + .../plugins/nodeovercommitment/cache/cache.go | 207 ++++++ .../nodeovercommitment/cache/handler.go | 173 +++++ .../plugins/nodeovercommitment/fit.go | 228 +++++++ .../plugins/nodeovercommitment/fit_test.go | 615 ++++++++++++++++++ .../plugins/nodeovercommitment/plugin.go | 52 ++ .../plugins/nodeovercommitment/plugin_test.go | 30 + .../plugins/nodeovercommitment/reserve.go | 42 ++ .../nodeovercommitment/reserve_test.go | 189 ++++++ .../plugins/noderesourcetopology/plugin.go | 4 + .../balanced_allocation.go | 4 + .../plugins/qosawarenoderesources/fit.go | 4 + pkg/util/cnr.go | 1 + pkg/util/native/pod_resource.go | 40 ++ pkg/util/native/resources_test.go | 23 +- .../mutating/node/allocatable_mutator.go | 143 +++- pkg/webhook/mutating/node/node_test.go | 92 +++ 31 files changed, 2588 insertions(+), 176 deletions(-) create mode 100644 pkg/scheduler/eventhandlers/handler.go create mode 100644 pkg/scheduler/plugins/nodeovercommitment/cache/cache.go create mode 100644 pkg/scheduler/plugins/nodeovercommitment/cache/handler.go create mode 100644 pkg/scheduler/plugins/nodeovercommitment/fit.go create mode 100644 pkg/scheduler/plugins/nodeovercommitment/fit_test.go create mode 100644 pkg/scheduler/plugins/nodeovercommitment/plugin.go create mode 100644 pkg/scheduler/plugins/nodeovercommitment/plugin_test.go create mode 100644 pkg/scheduler/plugins/nodeovercommitment/reserve.go create mode 100644 pkg/scheduler/plugins/nodeovercommitment/reserve_test.go diff --git a/cmd/base/context_fake.go b/cmd/base/context_fake.go index d5919bea8d..93f113b682 100644 --- a/cmd/base/context_fake.go +++ b/cmd/base/context_fake.go @@ -21,6 +21,8 @@ import ( "reflect" "strconv" + nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -241,6 +243,7 @@ func GenerateFakeGenericContext(objects ...[]runtime.Object) (*GenericContext, e utilruntime.Must(v1alpha1.AddToScheme(scheme)) utilruntime.Must(overcommitapis.AddToScheme(scheme)) utilruntime.Must(apiregistration.AddToScheme(scheme)) + utilruntime.Must(nodev1alpha1.AddToScheme(scheme)) fakeMetaClient := metaFake.NewSimpleMetadataClient(scheme, nilObjectFilter(metaObjects)...) fakeInternalClient := externalfake.NewSimpleClientset(nilObjectFilter(internalObjects)...) 
diff --git a/cmd/katalyst-scheduler/app/server.go b/cmd/katalyst-scheduler/app/server.go index 238e6869df..770bc9a7c2 100644 --- a/cmd/katalyst-scheduler/app/server.go +++ b/cmd/katalyst-scheduler/app/server.go @@ -357,8 +357,9 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions return nil, nil, err } - eventhandlers.AddCNREventHandler(cc.InformerFactory, cc.InternalInformerFactory) - eventhandlers.AddPodEventHandler(cc.InformerFactory, cc.InternalInformerFactory) + for _, handlerFunc := range eventhandlers.ListEventHandlerFunc() { + handlerFunc(cc.InformerFactory, cc.InternalInformerFactory) + } return &cc, sched, nil } diff --git a/cmd/katalyst-scheduler/main.go b/cmd/katalyst-scheduler/main.go index 09efe0ea38..20671696e3 100644 --- a/cmd/katalyst-scheduler/main.go +++ b/cmd/katalyst-scheduler/main.go @@ -23,6 +23,7 @@ import ( "k8s.io/component-base/logs" "github.com/kubewharf/katalyst-core/cmd/katalyst-scheduler/app" + "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/nodeovercommitment" "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/noderesourcetopology" "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/qosawarenoderesources" @@ -38,6 +39,7 @@ func main() { app.WithPlugin(qosawarenoderesources.FitName, qosawarenoderesources.NewFit), app.WithPlugin(qosawarenoderesources.BalancedAllocationName, qosawarenoderesources.NewBalancedAllocation), app.WithPlugin(noderesourcetopology.TopologyMatchName, noderesourcetopology.New), + app.WithPlugin(nodeovercommitment.Name, nodeovercommitment.New), ) if err := runCommand(command); err != nil { diff --git a/go.mod b/go.mod index c6b846ba6f..20971f7239 100644 --- a/go.mod +++ b/go.mod @@ -151,6 +151,7 @@ require ( ) replace ( + github.com/kubewharf/katalyst-api => github.com/WangZzzhe/katalyst-api v0.0.0-20240407125347-efbac16eb31f k8s.io/api => k8s.io/api v0.24.6 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6 k8s.io/apimachinery => k8s.io/apimachinery v0.24.6 diff --git a/go.sum b/go.sum index 7ef0080bb1..1abfde8ab7 100644 --- a/go.sum +++ b/go.sum @@ -82,6 +82,8 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/WangZzzhe/katalyst-api v0.0.0-20240407125347-efbac16eb31f h1:qF8pG/qlbvNr2s7v3F2XX2zlZoQa5z4c4Ae8VQ9arso= +github.com/WangZzzhe/katalyst-api v0.0.0-20240407125347-efbac16eb31f/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -554,8 +556,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36 
h1:AUsgMb1EaUbrmUWQU7zAXnZHAOPAH65bx/MELm3qaAQ= -github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k= github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4= github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c= github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8= diff --git a/pkg/agent/sysadvisor/plugin/overcommitmentaware/overcommitment_aware.go b/pkg/agent/sysadvisor/plugin/overcommitmentaware/overcommitment_aware.go index 406aec5877..ee81a22daf 100644 --- a/pkg/agent/sysadvisor/plugin/overcommitmentaware/overcommitment_aware.go +++ b/pkg/agent/sysadvisor/plugin/overcommitmentaware/overcommitment_aware.go @@ -55,7 +55,7 @@ func NewOvercommitmentAwarePlugin( realtimeOvercommitmentAdvisor := realtime.NewRealtimeOvercommitmentAdvisor(conf, metaServer, emitter) - overcommitRatioReporter, err := reporter.NewOvercommitRatioReporter(emitter, conf, realtimeOvercommitmentAdvisor) + overcommitRatioReporter, err := reporter.NewOvercommitRatioReporter(emitter, conf, realtimeOvercommitmentAdvisor, metaServer) if err != nil { return nil, err } diff --git a/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter.go b/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter.go index 38997d5002..b373cc76b6 100644 --- a/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter.go +++ b/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter.go @@ -20,18 +20,24 @@ import ( "context" "encoding/json" "fmt" + "strconv" "sync" + "github.com/pkg/errors" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" + kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" + "k8s.io/kubernetes/pkg/features" "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-api/pkg/plugins/registration" "github.com/kubewharf/katalyst-api/pkg/plugins/skeleton" "github.com/kubewharf/katalyst-api/pkg/protocol/reporterplugin/v1alpha1" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util" + "github.com/kubewharf/katalyst-core/pkg/util/native" ) const ( @@ -55,8 +61,9 @@ func NewOvercommitRatioReporter( emitter metrics.MetricEmitter, conf *config.Configuration, manager OvercommitManager, + metaServer *metaserver.MetaServer, ) (OvercommitRatioReporter, error) { - plugin, err := newOvercommitRatioReporterPlugin(emitter, conf, manager) + plugin, err := newOvercommitRatioReporterPlugin(emitter, conf, manager, metaServer) if err != nil { return nil, fmt.Errorf("[overcommit-reporter] create reporter failed: %v", err) } @@ -80,7 +87,8 @@ func (o *overcommitRatioReporterImpl) Run(ctx context.Context) { type OvercommitRatioReporterPlugin struct { sync.Mutex - manager OvercommitManager + manager OvercommitManager + metaServer *metaserver.MetaServer ctx context.Context cancel context.CancelFunc @@ -91,10 +99,12 @@ func newOvercommitRatioReporterPlugin( emitter metrics.MetricEmitter, conf *config.Configuration, overcommitManager OvercommitManager, + metaserver *metaserver.MetaServer, ) (skeleton.GenericPlugin, error) { reporter := &OvercommitRatioReporterPlugin{ - manager: overcommitManager, + manager: overcommitManager, + metaServer: metaserver, } return skeleton.NewRegistrationPluginWrapper(reporter, []string{conf.PluginRegistrationDir}, @@ -146,6 +156,10 @@ func 
(o *OvercommitRatioReporterPlugin) Stop() error { // Since the metrics collected by Manager are already an average within a time period, // we expect a faster response to node load fluctuations to avoid excessive overcommit of online resources. func (o *OvercommitRatioReporterPlugin) GetReportContent(_ context.Context, _ *v1alpha1.Empty) (*v1alpha1.GetReportContentResponse, error) { + response := &v1alpha1.GetReportContentResponse{ + Content: []*v1alpha1.ReportContent{}, + } + overcommitRatioMap, err := o.manager.GetOvercommitRatio() if err != nil { klog.Errorf("OvercommitRatioReporterPlugin GetOvercommitMent fail: %v", err) @@ -154,16 +168,20 @@ func (o *OvercommitRatioReporterPlugin) GetReportContent(_ context.Context, _ *v klog.V(6).Infof("reporter get overcommit ratio: %v", overcommitRatioMap) // overcommit data to CNR data - reportToCNR, err := o.overcommitRatioToCNRAnnotation(overcommitRatioMap) + overcommitRatioContent, err := o.overcommitRatioToCNRAnnotation(overcommitRatioMap) if err != nil { return nil, err } + response.Content = append(response.Content, overcommitRatioContent) - return &v1alpha1.GetReportContentResponse{ - Content: []*v1alpha1.ReportContent{ - reportToCNR, - }, - }, nil + // get topologyProvider and guaranteed cpus + topologyProviderContent, err := o.getTopologyProviderReportContent(o.ctx) + if err != nil { + return nil, err + } + response.Content = append(response.Content, topologyProviderContent) + + return response, nil } func (o *OvercommitRatioReporterPlugin) ListAndWatchReportContent(_ *v1alpha1.Empty, server v1alpha1.ReporterPlugin_ListAndWatchReportContentServer) error { @@ -211,3 +229,90 @@ func (o *OvercommitRatioReporterPlugin) overcommitRatioToCNRAnnotation(overcommi }, }, nil } + +func (o *OvercommitRatioReporterPlugin) getTopologyProviderReportContent(ctx context.Context) (*v1alpha1.ReportContent, error) { + annotations, err := o.getTopologyProvider(ctx) + if err != nil { + return nil, fmt.Errorf("get topology provider from adapter failed: %v", err) + } + + guaranteedCPUs := "0" + if annotations[consts.KCNRAnnotationCPUManager] != string(consts.CPUManagerPolicyNone) { + guaranteedCPUs, err = o.getGuaranteedCPUs(ctx) + if err != nil { + return nil, err + } + } + annotations[consts.KCNRAnnotationGuaranteedCPUs] = guaranteedCPUs + + value, err := json.Marshal(&annotations) + if err != nil { + return nil, errors.Wrap(err, "marshal topology provider failed") + } + + return &v1alpha1.ReportContent{ + GroupVersionKind: &util.CNRGroupVersionKind, + Field: []*v1alpha1.ReportField{ + { + FieldType: v1alpha1.FieldType_Metadata, + FieldName: util.CNRFieldNameAnnotations, + Value: value, + }, + }, + }, nil +} + +func (o *OvercommitRatioReporterPlugin) getTopologyProvider(ctx context.Context) (map[string]string, error) { + klConfig, err := o.metaServer.GetKubeletConfig(ctx) + if err != nil { + return nil, fmt.Errorf("get kubelet config fail: %v", err) + } + + return generateProviderPolicies(klConfig), nil +} + +func (o *OvercommitRatioReporterPlugin) getGuaranteedCPUs(ctx context.Context) (string, error) { + podList, err := o.metaServer.GetPodList(ctx, func(pod *v1.Pod) bool { + return true + }) + + if err != nil { + return "", errors.Wrap(err, "get pod list from metaserver failed") + } + + cpus := 0 + for _, pod := range podList { + cpus += native.PodGuaranteedCPUs(pod) + } + + return strconv.Itoa(cpus), nil +} + +func generateProviderPolicies(kubeletConfig *kubeletconfigv1beta1.KubeletConfiguration) map[string]string { + 
klog.V(5).Infof("generateProviderPolicies featureGates: %v, cpuManagerPolicy: %v, memoryManagerPolicy: %v", + kubeletConfig.FeatureGates, features.CPUManager, features.MemoryManager) + + featureGates := kubeletConfig.FeatureGates + + res := map[string]string{ + consts.KCNRAnnotationCPUManager: string(consts.CPUManagerOff), + consts.KCNRAnnotationMemoryManager: string(consts.MemoryManagerOff), + } + + on, ok := featureGates[string(features.CPUManager)] + // default true + if (ok && on) || (!ok) { + if kubeletConfig.CPUManagerPolicy != "" { + res[consts.KCNRAnnotationCPUManager] = kubeletConfig.CPUManagerPolicy + } + } + + on, ok = featureGates[string(features.MemoryManager)] + if (ok && on) || (!ok) { + if kubeletConfig.MemoryManagerPolicy != "" { + res[consts.KCNRAnnotationMemoryManager] = kubeletConfig.MemoryManagerPolicy + } + } + + return res +} diff --git a/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter_test.go b/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter_test.go index 1cf0e9341f..34e23418ce 100644 --- a/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter_test.go +++ b/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter_test.go @@ -22,16 +22,39 @@ import ( "fmt" "testing" - "github.com/kubewharf/katalyst-api/pkg/consts" - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + + "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/kubeletconfig" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" ) func TestGetReportContent(t *testing.T) { + t.Parallel() + + fakeKubeletConfig := kubeletconfigv1beta1.KubeletConfiguration{ + FeatureGates: map[string]bool{ + string(features.CPUManager): true, + string(features.MemoryManager): false, + }, + CPUManagerPolicy: string(cpumanager.PolicyStatic), + } + p := &OvercommitRatioReporterPlugin{ manager: NewFakeOvercommitManager(map[v1.ResourceName]float64{}), + metaServer: &metaserver.MetaServer{ + MetaAgent: &agent.MetaAgent{ + KubeletConfigFetcher: kubeletconfig.NewFakeKubeletConfigFetcher(fakeKubeletConfig), + }, + }, } _, err := p.GetReportContent(context.TODO(), nil) @@ -43,17 +66,53 @@ func TestGetReportContent(t *testing.T) { v1.ResourceMemory: 1.2678, v1.ResourceStorage: 1.0, }), + metaServer: &metaserver.MetaServer{ + MetaAgent: &agent.MetaAgent{ + KubeletConfigFetcher: kubeletconfig.NewFakeKubeletConfigFetcher(fakeKubeletConfig), + PodFetcher: &pod.PodFetcherStub{ + PodList: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "p1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Resources: v1.ResourceRequirements{ + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("4Gi"), + }, + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, } res, err := p.GetReportContent(context.TODO(), nil) assert.NoError(t, err) - assert.Equal(t, 1, len(res.Content)) + assert.Equal(t, 2, len(res.Content)) ratio := map[string]string{} err = 
json.Unmarshal(res.Content[0].Field[0].Value, &ratio) assert.NoError(t, err) assert.Equal(t, "1.51", ratio[consts.NodeAnnotationCPUOvercommitRatioKey]) assert.Equal(t, "1.27", ratio[consts.NodeAnnotationMemoryOvercommitRatioKey]) + + anno := map[string]string{} + err = json.Unmarshal(res.Content[1].Field[0].Value, &anno) + assert.NoError(t, err) + assert.Equal(t, "static", anno[string(consts.KCNRAnnotationCPUManager)]) } func TestStart(t *testing.T) { diff --git a/pkg/controller/overcommit/node/handler.go b/pkg/controller/overcommit/node/handler.go index 74d03cadfa..b1d92b88d8 100644 --- a/pkg/controller/overcommit/node/handler.go +++ b/pkg/controller/overcommit/node/handler.go @@ -44,6 +44,37 @@ const ( configEvent eventType = "config" ) +func (nc *NodeOvercommitController) addCNR(obj interface{}) { + cnr, ok := obj.(*nodev1alpha1.CustomNodeResource) + if !ok { + klog.Errorf("cannot convert obj to *CustomNodeResource: %v", obj) + return + } + + klog.V(4).Infof("[noc] notice addition of CNR %s", cnr.Name) + nc.enqueueCNR(cnr) +} + +func (nc *NodeOvercommitController) updateCNR(old, new interface{}) { + oldCNR, ok := old.(*nodev1alpha1.CustomNodeResource) + if !ok { + klog.Errorf("cannot convert obj to CustomNodeResource: %v", old) + return + } + + newCNR, ok := new.(*nodev1alpha1.CustomNodeResource) + if !ok { + klog.Errorf("cannot convert obj to CustomNodeResource: %v", new) + return + } + + if reflect.DeepEqual(newCNR.Annotations, oldCNR.Annotations) { + return + } + + nc.enqueueCNR(newCNR) +} + func (nc *NodeOvercommitController) addNodeOvercommitConfig(obj interface{}) { noc, ok := obj.(*v1alpha1.NodeOvercommitConfig) if !ok { @@ -116,49 +147,6 @@ func (nc *NodeOvercommitController) updateNode(old, new interface{}) { nc.enqueueNode(newNode) } -func (nc *NodeOvercommitController) addCNR(obj interface{}) { - cnr, ok := obj.(*nodev1alpha1.CustomNodeResource) - if !ok { - klog.Errorf("cannot convert obj to *CustomNodeResource: %v", obj) - return - } - - if len(cnr.Annotations) == 0 { - klog.V(6).Infof("cnr without annotation, skip") - return - } - _, cpuOk := cnr.Annotations[consts.NodeAnnotationCPUOvercommitRatioKey] - _, memOk := cnr.Annotations[consts.NodeAnnotationMemoryOvercommitRatioKey] - if !cpuOk && !memOk { - klog.V(6).Infof("cnr without overcommit ratio, skip") - return - } - - klog.V(4).Infof("[noc] notice addition of CNR %s", cnr.Name) - nc.enqueueCNR(cnr) -} - -func (nc *NodeOvercommitController) updateCNR(old, new interface{}) { - oldCNR, ok := old.(*nodev1alpha1.CustomNodeResource) - if !ok { - klog.Errorf("cannot convert obj to CustomNodeResource: %v", old) - return - } - - newCNR, ok := new.(*nodev1alpha1.CustomNodeResource) - if !ok { - klog.Errorf("cannot convert obj to CustomNodeResource: %v", new) - return - } - - if reflect.DeepEqual(newCNR.Annotations, oldCNR.Annotations) { - return - } - - nc.enqueueCNR(newCNR) - klog.V(4).Infof("[noc] notice update of CNR %s", newCNR.Name) -} - func (nc *NodeOvercommitController) enqueueNodeOvercommitConfig(noc *v1alpha1.NodeOvercommitConfig, eventType eventType) { if noc == nil { klog.Warning("[noc] trying to enqueue a nil config") diff --git a/pkg/controller/overcommit/node/node.go b/pkg/controller/overcommit/node/node.go index 73d92e3e99..f8d6c1a486 100644 --- a/pkg/controller/overcommit/node/node.go +++ b/pkg/controller/overcommit/node/node.go @@ -25,6 +25,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/labels" utilruntime 
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -32,6 +33,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" configv1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/overcommit/v1alpha1" nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/client/listers/node/v1alpha1" @@ -43,6 +45,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/config/generic" "github.com/kubewharf/katalyst-core/pkg/controller/overcommit/node/matcher" "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/native" ) const nodeOvercommitControllerName = "noc" @@ -115,6 +118,7 @@ func NewNodeOvercommitController( syncedFunc: []cache.InformerSynced{ nodeInformer.Informer().HasSynced, nodeOvercommitInformer.Informer().HasSynced, + cnrInformer.Informer().HasSynced, }, matcher: &matcher.DummyMatcher{}, reconcilePeriod: overcommitConf.Node.ConfigReconcilePeriod, @@ -156,6 +160,7 @@ func (nc *NodeOvercommitController) Run() { defer func() { nc.nocSyncQueue.ShutDown() nc.nodeSyncQueue.ShutDown() + nc.cnrSyncQueue.ShutDown() klog.Infof("Shutting down %s controller", nodeOvercommitControllerName) }() @@ -424,84 +429,28 @@ func (nc *NodeOvercommitController) patchNodeOvercommitConfigStatus(configName s } func (nc *NodeOvercommitController) setNodeOvercommitAnnotations(nodeName string, config *configv1alpha1.NodeOvercommitConfig) error { - if config == nil { - config = nc.matcher.GetConfig(nodeName) - } - if config == nil { - return nc.setNodeOvercommitAnnotationsWithConfig(nodeName, config) - } - - cnr, err := nc.cnrLister.Get(nodeName) - if err != nil { - if errors.IsNotFound(err) { - klog.V(6).Infof("node %v without cnr", nodeName) - } else { - klog.Errorf("get node %s cnr fail: %v", nodeName, err) - } - return nc.setNodeOvercommitAnnotationsWithConfig(nodeName, config) - } - - if len(cnr.Annotations) == 0 { - return nc.setNodeOvercommitAnnotationsWithConfig(nodeName, config) - } - - _, cpuOk := cnr.Annotations[consts.NodeAnnotationCPUOvercommitRatioKey] - _, memOk := cnr.Annotations[consts.NodeAnnotationMemoryOvercommitRatioKey] - if !cpuOk && !memOk { - return nc.setNodeOvercommitAnnotationsWithConfig(nodeName, config) - } - - nocCopy := config.DeepCopy() - for resourceName, annotationKey := range resourceAnnotationKey { - c, ok := config.Spec.ResourceOvercommitRatio[resourceName] - if !ok { - continue - } - cnrOvercommitRatioAnnotation, ok := cnr.Annotations[annotationKey] - if !ok { - continue - } - - setOvercommitRatio, err := strconv.ParseFloat(c, 64) - if err != nil { - klog.Errorf("unexpected overcommitRatio: %s", c) - return err - } - cnrOvercommitRatio, err := strconv.ParseFloat(cnrOvercommitRatioAnnotation, 64) - if err != nil { - klog.Errorf("unexpected overcommitRatio: %s", cnrOvercommitRatioAnnotation) - continue - } - - klog.V(5).Infof("[noc] syncCNR, resourceName: %v, setOvercommitRatio: %v, cnrOvercommitRatio: %v", resourceName, setOvercommitRatio, cnrOvercommitRatio) - if cnrOvercommitRatio >= setOvercommitRatio { - continue - } - nocCopy.Spec.ResourceOvercommitRatio[resourceName] = cnrOvercommitRatioAnnotation - } - - return nc.setNodeOvercommitAnnotationsWithConfig(nodeName, nocCopy) -} - -func (nc *NodeOvercommitController) setNodeOvercommitAnnotationsWithConfig(nodeName string, config *configv1alpha1.NodeOvercommitConfig) error { + // get node from node index node, err := nc.nodeLister.Get(nodeName) if err != nil { klog.Errorf("get node %s 
fail: %v", nodeName, err) return err } + // get noc if nil + if config == nil { + config = nc.matcher.GetConfig(nodeName) + } + nodeOvercommitConfig := emptyOvercommitConfig() + if config != nil { + nodeOvercommitConfig = config + } + nodeCopy := node.DeepCopy() nodeAnnotations := nodeCopy.GetAnnotations() if nodeAnnotations == nil { nodeAnnotations = make(map[string]string) } - var ( - nodeOvercommitConfig = emptyOvercommitConfig() - ) - if config != nil { - nodeOvercommitConfig = config - } for resourceName, annotationKey := range resourceAnnotationKey { c, ok := nodeOvercommitConfig.Spec.ResourceOvercommitRatio[resourceName] if !ok { @@ -515,11 +464,33 @@ func (nc *NodeOvercommitController) setNodeOvercommitAnnotationsWithConfig(nodeN nodeAnnotations[annotationKey] = c } } + + nc.nodeRealtimeOvercommitRatio(nodeAnnotations, node) + + cpuAllocatable, cpuCapacity := nc.nodeOvercommitResource(node, validCPUOvercommitRatio(nodeAnnotations), corev1.ResourceCPU, consts.NodeAnnotationOriginalAllocatableCPUKey, consts.NodeAnnotationOriginalCapacityCPUKey) + klog.V(5).Infof("node %s CPU allocatable: %v, CPU capacity: %v with bindcpu", node.Name, cpuAllocatable, cpuCapacity) + if cpuAllocatable == "" { + delete(nodeAnnotations, consts.NodeAnnotationOvercommitAllocatableCPUKey) + delete(nodeAnnotations, consts.NodeAnnotationOvercommitCapacityCPUKey) + } else { + nodeAnnotations[consts.NodeAnnotationOvercommitAllocatableCPUKey] = cpuAllocatable + nodeAnnotations[consts.NodeAnnotationOvercommitCapacityCPUKey] = cpuCapacity + } + + memAllocatable, memCapacity := nc.nodeOvercommitResource(node, validMemoryOvercommitRatio(nodeAnnotations), corev1.ResourceMemory, consts.NodeAnnotationOriginalAllocatableMemoryKey, consts.NodeAnnotationOriginalCapacityMemoryKey) + klog.V(5).Infof("node %s memory allocatable: %v, memory capacity: %v", node.Name, memAllocatable, memCapacity) + if memAllocatable == "" { + delete(nodeAnnotations, consts.NodeAnnotationOvercommitAllocatableMemoryKey) + delete(nodeAnnotations, consts.NodeAnnotationOvercommitCapacityMemoryKey) + } else { + nodeAnnotations[consts.NodeAnnotationOvercommitAllocatableMemoryKey] = memAllocatable + nodeAnnotations[consts.NodeAnnotationOvercommitCapacityMemoryKey] = memCapacity + } + nodeCopy.Annotations = nodeAnnotations if !reflect.DeepEqual(nodeAnnotations, node.Annotations) { return nc.nodeUpdater.PatchNode(nc.ctx, node, nodeCopy) } - return nil } @@ -530,3 +501,159 @@ func emptyOvercommitConfig() *configv1alpha1.NodeOvercommitConfig { }, } } + +func (nc *NodeOvercommitController) nodeRealtimeOvercommitRatio(nodeAnnotation map[string]string, node *corev1.Node) { + kcnr, err := nc.cnrLister.Get(node.Name) + if err != nil { + klog.Error(err) + return + } + + if len(kcnr.Annotations) == 0 { + delete(nodeAnnotation, consts.NodeAnnotationRealtimeCPUOvercommitRatioKey) + delete(nodeAnnotation, consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey) + return + } + + realtimeCPU, ok := kcnr.Annotations[consts.NodeAnnotationCPUOvercommitRatioKey] + if ok { + nodeAnnotation[consts.NodeAnnotationRealtimeCPUOvercommitRatioKey] = realtimeCPU + } else { + delete(nodeAnnotation, consts.NodeAnnotationRealtimeCPUOvercommitRatioKey) + } + + realtimeMem, ok := kcnr.Annotations[consts.NodeAnnotationMemoryOvercommitRatioKey] + if ok { + nodeAnnotation[consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey] = realtimeMem + } else { + delete(nodeAnnotation, consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey) + } +} + +func (nc *NodeOvercommitController) 
nodeOvercommitResource( + node *corev1.Node, + overcommitRatio float64, + resourceName corev1.ResourceName, + originalAllocatableKey, originalCapacityKey string) (string, string) { + var guaranteedResource int + + if overcommitRatio <= 1 { + klog.V(5).Infof("node %v %v overcommit ratio not greater than 1: %v", node.Name, resourceName, overcommitRatio) + overcommitRatio = 1.0 + } + + // node original allocatable and capacity annotations should always exist if the overcommit webhook is running + // node allocatable cannot be calculated without the original allocatable, so just return empty strings. + nodeAllocatableAnnotation, ok := node.Annotations[originalAllocatableKey] + if !ok { + klog.V(5).Infof("node %s annotation %s missing", node.Name, originalAllocatableKey) + return "", "" + } + nodeCapacityAnnotation, ok := node.Annotations[originalCapacityKey] + if !ok { + klog.V(5).Infof("node %s annotation %s missing", node.Name, originalCapacityKey) + return "", "" + } + nodeAllocatable, err := resource.ParseQuantity(nodeAllocatableAnnotation) + if err != nil { + klog.Error(err) + return "", "" + } + nodeCapacity, err := resource.ParseQuantity(nodeCapacityAnnotation) + if err != nil { + klog.Error(err) + return "", "" + } + + if resourceName == corev1.ResourceCPU { + guaranteedCPU, err := nc.getGuaranteedCPU(node.Name) + if err != nil { + klog.Error(err) + } else { + guaranteedResource = guaranteedCPU + } + } + + guaranteedQuantity := resource.NewQuantity(int64(guaranteedResource), resource.DecimalSI) + nodeAllocatable.Sub(*guaranteedQuantity) + nodeAllocatable = native.MultiplyMilliQuantity(nodeAllocatable, overcommitRatio) + nodeAllocatable.Add(*guaranteedQuantity) + nodeCapacity.Sub(*guaranteedQuantity) + nodeCapacity = native.MultiplyMilliQuantity(nodeCapacity, overcommitRatio) + nodeCapacity.Add(*guaranteedQuantity) + + klog.V(5).Infof("node %s overcommitRatio: %v, guaranteedResource: %v, final allocatable: %v, capacity: %v", + node.Name, overcommitRatio, guaranteedResource, nodeAllocatable.String(), nodeCapacity.String()) + return nodeAllocatable.String(), nodeCapacity.String() +} + +func (nc *NodeOvercommitController) getGuaranteedCPU(nodeName string) (int, error) { + kcnr, err := nc.cnrLister.Get(nodeName) + if err != nil { + klog.Error(err) + return 0, err + } + + if kcnr.Annotations == nil { + klog.V(5).Infof("node %s has no annotations", nodeName) + return 0, nil + } + + if kcnr.Annotations[consts.KCNRAnnotationCPUManager] != string(cpumanager.PolicyStatic) { + klog.V(5).Infof("node %s does not enable static cpu manager", kcnr.Name) + return 0, nil + } + + cpusAnnotation, ok := kcnr.Annotations[consts.KCNRAnnotationGuaranteedCPUs] + if !ok { + klog.V(5).Infof("node %s guaranteed cpus missing", kcnr.Name) + return 0, nil + } + guaranteedCPUs, err := strconv.Atoi(cpusAnnotation) + if err != nil { + klog.Error(err) + return 0, err + } + + return guaranteedCPUs, nil +} + +func validCPUOvercommitRatio(annotation map[string]string) float64 { + return validOvercommitRatio(annotation, consts.NodeAnnotationCPUOvercommitRatioKey, consts.NodeAnnotationRealtimeCPUOvercommitRatioKey) +} + +func validMemoryOvercommitRatio(annotation map[string]string) float64 { + return validOvercommitRatio(annotation, consts.NodeAnnotationMemoryOvercommitRatioKey, consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey) +} + +func validOvercommitRatio( + annotation map[string]string, + setOvercommitRatioKey string, + realtimeOvercommitRatioKey string) float64 { + setValueStr, ok := annotation[setOvercommitRatioKey] + if !ok { + return 1.0 + } + + setValue, err
:= strconv.ParseFloat(setValueStr, 64) + if err != nil { + klog.Errorf("unknow overcommit ratio: %v, err: %v", setValueStr, err) + return 1.0 + } + + realtimeValueStr, ok := annotation[realtimeOvercommitRatioKey] + if !ok { + return setValue + } + + realtimeValue, err := strconv.ParseFloat(realtimeValueStr, 64) + if err != nil { + klog.Errorf("unknow realtime overcommit ratio: %v, err: %v", setValueStr, err) + return setValue + } + + if realtimeValue < setValue { + return realtimeValue + } + return setValue +} diff --git a/pkg/controller/overcommit/node/node_test.go b/pkg/controller/overcommit/node/node_test.go index 847d8c2ce3..56fc80a86f 100644 --- a/pkg/controller/overcommit/node/node_test.go +++ b/pkg/controller/overcommit/node/node_test.go @@ -19,6 +19,7 @@ package node import ( "context" "fmt" + "strconv" "testing" "time" @@ -29,6 +30,7 @@ import ( cliflag "k8s.io/component-base/cli/flag" nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + v1alpha12 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" "github.com/kubewharf/katalyst-api/pkg/apis/overcommit/v1alpha1" "github.com/kubewharf/katalyst-api/pkg/consts" katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" @@ -63,6 +65,9 @@ func makeNode(name string, labels map[string]string) *corev1.Node { ObjectMeta: metav1.ObjectMeta{ Name: name, Labels: labels, + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_allocatable_cpu": "8", + }, }, } } @@ -216,10 +221,14 @@ var testCases = []testCase{ makeSelectorNoc("config-selector", "2", "2", "pool1"), }, result: map[string]map[string]string{ - "node1": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1"}, - "node2": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1"}, - "node3": {consts.NodeAnnotationCPUOvercommitRatioKey: "2", consts.NodeAnnotationMemoryOvercommitRatioKey: "2"}, - "node4": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1"}, + "node1": {consts.NodeAnnotationCPUOvercommitRatioKey: "2", consts.NodeAnnotationMemoryOvercommitRatioKey: "2", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "1", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "1"}, + "node2": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "1", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "1"}, + "node3": {consts.NodeAnnotationCPUOvercommitRatioKey: "2", consts.NodeAnnotationMemoryOvercommitRatioKey: "2", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "3", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "3"}, + "node4": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "2", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "2"}, }, }, { @@ -230,10 +239,14 @@ var testCases = []testCase{ }, addCNR: defaultInitCNR, result: map[string]map[string]string{ - "node1": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1"}, - "node2": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1"}, - "node3": {consts.NodeAnnotationCPUOvercommitRatioKey: "2", consts.NodeAnnotationMemoryOvercommitRatioKey: "2"}, - "node4": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", 
consts.NodeAnnotationMemoryOvercommitRatioKey: "1"}, + "node1": {consts.NodeAnnotationCPUOvercommitRatioKey: "2", consts.NodeAnnotationMemoryOvercommitRatioKey: "2", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "1", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "1"}, + "node2": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "1", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "1"}, + "node3": {consts.NodeAnnotationCPUOvercommitRatioKey: "2", consts.NodeAnnotationMemoryOvercommitRatioKey: "2", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "3", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "3"}, + "node4": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "2", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "2"}, }, }, { @@ -254,10 +267,14 @@ var testCases = []testCase{ }}}, }, result: map[string]map[string]string{ - "node1": {consts.NodeAnnotationCPUOvercommitRatioKey: "2", consts.NodeAnnotationMemoryOvercommitRatioKey: "2"}, - "node2": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1"}, - "node3": {consts.NodeAnnotationCPUOvercommitRatioKey: "1.5", consts.NodeAnnotationMemoryOvercommitRatioKey: "1.5"}, - "node4": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1"}, + "node1": {consts.NodeAnnotationCPUOvercommitRatioKey: "2", consts.NodeAnnotationMemoryOvercommitRatioKey: "2", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "3", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "3"}, + "node2": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "1", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "1"}, + "node3": {consts.NodeAnnotationCPUOvercommitRatioKey: "2", consts.NodeAnnotationMemoryOvercommitRatioKey: "2", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "1.5", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "1.5"}, + "node4": {consts.NodeAnnotationCPUOvercommitRatioKey: "1", consts.NodeAnnotationMemoryOvercommitRatioKey: "1", + consts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "2", consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "2"}, }, }, } @@ -391,6 +408,11 @@ func TestRun(t *testing.T) { go nocController.Run() time.Sleep(1 * time.Second) + + ret, err := nocController.nodeLister.Get("node1") + assert.NoError(t, err) + fmt.Println(ret) + for _, node := range tc.addNodes { _, err = controlCtx.Client.KubeClient.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) assert.NoError(t, err) @@ -425,7 +447,6 @@ func TestRun(t *testing.T) { } time.Sleep(2 * time.Second) - for nodeName, annotations := range tc.result { for k, v := range annotations { node, err := nocController.nodeLister.Get(nodeName) @@ -441,3 +462,262 @@ func TestRun(t *testing.T) { }) } } + +func TestNodeOvercommitResource(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + cpuOvercommit string + memOvercommit string + kcnr *v1alpha12.CustomNodeResource + node *corev1.Node + expectRes string + expectMemRes string + }{ + { + name: "cpu overcommit less than 1", + cpuOvercommit: "1", + memOvercommit: "1", + kcnr: &v1alpha12.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + 
Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "none", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "4", + }, + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/original_allocatable_cpu": "15600m", + "katalyst.kubewharf.io/original_capacity_cpu": "16000m", + "katalyst.kubewharf.io/original_allocatable_memory": "29258114498560m", + "katalyst.kubewharf.io/original_capacity_memory": "32612508Ki", + }, + }, + }, + expectRes: "15600m", + expectMemRes: "29258114498560m", + }, + { + name: "kcnr without annotation", + cpuOvercommit: "1", + memOvercommit: "1", + kcnr: &v1alpha12.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/original_allocatable_cpu": "15600m", + "katalyst.kubewharf.io/original_capacity_cpu": "16000m", + "katalyst.kubewharf.io/original_allocatable_memory": "29258114498560m", + "katalyst.kubewharf.io/original_capacity_memory": "32612508Ki", + }, + }, + }, + expectRes: "15600m", + expectMemRes: "29258114498560m", + }, + { + name: "wrong overcommit", + cpuOvercommit: "xx", + memOvercommit: "xx", + kcnr: &v1alpha12.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "none", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "4", + }, + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/original_allocatable_cpu": "15600m", + "katalyst.kubewharf.io/original_capacity_cpu": "16000m", + "katalyst.kubewharf.io/original_allocatable_memory": "29258114498560m", + "katalyst.kubewharf.io/original_capacity_memory": "32612508Ki", + }, + }, + }, + expectRes: "15600m", + expectMemRes: "29258114498560m", + }, + { + name: "cpu manager off", + cpuOvercommit: "1.2", + memOvercommit: "1.2", + kcnr: &v1alpha12.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "none", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "4", + }, + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/original_allocatable_cpu": "15600m", + "katalyst.kubewharf.io/original_capacity_cpu": "16000m", + "katalyst.kubewharf.io/original_allocatable_memory": "29258114498560m", + "katalyst.kubewharf.io/original_capacity_memory": "32612508Ki", + }, + }, + }, + expectRes: "18720m", + expectMemRes: "35109737398272m", + }, + { + name: "guaranteed cpu none", + cpuOvercommit: "1.2", + memOvercommit: "1.2", + kcnr: &v1alpha12.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "static", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + }, + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/original_allocatable_cpu": "15600m", + "katalyst.kubewharf.io/original_capacity_cpu": "16000m", + 
"katalyst.kubewharf.io/original_allocatable_memory": "29258114498560m", + "katalyst.kubewharf.io/original_capacity_memory": "32612508Ki", + }, + }, + }, + expectRes: "18720m", + expectMemRes: "35109737398272m", + }, + { + name: "wrong guaranteed cpu", + cpuOvercommit: "1.2", + memOvercommit: "1", + kcnr: &v1alpha12.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "static", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "xx", + }, + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/original_allocatable_cpu": "15600m", + "katalyst.kubewharf.io/original_capacity_cpu": "16000m", + "katalyst.kubewharf.io/original_allocatable_memory": "29258114498560m", + "katalyst.kubewharf.io/original_capacity_memory": "32612508Ki", + }, + }, + }, + expectRes: "18720m", + expectMemRes: "29258114498560m", + }, + { + name: "origin allocatable missing", + cpuOvercommit: "1.2", + kcnr: &v1alpha12.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "static", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "4", + }, + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/original_capacity_cpu": "16000m", + "katalyst.kubewharf.io/original_allocatable_memory": "29258114498560m", + }, + }, + }, + expectRes: "", + expectMemRes: "", + }, + { + name: "origin capacity missing", + cpuOvercommit: "1.2", + kcnr: &v1alpha12.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "static", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "4", + }, + }, + }, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/original_allocatable_cpu": "15600m", + }, + }, + }, + expectRes: "", + expectMemRes: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + gc, err := katalyst_base.GenerateFakeGenericContext() + assert.NoError(t, err) + _, err = gc.Client.InternalClient.NodeV1alpha1().CustomNodeResources().Create(context.TODO(), tc.kcnr, metav1.CreateOptions{}) + assert.NoError(t, err) + + fss := &cliflag.NamedFlagSets{} + nocOptions := options.NewOvercommitOptions() + nocOptions.AddFlags(fss) + nocConf := controller.NewOvercommitConfig() + _ = nocOptions.ApplyTo(nocConf) + + genericConf := &generic.GenericConfiguration{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + nocController, err := NewNodeOvercommitController(ctx, gc, genericConf, nocConf) + assert.NoError(t, err) + gc.StartInformer(nocController.ctx) + + go nocController.Run() + time.Sleep(500 * time.Millisecond) + + cpuOvercommitValue, _ := strconv.ParseFloat(tc.cpuOvercommit, 64) + allocatable, _ := nocController.nodeOvercommitResource(tc.node, cpuOvercommitValue, corev1.ResourceCPU, consts.NodeAnnotationOriginalAllocatableCPUKey, consts.NodeAnnotationOriginalCapacityCPUKey) + assert.Equal(t, tc.expectRes, allocatable) + + memOvercommitValue, _ := 
strconv.ParseFloat(tc.memOvercommit, 64) + allocatable, _ = nocController.nodeOvercommitResource(tc.node, memOvercommitValue, corev1.ResourceMemory, consts.NodeAnnotationOriginalAllocatableMemoryKey, consts.NodeAnnotationOriginalCapacityMemoryKey) + assert.Equal(t, tc.expectMemRes, allocatable) + }) + } +} diff --git a/pkg/scheduler/cache/nodeinfo.go b/pkg/scheduler/cache/nodeinfo.go index 655826d98b..91400e9369 100644 --- a/pkg/scheduler/cache/nodeinfo.go +++ b/pkg/scheduler/cache/nodeinfo.go @@ -175,6 +175,7 @@ func (n *NodeInfo) RemovePod(key string, pod *v1.Pod) { n.QoSResourcesNonZeroRequested.ReclaimedMilliCPU -= podInfo.QoSResourcesNonZeroRequested.ReclaimedMilliCPU n.QoSResourcesNonZeroRequested.ReclaimedMemory -= podInfo.QoSResourcesNonZeroRequested.ReclaimedMemory + delete(n.Pods, key) } func (n *NodeInfo) AddAssumedPod(pod *v1.Pod) { diff --git a/pkg/scheduler/eventhandlers/cnr_handler.go b/pkg/scheduler/eventhandlers/cnr_handler.go index 156f7f1607..c5210724f9 100644 --- a/pkg/scheduler/eventhandlers/cnr_handler.go +++ b/pkg/scheduler/eventhandlers/cnr_handler.go @@ -26,6 +26,14 @@ import ( schedulercache "github.com/kubewharf/katalyst-core/pkg/scheduler/cache" ) +const ( + CommonCNRHandler = "CommonCNRHandler" +) + +func RegisterCommonCNRHandler() { + RegisterEventHandler(CommonCNRHandler, AddCNREventHandler) +} + // AddCNREventHandler adds CNR event handlers for the scheduler. func AddCNREventHandler(_ informers.SharedInformerFactory, internalInformerFactory externalversions.SharedInformerFactory) { cnrInformer := internalInformerFactory.Node().V1alpha1().CustomNodeResources() diff --git a/pkg/scheduler/eventhandlers/handler.go b/pkg/scheduler/eventhandlers/handler.go new file mode 100644 index 0000000000..42d953c224 --- /dev/null +++ b/pkg/scheduler/eventhandlers/handler.go @@ -0,0 +1,52 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package eventhandlers + +import ( + "sync" + + "k8s.io/client-go/informers" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-api/pkg/client/informers/externalversions" +) + +type AddEventHandlerFunc func(informers.SharedInformerFactory, externalversions.SharedInformerFactory) + +var eventHandlerMap sync.Map + +func RegisterEventHandler(name string, eventHandlerFunc AddEventHandlerFunc) { + _, ok := eventHandlerMap.Load(name) + if ok { + klog.Warningf("eventhandler %v has been registered", name) + return + } + + eventHandlerMap.Store(name, eventHandlerFunc) +} + +func ListEventHandlerFunc() []AddEventHandlerFunc { + res := make([]AddEventHandlerFunc, 0) + + eventHandlerMap.Range(func(key, value any) bool { + klog.Infof("ListEventHandler: %v", key) + res = append(res, value.(AddEventHandlerFunc)) + return true + }) + + return res +} diff --git a/pkg/scheduler/eventhandlers/pod_handler.go b/pkg/scheduler/eventhandlers/pod_handler.go index 29ec5fb71c..3bd061a423 100644 --- a/pkg/scheduler/eventhandlers/pod_handler.go +++ b/pkg/scheduler/eventhandlers/pod_handler.go @@ -31,6 +31,14 @@ import ( "github.com/kubewharf/katalyst-core/pkg/util/native" ) +const ( + CommonPodHandler = "CommonPodHandler" +) + +func RegisterCommonPodHandler() { + RegisterEventHandler(CommonPodHandler, AddPodEventHandler) +} + // AddPodEventHandler adds Pod event handlers for the scheduler. func AddPodEventHandler(informerFactory informers.SharedInformerFactory, _ externalversions.SharedInformerFactory) { podInformer := informerFactory.Core().V1().Pods() diff --git a/pkg/scheduler/plugins/nodeovercommitment/cache/cache.go b/pkg/scheduler/plugins/nodeovercommitment/cache/cache.go new file mode 100644 index 0000000000..1a258a3539 --- /dev/null +++ b/pkg/scheduler/plugins/nodeovercommitment/cache/cache.go @@ -0,0 +1,207 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "sync" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/scheduler/framework" + + "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +var cache *overcommitCache + +func init() { + cache = &overcommitCache{ + nodeCaches: map[string]*NodeCache{}, + } +} + +// cache stores node native topology providers and guaranteed requested resources. +// only used in overcommit scenario when kubelet uses native topology strategy.
+type overcommitCache struct { + sync.RWMutex + nodeCaches map[string]*NodeCache +} + +func GetCache() *overcommitCache { + return cache +} + +func (c *overcommitCache) GetNode(name string) (*NodeCache, error) { + c.RLock() + defer c.RUnlock() + + node, ok := c.nodeCaches[name] + if !ok { + return nil, fmt.Errorf("node %v not found", name) + } + + return node, nil +} + +func (c *overcommitCache) AddPod(pod *v1.Pod) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } + + c.Lock() + defer c.Unlock() + + n, ok := c.nodeCaches[pod.Spec.NodeName] + if !ok { + n = New() + c.nodeCaches[pod.Spec.NodeName] = n + } + n.AddPod(key, pod) + + return nil +} + +func (c *overcommitCache) RemovePod(pod *v1.Pod) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } + + c.Lock() + defer c.Unlock() + + n, ok := c.nodeCaches[pod.Spec.NodeName] + if !ok { + klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "pod", klog.KObj(pod)) + } else { + n.RemovePod(key, pod) + } + + return nil +} + +func (c *overcommitCache) AddOrUpdateCNR(cnr *v1alpha1.CustomNodeResource) { + c.Lock() + defer c.Unlock() + + n, ok := c.nodeCaches[cnr.Name] + if !ok { + n = New() + c.nodeCaches[cnr.Name] = n + } + + n.updateTopologyProvider(cnr) +} + +func (c *overcommitCache) RemoveCNR(cnr *v1alpha1.CustomNodeResource) { + c.Lock() + defer c.Unlock() + + delete(c.nodeCaches, cnr.Name) +} + +type NodeCache struct { + sync.RWMutex + + PodResources map[string]int + + // kubelet topology hint providers from CNR annotation. + // provider will be cached only if provider policy is available. + // only used for node resource overcommitment. + HintProviders map[string]struct{} + + // total guaranteed cpus on node + GuaranteedCPUs int +} + +func New() *NodeCache { + return &NodeCache{ + PodResources: map[string]int{}, + HintProviders: map[string]struct{}{}, + } +} + +func (n *NodeCache) AddPod(key string, pod *v1.Pod) { + n.RemovePod(key, pod) + guaranteedCPUs := native.PodGuaranteedCPUs(pod) + + n.Lock() + defer n.Unlock() + + n.PodResources[key] = guaranteedCPUs + n.GuaranteedCPUs += guaranteedCPUs +} + +func (n *NodeCache) RemovePod(key string, pod *v1.Pod) { + n.Lock() + defer n.Unlock() + podResource, ok := n.PodResources[key] + if !ok { + return + } + + n.GuaranteedCPUs -= podResource + delete(n.PodResources, key) +} + +func (n *NodeCache) updateTopologyProvider(cnr *v1alpha1.CustomNodeResource) { + if len(cnr.Annotations) <= 0 { + return + } + + if CPUManagerPolicy, ok := cnr.Annotations[consts.KCNRAnnotationCPUManager]; ok { + if CPUManagerPolicy == string(cpumanager.PolicyStatic) { + n.HintProviders[string(features.CPUManager)] = struct{}{} + } + } + + if memoryManagerPolicy, ok := cnr.Annotations[consts.KCNRAnnotationMemoryManager]; ok { + if memoryManagerPolicy == "Static" { + n.HintProviders[string(features.MemoryManager)] = struct{}{} + } + } +} + +func (n *NodeCache) HintProvidersAvailable() (CPUManager, MemoryManager bool) { + n.RLock() + defer n.RUnlock() + + _, ok := n.HintProviders[string(features.CPUManager)] + if ok { + CPUManager = true + } + + _, ok = n.HintProviders[string(features.MemoryManager)] + if ok { + MemoryManager = true + } + + return +} + +func (n *NodeCache) GetGuaranteedCPUs() int { + n.RLock() + defer n.RUnlock() + + return n.GuaranteedCPUs +} diff --git a/pkg/scheduler/plugins/nodeovercommitment/cache/handler.go b/pkg/scheduler/plugins/nodeovercommitment/cache/handler.go new file mode 100644 index 
0000000000..010e5f508e --- /dev/null +++ b/pkg/scheduler/plugins/nodeovercommitment/cache/handler.go @@ -0,0 +1,173 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/informers" + clientgocache "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/client/informers/externalversions" + schedulercache "github.com/kubewharf/katalyst-core/pkg/scheduler/cache" + "github.com/kubewharf/katalyst-core/pkg/scheduler/eventhandlers" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +const ( + OvercommitPodHandler = "OvercommitPodHandler" + OvercommitCNRHandler = "OvercommitCNRHandler" +) + +// RegisterPodHandler register handler to scheduler event handlers +func RegisterPodHandler() { + eventhandlers.RegisterEventHandler(OvercommitPodHandler, func(informerFactory informers.SharedInformerFactory, _ externalversions.SharedInformerFactory) { + podInformer := informerFactory.Core().V1().Pods() + podInformer.Informer().AddEventHandler( + clientgocache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *v1.Pod: + return native.IsAssignedPod(t) + case clientgocache.DeletedFinalStateUnknown: + if _, ok := t.Obj.(*v1.Pod); ok { + // The carried object may be stale, so we don't use it to check if + // it's assigned or not. Attempting to cleanup anyways. 
+ return true + } + utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj)) + return false + default: + utilruntime.HandleError(fmt.Errorf("unable to handle object: %T", obj)) + return false + } + }, + Handler: clientgocache.ResourceEventHandlerFuncs{ + AddFunc: addPod, + UpdateFunc: updatePod, + DeleteFunc: deletePod, + }, + }, + ) + }) +} + +// RegisterCNRHandler register handler to scheduler event handlers +func RegisterCNRHandler() { + eventhandlers.RegisterEventHandler(OvercommitCNRHandler, func(_ informers.SharedInformerFactory, internalInformerFactory externalversions.SharedInformerFactory) { + cnrInformer := internalInformerFactory.Node().V1alpha1().CustomNodeResources() + cnrInformer.Informer().AddEventHandler( + clientgocache.ResourceEventHandlerFuncs{ + AddFunc: addCNR, + UpdateFunc: updateCNR, + DeleteFunc: deleteCNR, + }) + }) +} + +func addPod(obj interface{}) { + pod, ok := obj.(*v1.Pod) + if !ok { + klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj) + return + } + klog.V(6).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod)) + + if err := GetCache().AddPod(pod); err != nil { + klog.Errorf("%v cache AddPod failed, pod: %v, err: %v", OvercommitPodHandler, klog.KObj(pod), err) + } +} + +func updatePod(_, newObj interface{}) { + newPod, ok := newObj.(*v1.Pod) + if !ok { + klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", newObj) + return + } + klog.V(6).InfoS("Add event for scheduled pod", "pod", klog.KObj(newPod)) + + if err := GetCache().AddPod(newPod); err != nil { + klog.Errorf("%v cache AddPod failed, pod: %v, err: %v", OvercommitPodHandler, klog.KObj(newPod), err) + } +} + +func deletePod(obj interface{}) { + var pod *v1.Pod + switch t := obj.(type) { + case *v1.Pod: + pod = t + case clientgocache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*v1.Pod) + if !ok { + klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t.Obj) + return + } + default: + klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t) + return + } + klog.V(6).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod)) + + if err := GetCache().RemovePod(pod); err != nil { + klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod)) + } +} + +func addCNR(obj interface{}) { + cnr, ok := obj.(*v1alpha1.CustomNodeResource) + if !ok { + klog.Errorf("cannot convert obj to CNR: %v", obj) + return + } + + GetCache().AddOrUpdateCNR(cnr) +} + +func updateCNR(_, newObj interface{}) { + newCNR, ok := newObj.(*v1alpha1.CustomNodeResource) + if !ok { + klog.Errorf("cannot convert obj to CNR: %v", newObj) + return + } + + GetCache().AddOrUpdateCNR(newCNR) +} + +func deleteCNR(obj interface{}) { + var cnr *v1alpha1.CustomNodeResource + switch t := obj.(type) { + case *v1alpha1.CustomNodeResource: + cnr = t + case clientgocache.DeletedFinalStateUnknown: + var ok bool + cnr, ok = t.Obj.(*v1alpha1.CustomNodeResource) + if !ok { + klog.ErrorS(nil, "Cannot convert to *apis.CNR", "obj", t.Obj) + return + } + default: + klog.ErrorS(nil, "Cannot convert to *apis.CNR", "obj", t) + return + } + + schedulercache.GetCache().RemoveCNR(cnr) +} diff --git a/pkg/scheduler/plugins/nodeovercommitment/fit.go b/pkg/scheduler/plugins/nodeovercommitment/fit.go new file mode 100644 index 0000000000..7d4c989a16 --- /dev/null +++ b/pkg/scheduler/plugins/nodeovercommitment/fit.go @@ -0,0 +1,228 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeovercommitment + +import ( + "context" + "fmt" + "strconv" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" + + "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/nodeovercommitment/cache" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +type preFilterState struct { + framework.Resource + GuaranteedCPUs int +} + +func (s *preFilterState) Clone() framework.StateData { + return s +} + +func (n *NodeOvercommitment) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + cycleState.Write(preFilterStateKey, computePodResourceRequest(pod)) + return nil, nil +} + +func (n *NodeOvercommitment) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + +func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) { + c, err := cycleState.Read(preFilterStateKey) + if err != nil { + return nil, fmt.Errorf("error reading %q from cycleState: %w", preFilterStateKey, err) + } + + s, ok := c.(*preFilterState) + if !ok { + return nil, fmt.Errorf("%+v convert to NodeOvercommitment.preFilterState error", c) + } + return s, nil +} + +func (n *NodeOvercommitment) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + if nodeInfo.Node() == nil { + return framework.NewStatus(framework.Error, "node not found") + } + + s, err := getPreFilterState(cycleState) + if err != nil { + klog.Error(err) + return framework.AsStatus(err) + } + if s.GuaranteedCPUs == 0 && s.MilliCPU == 0 { + return nil + } + + var nodeName = nodeInfo.Node().GetName() + nodeCache, err := cache.GetCache().GetNode(nodeName) + if err != nil { + err := fmt.Errorf("GetNodeInfo %s from cache fail: %v", nodeName, err) + klog.Error(err) + return framework.NewStatus(framework.Error, err.Error()) + } + + CPUManagerAvailable, memoryManagerAvailable := nodeCache.HintProvidersAvailable() + CPUOvercommitRatio, memoryOvercommitRatio, err := n.nodeOvercommitRatio(nodeInfo) + if err != nil { + klog.Error(err) + return framework.NewStatus(framework.Error, err.Error()) + } + + if memoryManagerAvailable { + if memoryOvercommitRatio > 1.0 { + err = fmt.Errorf("node %v memoryManager and memoryOvercommit both available", nodeName) + klog.Error(err) + return framework.NewStatus(framework.Unschedulable, err.Error()) + } + } + + if CPUManagerAvailable && CPUOvercommitRatio > 1.0 { + nodeCPUOriginAllocatable, err := n.nodeCPUAllocatable(nodeInfo) + if err != nil { + klog.Error(err) + return framework.NewStatus(framework.Error, err.Error()) + } + + guaranteedCPUs := resource.NewQuantity(int64(nodeCache.GetGuaranteedCPUs()), resource.DecimalSI) + nonGuaranteedRequestCPU := nodeInfo.Requested.MilliCPU - guaranteedCPUs.MilliValue() + + nodeCPUOriginAllocatable.Sub(*guaranteedCPUs) + *nodeCPUOriginAllocatable = native.MultiplyResourceQuantity(v1.ResourceCPU, *nodeCPUOriginAllocatable, CPUOvercommitRatio) + + 
klog.V(5).Infof("nodeOvercommitment, pod guaranteedCPUs: %v, pod cpus: %v, CPUOvercommitRatio: %v, nodeAllocatable: %v, guaranteedCPUs: %v, nonGuaranteedRequestCPU: %v",
+			s.GuaranteedCPUs, s.MilliCPU, CPUOvercommitRatio, nodeCPUOriginAllocatable.MilliValue(), guaranteedCPUs.MilliValue(), nonGuaranteedRequestCPU)
+
+		if s.GuaranteedCPUs > 0 {
+			if int64(float64(s.GuaranteedCPUs)*1000.0*CPUOvercommitRatio) > nodeCPUOriginAllocatable.MilliValue()-nonGuaranteedRequestCPU {
+				return framework.NewStatus(framework.Unschedulable, "node overcommitment insufficient cpu")
+			}
+		} else {
+			if s.MilliCPU > nodeCPUOriginAllocatable.MilliValue()-nonGuaranteedRequestCPU {
+				return framework.NewStatus(framework.Unschedulable, "node overcommitment insufficient cpu")
+			}
+		}
+	}
+
+	return nil
+}
+
+func (n *NodeOvercommitment) nodeOvercommitRatio(nodeInfo *framework.NodeInfo) (CPUOvercommitRatio, memoryOvercommitRatio float64, err error) {
+	CPUOvercommitRatio, memoryOvercommitRatio = 1.0, 1.0
+
+	if nodeInfo.Node() == nil || nodeInfo.Node().GetAnnotations() == nil {
+		return
+	}
+
+	var (
+		annotation = nodeInfo.Node().GetAnnotations()
+	)
+	CPUOvercommitAnnotation, ok := annotation[consts.NodeAnnotationCPUOvercommitRatioKey]
+	if ok {
+		CPUOvercommitRatio, err = strconv.ParseFloat(CPUOvercommitAnnotation, 64)
+		if err != nil {
+			klog.Error(err)
+			return
+		}
+	}
+	realtimeCPUOvercommitAnnotation, ok := annotation[consts.NodeAnnotationRealtimeCPUOvercommitRatioKey]
+	if ok {
+		realtimeCPUOvercommitRatio, err := strconv.ParseFloat(realtimeCPUOvercommitAnnotation, 64)
+		if err != nil {
+			klog.Error(err)
+		} else {
+			if realtimeCPUOvercommitRatio < CPUOvercommitRatio && realtimeCPUOvercommitRatio >= 1 {
+				CPUOvercommitRatio = realtimeCPUOvercommitRatio
+			}
+		}
+	}
+
+	memoryOvercommitAnnotation, ok := annotation[consts.NodeAnnotationMemoryOvercommitRatioKey]
+	if ok {
+		memoryOvercommitRatio, err = strconv.ParseFloat(memoryOvercommitAnnotation, 64)
+		if err != nil {
+			klog.Error(err)
+			return
+		}
+	}
+
+	realtimeMemoryOvercommitAnnotation, ok := annotation[consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey]
+	if ok {
+		realtimeMemoryOvercommitRatio, err := strconv.ParseFloat(realtimeMemoryOvercommitAnnotation, 64)
+		if err != nil {
+			klog.Error(err)
+		} else {
+			if realtimeMemoryOvercommitRatio < memoryOvercommitRatio && realtimeMemoryOvercommitRatio >= 1 {
+				memoryOvercommitRatio = realtimeMemoryOvercommitRatio
+			}
+		}
+	}
+
+	return
+}
+
+func computePodResourceRequest(pod *v1.Pod) *preFilterState {
+	result := &preFilterState{}
+
+	CPUs := native.PodGuaranteedCPUs(pod)
+	if CPUs > 0 {
+		result.GuaranteedCPUs = CPUs
+		return result
+	}
+
+	for _, container := range pod.Spec.Containers {
+		result.Add(container.Resources.Requests)
+	}
+
+	// take max_resource(sum_pod, any_init_container)
+	for _, container := range pod.Spec.InitContainers {
+		result.SetMaxResource(container.Resources.Requests)
+	}
+
+	// If Overhead is being utilized, add to the total requests for the pod
+	if pod.Spec.Overhead != nil {
+		result.Add(pod.Spec.Overhead)
+	}
+	return result
+}
+
+func (n *NodeOvercommitment) nodeCPUAllocatable(nodeInfo *framework.NodeInfo) (*resource.Quantity, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return nil, fmt.Errorf("nil nodeInfo")
+	}
+
+	if node.GetAnnotations() == nil {
+		return node.Status.Allocatable.Cpu(), nil
+	}
+
+	originalAllocatableCPU, ok := node.Annotations[consts.NodeAnnotationOriginalAllocatableCPUKey]
+	if !ok {
+		return node.Status.Allocatable.Cpu(), nil
+	}
+
+	quantity, 
err := resource.ParseQuantity(originalAllocatableCPU) + return &quantity, err +} diff --git a/pkg/scheduler/plugins/nodeovercommitment/fit_test.go b/pkg/scheduler/plugins/nodeovercommitment/fit_test.go new file mode 100644 index 0000000000..8c9b7d384d --- /dev/null +++ b/pkg/scheduler/plugins/nodeovercommitment/fit_test.go @@ -0,0 +1,615 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeovercommitment + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" + + "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/nodeovercommitment/cache" +) + +func TestPreFilter(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + pod *v1.Pod + expectCPU int64 + expectGuaranteedCPU int + }{ + { + name: "burstable pod", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + }, + expectCPU: 2000, + expectGuaranteedCPU: 0, + }, + { + name: "guaranteed pod", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + expectCPU: 0, + expectGuaranteedCPU: 4, + }, + { + name: "guaranteed pod with init container", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + InitContainers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + expectCPU: 0, + expectGuaranteedCPU: 4, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + n := &NodeOvercommitment{} + cs := framework.NewCycleState() + 
res, stat := n.PreFilter(context.TODO(), cs, tc.pod) + assert.Nil(t, res) + assert.Nil(t, stat) + + fs, err := getPreFilterState(cs) + assert.NoError(t, err) + assert.Equal(t, tc.expectGuaranteedCPU, fs.GuaranteedCPUs) + assert.Equal(t, tc.expectCPU, fs.MilliCPU) + }) + } +} + +func TestFilter(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + node *v1.Node + cnrs []*v1alpha1.CustomNodeResource + existPods []*v1.Pod + pod *v1.Pod + requested *framework.Resource + expectRes *framework.Status + }{ + { + name: "node cpumanager off", + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/cpu_overcommit_ratio": "2.0", + "katalyst.kubewharf.io/memory_overcommit_ratio": "1.0", + "katalyst.kubewharf.io/original_allocatable_cpu": "16000", + }, + }, + Status: v1.NodeStatus{ + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + cnrs: []*v1alpha1.CustomNodeResource{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "none", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "0", + }, + }, + }, + }, + existPods: []*v1.Pod{}, + requested: &framework.Resource{ + MilliCPU: 24000, + }, + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + expectRes: nil, + }, + { + name: "node cpumanager on", + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/cpu_overcommit_ratio": "2.0", + "katalyst.kubewharf.io/memory_overcommit_ratio": "1.0", + "katalyst.kubewharf.io/original_allocatable_cpu": "16000", + }, + }, + Status: v1.NodeStatus{ + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + cnrs: []*v1alpha1.CustomNodeResource{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "static", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "4", + }, + }, + }, + }, + requested: &framework.Resource{ + MilliCPU: 24000, + }, + existPods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod01", + UID: "pod01", + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + }, + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: 
resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + expectRes: nil, + }, + { + name: "node cpumanager on with recommend", + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/cpu_overcommit_ratio": "3.0", + "katalyst.kubewharf.io/memory_overcommit_ratio": "1.0", + "katalyst.kubewharf.io/recommend_cpu_overcommit_ratio": "2.0", + "katalyst.kubewharf.io/original_allocatable_cpu": "16000", + }, + }, + Status: v1.NodeStatus{ + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + cnrs: []*v1alpha1.CustomNodeResource{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "static", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "4", + }, + }, + }, + }, + requested: &framework.Resource{ + MilliCPU: 24000, + }, + existPods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod01", + UID: "pod01", + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + }, + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + expectRes: nil, + }, + { + name: "node cpumanager on 2", + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/cpu_overcommit_ratio": "2.0", + "katalyst.kubewharf.io/memory_overcommit_ratio": "1.0", + "katalyst.kubewharf.io/original_allocatable_cpu": "16000m", + }, + }, + Status: v1.NodeStatus{ + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + cnrs: []*v1alpha1.CustomNodeResource{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "static", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "8", + }, + }, + }, + }, + requested: &framework.Resource{ + MilliCPU: 24000, + }, + existPods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod01", + UID: "pod01", + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + }, + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + 
Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + expectRes: framework.NewStatus(framework.Unschedulable), + }, + { + name: "node cpumanager on 3", + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/cpu_overcommit_ratio": "2.0", + "katalyst.kubewharf.io/memory_overcommit_ratio": "1.0", + "katalyst.kubewharf.io/original_allocatable_cpu": "16000m", + }, + }, + Status: v1.NodeStatus{ + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + cnrs: []*v1alpha1.CustomNodeResource{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "static", + "katalyst.kubewharf.io/overcommit_memory_manager": "None", + "katalyst.kubewharf.io/guaranteed_cpus": "4", + }, + }, + }, + }, + requested: &framework.Resource{ + MilliCPU: 24000, + }, + existPods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod01", + UID: "pod01", + }, + Spec: v1.PodSpec{ + NodeName: "node1", + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + }, + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + expectRes: framework.NewStatus(framework.Unschedulable), + }, + { + name: "node memorymanager on, overcommit not allowed", + node: &v1.Node{ + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/cpu_overcommit_ratio": "2.0", + "katalyst.kubewharf.io/memory_overcommit_ratio": "1.5", + "katalyst.kubewharf.io/original_allocatable_cpu": "16000", + }, + }, + Status: v1.NodeStatus{ + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + cnrs: []*v1alpha1.CustomNodeResource{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "katalyst.kubewharf.io/overcommit_cpu_manager": "none", + "katalyst.kubewharf.io/overcommit_memory_manager": "Static", + "katalyst.kubewharf.io/guaranteed_cpus": "0", + }, + }, + }, + }, + requested: &framework.Resource{ + MilliCPU: 0, + }, + existPods: []*v1.Pod{}, + pod: &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + 
expectRes: framework.NewStatus(framework.Unschedulable), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cnrs := tc.cnrs + for _, cnr := range cnrs { + cache.GetCache().AddOrUpdateCNR(cnr) + } + for _, pod := range tc.existPods { + cache.GetCache().AddPod(pod) + } + defer func() { + for _, cnr := range cnrs { + cache.GetCache().RemoveCNR(cnr) + } + }() + + nodeInfo := framework.NewNodeInfo() + nodeInfo.SetNode(tc.node) + nodeInfo.Requested = tc.requested + + n := &NodeOvercommitment{} + cs := framework.NewCycleState() + res, stat := n.PreFilter(context.TODO(), cs, tc.pod) + assert.Nil(t, res) + assert.Nil(t, stat) + + status := n.Filter(context.TODO(), cs, tc.pod, nodeInfo) + assert.Equal(t, tc.expectRes.Code(), status.Code()) + }) + } +} diff --git a/pkg/scheduler/plugins/nodeovercommitment/plugin.go b/pkg/scheduler/plugins/nodeovercommitment/plugin.go new file mode 100644 index 0000000000..63801f61ce --- /dev/null +++ b/pkg/scheduler/plugins/nodeovercommitment/plugin.go @@ -0,0 +1,52 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeovercommitment + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" + + "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/nodeovercommitment/cache" +) + +var _ framework.FilterPlugin = &NodeOvercommitment{} + +const ( + // Name is the name of the plugin used in the plugin registry and configurations. + Name = "NodeOvercommitment" + + preFilterStateKey = "PreFilter" + Name +) + +var _ framework.PreFilterPlugin = &NodeOvercommitment{} +var _ framework.FilterPlugin = &NodeOvercommitment{} +var _ framework.ReservePlugin = &NodeOvercommitment{} + +type NodeOvercommitment struct{} + +func (n *NodeOvercommitment) Name() string { + return Name +} + +func New(args runtime.Object, h framework.Handle) (framework.Plugin, error) { + klog.Info("Creating new NodeOvercommitment plugin") + + cache.RegisterPodHandler() + cache.RegisterCNRHandler() + return &NodeOvercommitment{}, nil +} diff --git a/pkg/scheduler/plugins/nodeovercommitment/plugin_test.go b/pkg/scheduler/plugins/nodeovercommitment/plugin_test.go new file mode 100644 index 0000000000..7024039a91 --- /dev/null +++ b/pkg/scheduler/plugins/nodeovercommitment/plugin_test.go @@ -0,0 +1,30 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package nodeovercommitment
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNew(t *testing.T) {
+	n, err := New(nil, nil)
+	assert.NoError(t, err)
+
+	assert.Equal(t, Name, n.Name())
+}
diff --git a/pkg/scheduler/plugins/nodeovercommitment/reserve.go b/pkg/scheduler/plugins/nodeovercommitment/reserve.go
new file mode 100644
index 0000000000..d5dbd3ac7f
--- /dev/null
+++ b/pkg/scheduler/plugins/nodeovercommitment/reserve.go
@@ -0,0 +1,42 @@
+/*
+Copyright 2022 The Katalyst Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodeovercommitment
+
+import (
+	"context"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+
+	"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/nodeovercommitment/cache"
+)
+
+func (n *NodeOvercommitment) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
+	podCopy := pod.DeepCopy()
+	podCopy.Spec.NodeName = nodeName
+
+	cache.GetCache().AddPod(podCopy)
+
+	return framework.NewStatus(framework.Success, "")
+}
+
+func (n *NodeOvercommitment) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) {
+	podCopy := pod.DeepCopy()
+	podCopy.Spec.NodeName = nodeName
+
+	cache.GetCache().RemovePod(podCopy)
+}
diff --git a/pkg/scheduler/plugins/nodeovercommitment/reserve_test.go b/pkg/scheduler/plugins/nodeovercommitment/reserve_test.go
new file mode 100644
index 0000000000..e5f5e561fa
--- /dev/null
+++ b/pkg/scheduler/plugins/nodeovercommitment/reserve_test.go
@@ -0,0 +1,189 @@
+/*
+Copyright 2022 The Katalyst Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package nodeovercommitment + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" + + "github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/nodeovercommitment/cache" +) + +func TestReserve(t *testing.T) { + testCases := []struct { + name string + pods []*v1.Pod + nodeName string + expectRes int + }{ + { + name: "case 1", + nodeName: "testNode", + pods: []*v1.Pod{ + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod1", + UID: "poduid1", + }, + Spec: v1.PodSpec{ + NodeName: "testNode", + Containers: []v1.Container{ + { + Name: "testContainer", + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod2", + UID: "poduid2", + }, + Spec: v1.PodSpec{ + NodeName: "testNode", + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + { + Name: "testContainer2", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("4Gi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod3", + UID: "poduid3", + }, + Spec: v1.PodSpec{ + NodeName: "testNode", + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + { + ObjectMeta: v12.ObjectMeta{ + Name: "pod4", + UID: "poduid4", + }, + Spec: v1.PodSpec{ + NodeName: "testNode", + InitContainers: []v1.Container{ + { + Name: "initContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("2Gi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + }, + }, + Containers: []v1.Container{ + { + Name: "testContainer", + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("4"), + v1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + }, + }, + expectRes: 8, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + n := &NodeOvercommitment{} + cs := framework.NewCycleState() + + for _, pod := range tc.pods { + n.Reserve(context.TODO(), cs, pod, tc.nodeName) + } + + nodeCache, err := cache.GetCache().GetNode(tc.nodeName) + assert.NoError(t, err) + assert.Equal(t, tc.expectRes, nodeCache.GetGuaranteedCPUs()) + + for _, pod := range tc.pods { + err = cache.GetCache().AddPod(pod) + assert.NoError(t, err) + } + + assert.Equal(t, tc.expectRes, nodeCache.GetGuaranteedCPUs()) + for _, 
pod := range tc.pods { + err = cache.GetCache().RemovePod(pod) + assert.NoError(t, err) + } + + for _, pod := range tc.pods { + n.Reserve(context.TODO(), cs, pod, tc.nodeName) + n.Unreserve(context.TODO(), cs, pod, tc.nodeName) + } + assert.Equal(t, 0, nodeCache.GetGuaranteedCPUs()) + }) + } +} diff --git a/pkg/scheduler/plugins/noderesourcetopology/plugin.go b/pkg/scheduler/plugins/noderesourcetopology/plugin.go index 4b0718d7cb..f58e98a2ca 100644 --- a/pkg/scheduler/plugins/noderesourcetopology/plugin.go +++ b/pkg/scheduler/plugins/noderesourcetopology/plugin.go @@ -34,6 +34,7 @@ import ( apisconfig "github.com/kubewharf/katalyst-api/pkg/apis/scheduling/config" "github.com/kubewharf/katalyst-api/pkg/apis/scheduling/config/validation" "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/scheduler/eventhandlers" "github.com/kubewharf/katalyst-core/pkg/scheduler/util" "github.com/kubewharf/katalyst-core/pkg/util/native" ) @@ -103,6 +104,9 @@ func New(args runtime.Object, h framework.Handle) (framework.Plugin, error) { return nil, err } + eventhandlers.RegisterCommonPodHandler() + eventhandlers.RegisterCommonCNRHandler() + return &TopologyMatch{ scoreStrategyType: tcfg.ScoringStrategy.Type, alignedResources: alignedResources, diff --git a/pkg/scheduler/plugins/qosawarenoderesources/balanced_allocation.go b/pkg/scheduler/plugins/qosawarenoderesources/balanced_allocation.go index 95bddd3cdb..2d880c995f 100644 --- a/pkg/scheduler/plugins/qosawarenoderesources/balanced_allocation.go +++ b/pkg/scheduler/plugins/qosawarenoderesources/balanced_allocation.go @@ -31,6 +31,7 @@ import ( "github.com/kubewharf/katalyst-api/pkg/apis/scheduling/config" "github.com/kubewharf/katalyst-api/pkg/apis/scheduling/config/validation" "github.com/kubewharf/katalyst-core/pkg/scheduler/cache" + "github.com/kubewharf/katalyst-core/pkg/scheduler/eventhandlers" "github.com/kubewharf/katalyst-core/pkg/scheduler/util" ) @@ -97,6 +98,9 @@ func NewBalancedAllocation(baArgs runtime.Object, h framework.Handle) (framework return nil, err } + eventhandlers.RegisterCommonPodHandler() + eventhandlers.RegisterCommonCNRHandler() + return &BalancedAllocation{ handle: h, resourceAllocationScorer: resourceAllocationScorer{ diff --git a/pkg/scheduler/plugins/qosawarenoderesources/fit.go b/pkg/scheduler/plugins/qosawarenoderesources/fit.go index 83c0e6bf58..3fa350a29e 100644 --- a/pkg/scheduler/plugins/qosawarenoderesources/fit.go +++ b/pkg/scheduler/plugins/qosawarenoderesources/fit.go @@ -32,6 +32,7 @@ import ( "github.com/kubewharf/katalyst-api/pkg/apis/scheduling/config/validation" "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/scheduler/cache" + "github.com/kubewharf/katalyst-core/pkg/scheduler/eventhandlers" "github.com/kubewharf/katalyst-core/pkg/scheduler/util" "github.com/kubewharf/katalyst-core/pkg/util/native" ) @@ -131,6 +132,9 @@ func NewFit(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) return nil, err } + eventhandlers.RegisterCommonPodHandler() + eventhandlers.RegisterCommonCNRHandler() + return &Fit{ handle: h, resourceAllocationScorer: *scorePlugin(args), diff --git a/pkg/util/cnr.go b/pkg/util/cnr.go index e8ca7d9175..4415582719 100644 --- a/pkg/util/cnr.go +++ b/pkg/util/cnr.go @@ -43,6 +43,7 @@ const ( CNRFieldNameResources = "Resources" CNRFieldNameTopologyPolicy = "TopologyPolicy" CNRFieldNameNodeMetricStatus = "NodeMetricStatus" + CNRFieldNameAnnotations = "Annotations" ) var ( diff --git 
a/pkg/util/native/pod_resource.go b/pkg/util/native/pod_resource.go index 4b2775f8c5..4e325fe7df 100644 --- a/pkg/util/native/pod_resource.go +++ b/pkg/util/native/pod_resource.go @@ -19,6 +19,7 @@ package native import ( v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" ) // PodResource key: namespace/name, value: pod requested ResourceList @@ -70,3 +71,42 @@ func CalculateResource(pod *v1.Pod) v1.ResourceList { } return resources } + +func PodGuaranteedCPUs(pod *v1.Pod) int { + // The maximum of requested CPUs by init containers. + requestedByInitContainers := 0 + for _, container := range pod.Spec.InitContainers { + if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok { + continue + } + requestedCPU := guaranteedCPUs(pod, &container) + if requestedCPU > requestedByInitContainers { + requestedByInitContainers = requestedCPU + } + } + // The sum of requested CPUs by app containers. + requestedByAppContainers := 0 + for _, container := range pod.Spec.Containers { + if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok { + continue + } + requestedByAppContainers += guaranteedCPUs(pod, &container) + } + + if requestedByInitContainers > requestedByAppContainers { + return requestedByInitContainers + } + return requestedByAppContainers +} + +func guaranteedCPUs(pod *v1.Pod, container *v1.Container) int { + if qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { + return 0 + } + cpuQuantity := container.Resources.Requests[v1.ResourceCPU] + if cpuQuantity.Value()*1000 != cpuQuantity.MilliValue() { + return 0 + } + + return int(cpuQuantity.Value()) +} diff --git a/pkg/util/native/resources_test.go b/pkg/util/native/resources_test.go index 2808f2cf0b..75a5cd5450 100644 --- a/pkg/util/native/resources_test.go +++ b/pkg/util/native/resources_test.go @@ -160,13 +160,29 @@ func TestMultiplyResourceQuantity(t *testing.T) { want: true, }, { - name: "resource memory", - resourceName: v1.ResourceMemory, + name: "resource memory Gi", + resourceName: v1.ResourceCPU, quant: resource.MustParse("200Gi"), factor: 1.5, res: resource.MustParse("300Gi"), want: true, }, + { + name: "resource memory ki", + resourceName: v1.ResourceMemory, + quant: resource.MustParse("32612508Ki"), + factor: 1.5, + res: resource.MustParse("48918762Ki"), + want: true, + }, + { + name: "resource memory m", + resourceName: v1.ResourceMemory, + quant: resource.MustParse("29258114498560m"), + factor: 1.5, + res: resource.MustParse("43887171747840m"), + want: true, + }, { name: "zero value", resourceName: v1.ResourceCPU, @@ -194,7 +210,8 @@ func TestMultiplyResourceQuantity(t *testing.T) { } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - quant := MultiplyResourceQuantity(c.resourceName, c.quant, c.factor) + quant := MultiplyMilliQuantity(c.quant, c.factor) + t.Log(quant.String()) assert.Equal(t, c.want, quant.Equal(c.res)) }) } diff --git a/pkg/webhook/mutating/node/allocatable_mutator.go b/pkg/webhook/mutating/node/allocatable_mutator.go index 85195328bf..b14af2dbf3 100644 --- a/pkg/webhook/mutating/node/allocatable_mutator.go +++ b/pkg/webhook/mutating/node/allocatable_mutator.go @@ -22,6 +22,7 @@ import ( admissionv1beta1 "k8s.io/api/admission/v1beta1" core "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/klog/v2" "github.com/kubewharf/katalyst-api/pkg/consts" @@ -63,45 +64,109 @@ func (na *WebhookNodeAllocatableMutator) MutateNode(node *core.Node, admissionRe CPUOvercommitRatioValue, ok := node.Annotations[consts.NodeAnnotationCPUOvercommitRatioKey] 
if ok { - CPUOvercommitRatio, err := overcommitRatioValidate(CPUOvercommitRatioValue) - if err != nil { - klog.Errorf("node %s %s validate fail, value: %s, err: %v", node.Name, consts.NodeAnnotationCPUOvercommitRatioKey, CPUOvercommitRatioValue, err) - } else { - if CPUOvercommitRatio > 1.0 { - allocatable := node.Status.Allocatable.Cpu() - capacity := node.Status.Capacity.Cpu() - newAllocatable := native.MultiplyResourceQuantity(core.ResourceCPU, *allocatable, CPUOvercommitRatio) - newCapacity := native.MultiplyResourceQuantity(core.ResourceCPU, *capacity, CPUOvercommitRatio) - klog.V(6).Infof( - "node %s %s capacity: %v, allocatable: %v, newCapacity: %v, newAllocatable: %v", - node.Name, core.ResourceCPU, - capacity.String(), newCapacity.String(), - allocatable.String(), newAllocatable.String()) - node.Status.Allocatable[core.ResourceCPU] = newAllocatable - node.Status.Capacity[core.ResourceCPU] = newCapacity + var newAllocatable, newCapacity *resource.Quantity + // get overcommit allocatable and capacity from annotation first + overcommitCapacity, ok := node.Annotations[consts.NodeAnnotationOvercommitCapacityCPUKey] + if ok { + quantity, err := resource.ParseQuantity(overcommitCapacity) + if err != nil { + klog.Error(err) + } else { + newCapacity = &quantity + klog.V(6).Infof("node %s cpu capacity by annotation: %v", node.Name, newCapacity.String()) } } + + overcommitAllocatable, ok := node.Annotations[consts.NodeAnnotationOvercommitAllocatableCPUKey] + if ok { + quantity, err := resource.ParseQuantity(overcommitAllocatable) + if err != nil { + klog.Error(err) + } else { + newAllocatable = &quantity + klog.V(6).Infof("node %s cpu allocatable by annotation: %v", node.Name, newAllocatable.String()) + } + } + + // calculate allocatable and capacity by overcommit ratio + if newAllocatable == nil || newCapacity == nil { + CPUOvercommitRatio, err := overcommitRatioValidate(CPUOvercommitRatioValue, node.Annotations[consts.NodeAnnotationRealtimeCPUOvercommitRatioKey]) + if err != nil { + klog.Errorf("node %s %s validate fail, value: %s, err: %v", node.Name, consts.NodeAnnotationCPUOvercommitRatioKey, CPUOvercommitRatioValue, err) + } else { + if CPUOvercommitRatio > 1.0 { + allocatable := node.Status.Allocatable.Cpu() + capacity := node.Status.Capacity.Cpu() + allocatableByOvercommit := native.MultiplyResourceQuantity(core.ResourceCPU, *allocatable, CPUOvercommitRatio) + capacityByOvercommit := native.MultiplyResourceQuantity(core.ResourceCPU, *capacity, CPUOvercommitRatio) + newAllocatable = &allocatableByOvercommit + newCapacity = &capacityByOvercommit + + klog.V(6).Infof( + "node %s %s capacity: %v, allocatable: %v, newCapacity: %v, newAllocatable: %v", + node.Name, core.ResourceCPU, + capacity.String(), newCapacity.String(), + allocatable.String(), newAllocatable.String()) + } + } + } + + if newAllocatable != nil && newCapacity != nil { + node.Status.Allocatable[core.ResourceCPU] = *newAllocatable + node.Status.Capacity[core.ResourceCPU] = *newCapacity + } } memoryOvercommitRatioValue, ok := node.Annotations[consts.NodeAnnotationMemoryOvercommitRatioKey] if ok { - memoryOvercommitRatio, err := overcommitRatioValidate(memoryOvercommitRatioValue) - if err != nil { - klog.Errorf("node %s %s validate fail, value: %s, err: %v", node.Name, consts.NodeAnnotationMemoryOvercommitRatioKey, memoryOvercommitRatioValue, err) - } else { - if memoryOvercommitRatio > 1.0 { - allocatable := node.Status.Allocatable.Memory() - capacity := node.Status.Capacity.Memory() - newAllocatable := 
native.MultiplyResourceQuantity(core.ResourceMemory, *allocatable, memoryOvercommitRatio) - newCapacity := native.MultiplyResourceQuantity(core.ResourceMemory, *capacity, memoryOvercommitRatio) - klog.V(6).Infof("node %s %s capacity: %v, allocatable: %v, newCapacity: %v, newAllocatable: %v", - node.Name, core.ResourceMemory, - capacity.String(), newCapacity.String(), - allocatable.String(), newAllocatable.String()) - node.Status.Allocatable[core.ResourceMemory] = newAllocatable - node.Status.Capacity[core.ResourceMemory] = newCapacity + var newAllocatable, newCapacity *resource.Quantity + // get overcommit allocatable and capacity from annotation first + overcommitCapacity, ok := node.Annotations[consts.NodeAnnotationOvercommitCapacityMemoryKey] + if ok { + quantity, err := resource.ParseQuantity(overcommitCapacity) + if err != nil { + klog.Error(err) + } else { + newCapacity = &quantity + klog.V(6).Infof("node %s mem capacity by annotation: %v", node.Name, newCapacity.String()) + } + } + + overcommitAllocatable, ok := node.Annotations[consts.NodeAnnotationOvercommitAllocatableMemoryKey] + if ok { + quantity, err := resource.ParseQuantity(overcommitAllocatable) + if err != nil { + klog.Error(err) + } else { + newAllocatable = &quantity + klog.V(6).Infof("node %s mem allocatable by annotation: %v", node.Name, newAllocatable.String()) + } + } + + if newAllocatable == nil || newCapacity == nil { + memoryOvercommitRatio, err := overcommitRatioValidate(memoryOvercommitRatioValue, node.Annotations[consts.NodeAnnotationRealtimeMemoryOvercommitRatioKey]) + if err != nil { + klog.Errorf("node %s %s validate fail, value: %s, err: %v", node.Name, consts.NodeAnnotationMemoryOvercommitRatioKey, memoryOvercommitRatioValue, err) + } else { + if memoryOvercommitRatio > 1.0 { + allocatable := node.Status.Allocatable.Memory() + capacity := node.Status.Capacity.Memory() + allocatableByOvercommit := native.MultiplyResourceQuantity(core.ResourceMemory, *allocatable, memoryOvercommitRatio) + capacityByOvercommit := native.MultiplyResourceQuantity(core.ResourceMemory, *capacity, memoryOvercommitRatio) + newAllocatable = &allocatableByOvercommit + newCapacity = &capacityByOvercommit + klog.V(6).Infof("node %s %s capacity: %v, allocatable: %v, newCapacity: %v, newAllocatable: %v", + node.Name, core.ResourceMemory, + capacity.String(), newCapacity.String(), + allocatable.String(), newAllocatable.String()) + } } } + + if newAllocatable != nil && newCapacity != nil { + node.Status.Allocatable[core.ResourceMemory] = *newAllocatable + node.Status.Capacity[core.ResourceMemory] = *newCapacity + } } return nil @@ -111,7 +176,7 @@ func (na *WebhookNodeAllocatableMutator) Name() string { return nodeAllocatableMutatorName } -func overcommitRatioValidate(overcommitRatioAnnotation string) (float64, error) { +func overcommitRatioValidate(overcommitRatioAnnotation string, recommendOvercommitRatioAnnotation string) (float64, error) { overcommitRatio, err := strconv.ParseFloat(overcommitRatioAnnotation, 64) if err != nil { return 1, err @@ -122,5 +187,19 @@ func overcommitRatioValidate(overcommitRatioAnnotation string) (float64, error) return 1, err } + if recommendOvercommitRatioAnnotation == "" { + return overcommitRatio, nil + } + + recommendOvercommitRatio, err := strconv.ParseFloat(recommendOvercommitRatioAnnotation, 64) + if err != nil { + klog.Error(err) + return overcommitRatio, nil + } + + if recommendOvercommitRatio >= 1.0 && recommendOvercommitRatio < overcommitRatio { + return recommendOvercommitRatio, nil + } + return 
overcommitRatio, nil } diff --git a/pkg/webhook/mutating/node/node_test.go b/pkg/webhook/mutating/node/node_test.go index 7ccf8717c9..7d4c9369a3 100644 --- a/pkg/webhook/mutating/node/node_test.go +++ b/pkg/webhook/mutating/node/node_test.go @@ -149,6 +149,54 @@ func TestMutateNode(t *testing.T) { }, }, } + node6 := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node6", + Namespace: "default", + Annotations: map[string]string{ + apiconsts.NodeAnnotationCPUOvercommitRatioKey: "2", + apiconsts.NodeAnnotationMemoryOvercommitRatioKey: "1.2", + apiconsts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "1", + apiconsts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "1", + }, + }, + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(48, resource.DecimalSI), + v1.ResourceMemory: resource.MustParse("192Gi"), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(44, resource.DecimalSI), + v1.ResourceMemory: resource.MustParse("186Gi"), + }, + }, + } + node7 := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node6", + Namespace: "default", + Annotations: map[string]string{ + apiconsts.NodeAnnotationCPUOvercommitRatioKey: "2", + apiconsts.NodeAnnotationMemoryOvercommitRatioKey: "1.2", + apiconsts.NodeAnnotationRealtimeCPUOvercommitRatioKey: "2", + apiconsts.NodeAnnotationRealtimeMemoryOvercommitRatioKey: "2", + apiconsts.NodeAnnotationOvercommitAllocatableCPUKey: "80", + apiconsts.NodeAnnotationOvercommitCapacityCPUKey: "80", + apiconsts.NodeAnnotationOvercommitAllocatableMemoryKey: "372Gi", + apiconsts.NodeAnnotationOvercommitCapacityMemoryKey: "384Gi", + }, + }, + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(48, resource.DecimalSI), + v1.ResourceMemory: resource.MustParse("192Gi"), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(44, resource.DecimalSI), + v1.ResourceMemory: resource.MustParse("186Gi"), + }, + }, + } cases := []struct { name string @@ -297,6 +345,50 @@ func TestMutateNode(t *testing.T) { }, allow: true, }, + { + name: "node with lower recommend", + review: &admissionv1beta1.AdmissionReview{ + Request: &admissionv1beta1.AdmissionRequest{ + UID: "case7", + Object: runtime.RawExtension{ + Raw: nodeToJson(node6), + }, + Operation: admissionv1beta1.Update, + SubResource: "status", + }, + }, + expPatch: []string{ + `{"op":"add","path":"/metadata/annotations/katalyst.kubewharf.io~1original_allocatable_cpu","value":"44"}`, + `{"op":"add","path":"/metadata/annotations/katalyst.kubewharf.io~1original_allocatable_memory","value":"186Gi"}`, + `{"op":"add","path":"/metadata/annotations/katalyst.kubewharf.io~1original_capacity_cpu","value":"48"}`, + `{"op":"add","path":"/metadata/annotations/katalyst.kubewharf.io~1original_capacity_memory","value":"192Gi"}`, + }, + allow: true, + }, + { + name: "node with allocatable", + review: &admissionv1beta1.AdmissionReview{ + Request: &admissionv1beta1.AdmissionRequest{ + UID: "case8", + Object: runtime.RawExtension{ + Raw: nodeToJson(node7), + }, + Operation: admissionv1beta1.Update, + SubResource: "status", + }, + }, + expPatch: []string{ + `{"op":"add","path":"/metadata/annotations/katalyst.kubewharf.io~1original_allocatable_cpu","value":"44"}`, + `{"op":"add","path":"/metadata/annotations/katalyst.kubewharf.io~1original_allocatable_memory","value":"186Gi"}`, + `{"op":"add","path":"/metadata/annotations/katalyst.kubewharf.io~1original_capacity_cpu","value":"48"}`, + 
`{"op":"add","path":"/metadata/annotations/katalyst.kubewharf.io~1original_capacity_memory","value":"192Gi"}`, + `{"op":"replace","path":"/status/allocatable/cpu","value":"80"}`, + `{"op":"replace","path":"/status/capacity/cpu","value":"80"}`, + `{"op":"replace","path":"/status/allocatable/memory","value":"372Gi"}`, + `{"op":"replace","path":"/status/capacity/memory","value":"384Gi"}`, + }, + allow: true, + }, } for _, c := range cases {