From c7d9d3fd84c2773f774b9a15971ab9d811644450 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=AD=99=E5=81=A5=E4=BF=9E?=
Date: Wed, 6 Mar 2024 17:46:00 +0800
Subject: [PATCH] fix(qrm): fix advisor drop cache handler missing nbytes
 parameter

---
 .../dynamicpolicy/policy_advisor_handler.go  |  7 +-
 .../policy_allocation_handlers.go            | 11 +--
 .../qrm-plugins/memory/dynamicpolicy/util.go | 16 ++++
 .../memory/dynamicpolicy/util_test.go        | 79 +++++++++++++++++++
 4 files changed, 102 insertions(+), 11 deletions(-)
 create mode 100644 pkg/agent/qrm-plugins/memory/dynamicpolicy/util_test.go

diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go
index 222507a4b..957d63124 100644
--- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go
+++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go
@@ -307,12 +307,17 @@ func (p *DynamicPolicy) handleAdvisorDropCache(
 		return fmt.Errorf("get container id of pod: %s container: %s failed with error: %v", entryName, subEntryName, err)
 	}
 
+	container, err := p.metaServer.GetContainerSpec(entryName, subEntryName)
+	if err != nil || container == nil {
+		return fmt.Errorf("get container spec for pod: %s, container: %s failed with error: %v", entryName, subEntryName, err)
+	}
+
 	dropCacheWorkName := util.GetContainerAsyncWorkName(entryName, subEntryName,
 		memoryPluginAsyncWorkTopicDropCache)
 	// start a asynchronous work to drop cache for the container whose numaset changed and doesn't require numa_binding
 	err = p.asyncWorkers.AddWork(dropCacheWorkName,
 		&asyncworker.Work{
 			Fn:          cgroupmgr.DropCacheWithTimeoutForContainer,
-			Params:      []interface{}{entryName, containerID, dropCacheTimeoutSeconds},
+			Params:      []interface{}{entryName, containerID, dropCacheTimeoutSeconds, GetFullyDropCacheBytes(container)},
 			DeliveredAt: time.Now()}, asyncworker.DuplicateWorkPolicyOverride)
 	if err != nil {
diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_allocation_handlers.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_allocation_handlers.go
index 1a9f76c42..aeeab972f 100644
--- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_allocation_handlers.go
+++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_allocation_handlers.go
@@ -460,22 +460,13 @@ func (p *DynamicPolicy) adjustAllocationEntries() error {
 			}
 		}
 
-		memoryLimit := container.Resources.Limits[v1.ResourceMemory]
-		memoryReq := container.Resources.Requests[v1.ResourceMemory]
-		memoryLimitBytes := memoryLimit.Value()
-		memoryReqBytes := memoryReq.Value()
-
-		if memoryLimitBytes == 0 {
-			memoryLimitBytes = memoryReqBytes
-		}
-
 		dropCacheWorkName := util.GetContainerAsyncWorkName(podUID, containerName,
 			memoryPluginAsyncWorkTopicDropCache)
 		// start a asynchronous work to drop cache for the container whose numaset changed and doesn't require numa_binding
 		err = p.asyncWorkers.AddWork(dropCacheWorkName,
 			&asyncworker.Work{
 				Fn:          cgroupmgr.DropCacheWithTimeoutForContainer,
-				Params:      []interface{}{podUID, containerID, dropCacheTimeoutSeconds, memoryLimitBytes},
+				Params:      []interface{}{podUID, containerID, dropCacheTimeoutSeconds, GetFullyDropCacheBytes(container)},
 				DeliveredAt: time.Now()}, asyncworker.DuplicateWorkPolicyOverride)
 		if err != nil {
diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/util.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/util.go
index 2e8c71bab..a095c459c 100644
--- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/util.go
+++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/util.go
@@ -31,6 +31,22 @@ import (
 	"github.com/kubewharf/katalyst-core/pkg/util/general"
 )
 
+func GetFullyDropCacheBytes(container *v1.Container) int64 {
+	if container == nil {
+		return 0
+	}
+
+	memoryLimit := container.Resources.Limits[v1.ResourceMemory]
+	memoryReq := container.Resources.Requests[v1.ResourceMemory]
+	fullyDropCacheBytes := memoryLimit.Value()
+
+	if fullyDropCacheBytes == 0 {
+		fullyDropCacheBytes = memoryReq.Value()
+	}
+
+	return fullyDropCacheBytes
+}
+
 // GetReservedMemory is used to spread total reserved memories into per-numa level.
 // this reserve resource calculation logic should be kept in qrm, if advisor wants
 // to get this info, it should depend on the returned checkpoint (through cpu-server)
diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/util_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/util_test.go
new file mode 100644
index 000000000..32decbc66
--- /dev/null
+++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/util_test.go
@@ -0,0 +1,79 @@
+/*
+Copyright 2022 The Katalyst Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dynamicpolicy
+
+import (
+	"testing"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+)
+
+func TestGetFullyDropCacheBytes(t *testing.T) {
+	type args struct {
+		container *v1.Container
+	}
+	tests := []struct {
+		name string
+		args args
+		want int64
+	}{
+		{
+			name: "container with both request and limit",
+			args: args{
+				container: &v1.Container{
+					Name: "c1",
+					Resources: v1.ResourceRequirements{
+						Limits: map[v1.ResourceName]resource.Quantity{
+							v1.ResourceMemory: resource.MustParse("3Gi"),
+						},
+						Requests: map[v1.ResourceName]resource.Quantity{
+							v1.ResourceMemory: resource.MustParse("2Gi"),
+						},
+					},
+				},
+			},
+			want: 3221225472,
+		},
+		{
+			name: "container only with request",
+			args: args{
+				container: &v1.Container{
+					Name: "c1",
+					Resources: v1.ResourceRequirements{
+						Requests: map[v1.ResourceName]resource.Quantity{
+							v1.ResourceMemory: resource.MustParse("2Gi"),
+						},
+					},
+				},
+			},
+			want: 2147483648,
+		},
+		{
+			name: "nil container",
+			args: args{},
+			want: 0,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := GetFullyDropCacheBytes(tt.args.container); got != tt.want {
+				t.Errorf("GetFullyDropCacheBytes() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}