From 845317493603236e475b12c32d28c4a7c3cbd1b7 Mon Sep 17 00:00:00 2001 From: lilianrong Date: Thu, 29 Aug 2024 22:17:29 +0800 Subject: [PATCH 1/2] feat(qrm): fix cpu admit failed and optimize get cpu requested cores --- .../qrm-plugins/cpu/dynamicpolicy/policy.go | 21 +----- .../dynamicpolicy/policy_advisor_handler.go | 2 +- .../policy_allocation_handlers.go | 14 ++-- .../cpu/dynamicpolicy/policy_hint_handlers.go | 8 +-- .../cpu/dynamicpolicy/policy_test.go | 5 +- .../cpu/dynamicpolicy/state/util.go | 65 ++++++++++--------- .../cpu/dynamicpolicy/state/util_test.go | 21 ++++-- .../qrm-plugins/cpu/dynamicpolicy/vpa_test.go | 2 +- .../qrm-plugins/cpu/nativepolicy/policy.go | 2 - .../cpu/nativepolicy/policy_test.go | 2 - .../memory/dynamicpolicy/vpa_test.go | 2 +- 11 files changed, 69 insertions(+), 75 deletions(-) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go index 2948f2280..05f62be1b 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go @@ -19,7 +19,6 @@ package dynamicpolicy import ( "context" "fmt" - "math" "sync" "time" @@ -222,8 +221,6 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresHintHandler, } - state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores) - if err := policyImplement.cleanPools(); err != nil { return false, agent.ComponentStub{}, fmt.Errorf("cleanPools failed with error: %v", err) } @@ -1181,28 +1178,12 @@ func (p *DynamicPolicy) checkNormalShareCoresCpuResource(req *pluginapi.Resource return false, fmt.Errorf("GetQuantityFromResourceReq failed with error: %v", err) } - shareCoresAllocated := reqFloat64 - podEntries := p.state.GetPodEntries() - for podUid, podEntry := range podEntries { - if podEntry.IsPoolEntry() { - continue - } - if podUid == req.PodUid { - continue - } - for _, allocation := range podEntry { - // shareCoresAllocated should involve both main and sidecar containers - if state.CheckShared(allocation) && !state.CheckNUMABinding(allocation) { - shareCoresAllocated += p.getContainerRequestedCores(allocation) - } - } - } + shareCoresAllocatedInt := state.GetNonBindingSharedRequestedQuantityFromPodEntries(p.state.GetPodEntries(), map[string]float64{req.PodUid: reqFloat64}, p.getContainerRequestedCores) machineState := p.state.GetMachineState() pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, state.CheckDedicated, state.CheckNUMABinding) - shareCoresAllocatedInt := int(math.Ceil(shareCoresAllocated)) general.Infof("[checkNormalShareCoresCpuResource] node cpu allocated: %d, allocatable: %d", shareCoresAllocatedInt, pooledCPUs.Size()) if shareCoresAllocatedInt > pooledCPUs.Size() { general.Warningf("[checkNormalShareCoresCpuResource] no enough cpu resource for normal share cores pod: %s/%s, container: %s (request: %.02f, node allocated: %d, node allocatable: %d)", diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go index baef8e7e4..a27acee3c 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go @@ -582,7 +582,7 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad } newEntries[podUID][containerName] = allocationInfo.Clone() // adapt to old checkpoint without 
RequestQuantity property - newEntries[podUID][containerName].RequestQuantity = state.GetContainerRequestedCores()(allocationInfo) + newEntries[podUID][containerName].RequestQuantity = p.getContainerRequestedCores(allocationInfo) switch allocationInfo.QoSLevel { case consts.PodAnnotationQoSLevelDedicatedCores: diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_allocation_handlers.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_allocation_handlers.go index c9d78deca..de516f317 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_allocation_handlers.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_allocation_handlers.go @@ -726,7 +726,7 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesResizeAware(orig return fmt.Errorf("pool %s cross NUMA: %+v", poolName, poolsQuantityMap[poolName]) } } else if incrByReq { - err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap) + err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap, p.getContainerRequestedCores) if err != nil { return fmt.Errorf("CountAllocationInfosToPoolsQuantityMap failed with error: %v", err) } @@ -734,7 +734,7 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesResizeAware(orig } else { // else we do sum(containers req) for each pool to get pools ratio var err error - poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, allocationInfos) + poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, allocationInfos, p.getContainerRequestedCores) if err != nil { return fmt.Errorf("GetSharedQuantityMapFromPodEntries failed with error: %v", err) } @@ -745,14 +745,14 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesResizeAware(orig allocationInfos[0].PodNamespace, allocationInfos[0].PodName, allocationInfos[0].ContainerName) } // if advisor is disabled, qrm can re-calc the pool size exactly. we don't need to adjust the pool size. 
- err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap) + err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap, p.getContainerRequestedCores) if err != nil { return fmt.Errorf("CountAllocationInfosToPoolsQuantityMap failed with error: %v", err) } } } - isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, allocationInfos) + isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, allocationInfos, p.getContainerRequestedCores) err := p.adjustPoolsAndIsolatedEntries(poolsQuantityMap, isolatedQuantityMap, entries, machineState) if err != nil { @@ -867,12 +867,12 @@ func (p *DynamicPolicy) adjustAllocationEntries() error { poolsQuantityMap = machine.ParseCPUAssignmentQuantityMap(poolsCPUSetMap) } else { var err error - poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, nil) + poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, nil, p.getContainerRequestedCores) if err != nil { return fmt.Errorf("GetSharedQuantityMapFromPodEntries failed with error: %v", err) } } - isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, nil) + isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, nil, p.getContainerRequestedCores) err := p.adjustPoolsAndIsolatedEntries(poolsQuantityMap, isolatedQuantityMap, entries, machineState) if err != nil { @@ -1095,7 +1095,7 @@ func (p *DynamicPolicy) applyPoolsAndIsolatedInfo(poolsCPUSet map[string]machine newPodEntries[podUID][containerName] = allocationInfo.Clone() // adapt to old checkpoint without RequestQuantity property - newPodEntries[podUID][containerName].RequestQuantity = state.GetContainerRequestedCores()(allocationInfo) + newPodEntries[podUID][containerName].RequestQuantity = p.getContainerRequestedCores(allocationInfo) switch allocationInfo.QoSLevel { case apiconsts.PodAnnotationQoSLevelDedicatedCores: newPodEntries[podUID][containerName].OwnerPoolName = allocationInfo.GetPoolName() diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go index bc1c8ea43..57bfea0ef 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go @@ -303,7 +303,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int, }) if numaBound > machine.MBWNUMAsPoint { - numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer) + numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer, p.getContainerRequestedCores) general.InfoS("getNUMAAllocatedMemBW", "podNamespace", req.PodNamespace, @@ -322,7 +322,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int, return hints, nil } -func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserver.MetaServer) (map[int]int, error) { +func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserver.MetaServer, getContainerRequestedCores state.GetContainerRequestedCoresFunc) (map[int]int, error) { numaAllocatedMemBW := make(map[int]int) podUIDToMemBWReq := make(map[string]int) podUIDToBindingNUMAs := make(map[string]sets.Int) @@ -350,7 +350,7 @@ func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserve Name: allocationInfo.PodName, Labels: allocationInfo.Labels, Annotations: allocationInfo.Annotations, - }, int(math.Ceil(state.GetContainerRequestedCores()(allocationInfo)))) + }, 
int(math.Ceil(getContainerRequestedCores(allocationInfo)))) if err != nil { return nil, fmt.Errorf("GetContainerMemoryBandwidthRequest for pod: %s/%s, container: %s failed with error: %v", allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, err) @@ -784,7 +784,7 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE ) (map[string]*pluginapi.ListOfTopologyHints, error) { nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckNUMABinding).Size() nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding) - nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries) + nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries, nil, p.getContainerRequestedCores) numaNodes := p.filterNUMANodesByNonBindingSharedRequestedQuantity(nonBindingSharedRequestedQuantity, nonBindingNUMAsCPUQuantity, nonBindingNUMAs, machineState, diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index da68acf5e..79c0db98f 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -112,8 +112,6 @@ func getTestDynamicPolicyWithoutInitialization(topology *machine.CPUTopology, st podDebugAnnoKeys: []string{podDebugAnnoKey}, } - state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores) - // register allocation behaviors for pods with different QoS level policyImplement.allocationHandlers = map[string]util.AllocationHandler{ consts.PodAnnotationQoSLevelSharedCores: policyImplement.sharedCoresAllocationHandler, @@ -4923,7 +4921,6 @@ func Test_getNUMAAllocatedMemBW(t *testing.T) { as := require.New(t) policyImplement := &DynamicPolicy{} - state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores) testName := "test" highDensityCPUTopology, err := machine.GenerateDummyCPUTopology(384, 2, 12) @@ -5168,7 +5165,7 @@ func Test_getNUMAAllocatedMemBW(t *testing.T) { curTT := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - got, err := getNUMAAllocatedMemBW(curTT.args.machineState, curTT.args.metaServer) + got, err := getNUMAAllocatedMemBW(curTT.args.machineState, curTT.args.metaServer, policyImplement.getContainerRequestedCores) if (err != nil) != curTT.wantErr { t.Errorf("getNUMAAllocatedMemBW() error = %v, wantErr %v", err, curTT.wantErr) return diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util.go index 06cb1bd55..cd005dfd8 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util.go @@ -20,7 +20,6 @@ import ( "fmt" "math" "strings" - "sync" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ -56,6 +55,8 @@ const ( NUMAPoolInfix = "-NUMA" ) +type GetContainerRequestedCoresFunc func(allocationInfo *AllocationInfo) float64 + var ( // StaticPools are generated by cpu plugin statically, // and they will be ignored when reading cpu advisor list and watch response. 
@@ -70,26 +71,9 @@ var ( ).Union(StaticPools) ) -var ( - containerRequestedCoresLock sync.RWMutex - containerRequestedCores func(allocationInfo *AllocationInfo) float64 -) - -func GetContainerRequestedCores() func(allocationInfo *AllocationInfo) float64 { - containerRequestedCoresLock.RLock() - defer containerRequestedCoresLock.RUnlock() - return containerRequestedCores -} - -func SetContainerRequestedCores(f func(allocationInfo *AllocationInfo) float64) { - containerRequestedCoresLock.Lock() - defer containerRequestedCoresLock.Unlock() - containerRequestedCores = f -} - // GetIsolatedQuantityMapFromPodEntries returns a map to indicates isolation info, // and the map is formatted as pod -> container -> isolated-quantity -func GetIsolatedQuantityMapFromPodEntries(podEntries PodEntries, ignoreAllocationInfos []*AllocationInfo) map[string]map[string]int { +func GetIsolatedQuantityMapFromPodEntries(podEntries PodEntries, ignoreAllocationInfos []*AllocationInfo, getContainerRequestedCores GetContainerRequestedCoresFunc) map[string]map[string]int { ret := make(map[string]map[string]int) for podUID, entries := range podEntries { if entries.IsPoolEntry() { @@ -114,7 +98,7 @@ func GetIsolatedQuantityMapFromPodEntries(podEntries PodEntries, ignoreAllocatio // and we will try to isolate those containers, so we will treat them as containers to be isolated. var quantity int if allocationInfo.OwnerPoolName != PoolNameDedicated { - quantity = int(math.Ceil(GetContainerRequestedCores()(allocationInfo))) + quantity = int(math.Ceil(getContainerRequestedCores(allocationInfo))) } else { quantity = allocationInfo.AllocationResult.Size() } @@ -135,7 +119,7 @@ func GetIsolatedQuantityMapFromPodEntries(podEntries PodEntries, ignoreAllocatio // GetSharedQuantityMapFromPodEntries returns a map to indicates quantity info for each shared pool, // and the map is formatted as pool -> quantity -func GetSharedQuantityMapFromPodEntries(podEntries PodEntries, ignoreAllocationInfos []*AllocationInfo) (map[string]map[int]int, error) { +func GetSharedQuantityMapFromPodEntries(podEntries PodEntries, ignoreAllocationInfos []*AllocationInfo, getContainerRequestedCores GetContainerRequestedCoresFunc) (map[string]map[int]int, error) { poolsQuantityMap := make(map[string]map[int]int) allocationInfosToCount := make([]*AllocationInfo, 0, len(podEntries)) for _, entries := range podEntries { @@ -163,7 +147,7 @@ func GetSharedQuantityMapFromPodEntries(podEntries PodEntries, ignoreAllocationI } } - err := CountAllocationInfosToPoolsQuantityMap(allocationInfosToCount, poolsQuantityMap) + err := CountAllocationInfosToPoolsQuantityMap(allocationInfosToCount, poolsQuantityMap, getContainerRequestedCores) if err != nil { return nil, fmt.Errorf("CountAllocationInfosToPoolsQuantityMap faild with error: %v", err) } @@ -171,25 +155,36 @@ func GetSharedQuantityMapFromPodEntries(podEntries PodEntries, ignoreAllocationI return poolsQuantityMap, nil } -// GetTotoalSharedQuantity returns total quanity shared_cores without numa_binding requested -func GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries PodEntries) int { +// GetNonBindingSharedRequestedQuantityFromPodEntries returns total quanity shared_cores without numa_binding requested +func GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries PodEntries, newNonBindingSharedRequestedQuantity map[string]float64, getContainerRequestedCores GetContainerRequestedCoresFunc) int { var reqFloat64 float64 = 0 - for _, entries := range podEntries { + for podUid, entries := range 
podEntries { if entries.IsPoolEntry() { continue } + // ignore new coming pods (only for inplace update) + if newNonBindingSharedRequestedQuantity != nil { + if _, ok := newNonBindingSharedRequestedQuantity[podUid]; ok { + continue + } + } + for _, allocationInfo := range entries { if allocationInfo == nil || !CheckShared(allocationInfo) || CheckNUMABinding(allocationInfo) { continue } - reqFloat64 += GetContainerRequestedCores()(allocationInfo) + reqFloat64 += getContainerRequestedCores(allocationInfo) } } - return int(math.Ceil(reqFloat64)) + for podUid := range newNonBindingSharedRequestedQuantity { + reqFloat64 += newNonBindingSharedRequestedQuantity[podUid] + } + + return CPUPreciseCeil(reqFloat64) } // GenerateMachineStateFromPodEntries returns NUMANodeMap for given resource based on @@ -318,6 +313,7 @@ func GetSharedBindingNUMAsFromQuantityMap(poolsQuantityMap map[string]map[int]in func CountAllocationInfosToPoolsQuantityMap(allocationInfos []*AllocationInfo, poolsQuantityMap map[string]map[int]int, + getContainerRequestedCores GetContainerRequestedCoresFunc, ) error { if poolsQuantityMap == nil { return fmt.Errorf("nil poolsQuantityMap in CountAllocationInfosToPoolsQuantityMap") @@ -330,7 +326,7 @@ func CountAllocationInfosToPoolsQuantityMap(allocationInfos []*AllocationInfo, return fmt.Errorf("CountAllocationInfosToPoolsQuantityMap got nil allocationInfo") } - reqFloat64 := GetContainerRequestedCores()(allocationInfo) * GetCPUIncrRatio(allocationInfo) + reqFloat64 := getContainerRequestedCores(allocationInfo) * GetCPUIncrRatio(allocationInfo) var targetNUMAID int var poolName string @@ -408,7 +404,7 @@ func CountAllocationInfosToPoolsQuantityMap(allocationInfos []*AllocationInfo, poolsQuantityMap[poolName] = make(map[int]int) } - poolsQuantityMap[poolName][numaID] += int(math.Ceil(preciseQuantity)) + poolsQuantityMap[poolName][numaID] += CPUPreciseCeil(preciseQuantity) // return err will abort the procedure, // so there is no need to revert modifications made in parameter poolsQuantityMap @@ -483,3 +479,14 @@ func checkCPUSetMap(map1, map2 map[int]machine.CPUSet) bool { } return true } + +// CPUPreciseCeil we can not use math.Ceil directly here, because the cpu requests are stored using floats, +// there is a chance of precision issues during addition calculations. +// in critical case: +// - the allocatable cpu of the node is 122 +// - the sum of allocated cpu requests is 118.00000000000001 (after ceil is 119), +// - the new pod request is 4 +// 119 + 4 > 122, so qrm will reject the new pod. 
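+// CPUPreciseCeil drops this sub-0.001-core floating point noise before rounding up, e.g.
+// CPUPreciseCeil(118.00000000000001 + 4) = 122 <= 122, so the pod above is admitted as expected.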
+func CPUPreciseCeil(request float64) int { + return int(math.Ceil(float64(int(request*1000)) / 1000)) +} diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util_test.go index 1cc9b265f..0ab282234 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util_test.go @@ -567,9 +567,6 @@ func TestGetSpecifiedPoolName(t *testing.T) { func TestCountAllocationInfosToPoolsQuantityMap(t *testing.T) { t.Parallel() testName := "test" - SetContainerRequestedCores(func(allocationInfo *AllocationInfo) float64 { - return allocationInfo.RequestQuantity - }) type args struct { allocationInfos []*AllocationInfo @@ -851,7 +848,9 @@ func TestCountAllocationInfosToPoolsQuantityMap(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - if err := CountAllocationInfosToPoolsQuantityMap(tt.args.allocationInfos, tt.args.poolsQuantityMap); (err != nil) != tt.wantErr { + if err := CountAllocationInfosToPoolsQuantityMap(tt.args.allocationInfos, tt.args.poolsQuantityMap, func(allocationInfo *AllocationInfo) float64 { + return allocationInfo.RequestQuantity + }); (err != nil) != tt.wantErr { t.Errorf("CountAllocationInfosToPoolsQuantityMap() error = %v, wantErr %v", err, tt.wantErr) } else if err == nil { if !reflect.DeepEqual(tt.args.poolsQuantityMap, tt.want) { @@ -861,3 +860,17 @@ func TestCountAllocationInfosToPoolsQuantityMap(t *testing.T) { }) } } + +func TestCPUPreciseCeil(t *testing.T) { + t.Parallel() + require.Equal(t, 188, CPUPreciseCeil(188.0000000001)) + require.Equal(t, 188, CPUPreciseCeil(187.9999999999)) + require.Equal(t, 189, CPUPreciseCeil(188.001)) + require.Equal(t, 188, CPUPreciseCeil(188.0001)) + array := []float64{4, 1, 4, 0.83, 1, 2, 4, 4, 0.83, 4, 4, 4, 1, 4, 48, 1, 0.507, 0.625, 2, 1, 5.54, 2, 4, 4, 2, 1, 0.01, 4, 1, 0.658, 2} + sum := float64(0) + for _, v := range array { + sum += v + } + require.Equal(t, 118, CPUPreciseCeil(sum)) +} diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go index a866e2606..6f95e0195 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go @@ -765,7 +765,7 @@ func TestNormalShareVPAWithSidecar(t *testing.T) { t.Parallel() as := require.New(t) - tmpDir, err := ioutil.TempDir("", "checkpoint-TestSNBVPAWithSidecar") + tmpDir, err := ioutil.TempDir("", "checkpoint-TestNormalShareVPAWithSidecar") as.Nil(err) defer func() { _ = os.RemoveAll(tmpDir) }() diff --git a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go index 630c78a20..80c930ce0 100644 --- a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go +++ b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go @@ -144,8 +144,6 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration, return false, agent.ComponentStub{}, fmt.Errorf("native policy set reserved CPUs failed with error: %v", err) } - state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores) - err := agentCtx.MetaServer.ConfigurationManager.AddConfigWatcher(crd.AdminQoSConfigurationGVR) if err != nil { return false, nil, err diff --git a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go index 73a1906f6..3b32b615e 100644 --- a/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go +++ 
b/pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go @@ -64,8 +64,6 @@ func getTestNativePolicy(topology *machine.CPUTopology, stateFileDirectory strin reservedCPUs: machine.NewCPUSet(), } - state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores) - return policyImplement, nil } diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/vpa_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/vpa_test.go index c0a1ddeac..db3f65eb7 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/vpa_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/vpa_test.go @@ -731,7 +731,7 @@ func TestNormalShareMemoryVPAWithSidecar(t *testing.T) { as := require.New(t) - tmpDir, err := ioutil.TempDir("", "checkpoint-TestNormalShareMemoryVPA") + tmpDir, err := ioutil.TempDir("", "checkpoint-TestNormalShareMemoryVPAWithSidecar") as.Nil(err) defer os.RemoveAll(tmpDir) From 58d14b3c707f112644918604decccfe5f8382c6f Mon Sep 17 00:00:00 2001 From: lilianrong Date: Thu, 29 Aug 2024 22:10:47 +0800 Subject: [PATCH 2/2] feat(qrm): return errors instead of empty array in hints --- .../cpu/dynamicpolicy/policy_hint_handlers.go | 50 +++++++++++++------ .../cpu/dynamicpolicy/policy_test.go | 6 +-- .../qrm-plugins/cpu/dynamicpolicy/vpa_test.go | 2 +- .../dynamicpolicy/policy_hint_handlers.go | 39 +++++++++------ 4 files changed, 64 insertions(+), 33 deletions(-) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go index 57bfea0ef..e7efe2095 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go @@ -41,6 +41,8 @@ import ( qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos" ) +var errNoAvailableCPUHints = fmt.Errorf("no available cpu hints") + type memBWHintUpdate struct { updatedPreferrence bool leftAllocatable int @@ -73,7 +75,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context, "podName": req.PodName, "containerName": req.ContainerName, })...) 
- return nil, fmt.Errorf("no enough cpu resource") + return nil, errNoAvailableCPUHints } } @@ -209,12 +211,6 @@ func (p *DynamicPolicy) calculateHints(reqInt int, } sort.Ints(numaNodes) - hints := map[string]*pluginapi.ListOfTopologyHints{ - string(v1.ResourceCPU): { - Hints: []*pluginapi.TopologyHint{}, - }, - } - minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology) if err != nil { return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err) @@ -262,6 +258,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int, } preferredHintIndexes := []int{} + var availableNumaHints []*pluginapi.TopologyHint machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) { maskCount := mask.Count() if maskCount < minNUMAsCountNeeded { @@ -292,16 +289,25 @@ func (p *DynamicPolicy) calculateHints(reqInt int, } preferred := maskCount == minNUMAsCountNeeded - hints[string(v1.ResourceCPU)].Hints = append(hints[string(v1.ResourceCPU)].Hints, &pluginapi.TopologyHint{ + availableNumaHints = append(availableNumaHints, &pluginapi.TopologyHint{ Nodes: machine.MaskToUInt64Array(mask), Preferred: preferred, }) if preferred { - preferredHintIndexes = append(preferredHintIndexes, len(hints[string(v1.ResourceCPU)].Hints)-1) + preferredHintIndexes = append(preferredHintIndexes, len(availableNumaHints)-1) } }) + // NOTE: because grpc is inability to distinguish between an empty array and nil, + // we return an error instead of an empty array. + // we should resolve this issue if we need manage multi resource in one plugin. + if len(availableNumaHints) == 0 { + general.Warningf("calculateHints got no available cpu hints for pod: %s/%s, container: %s", + req.PodNamespace, req.PodName, req.ContainerName) + return nil, errNoAvailableCPUHints + } + if numaBound > machine.MBWNUMAsPoint { numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer, p.getContainerRequestedCores) @@ -314,11 +320,17 @@ func (p *DynamicPolicy) calculateHints(reqInt int, general.Errorf("getNUMAAllocatedMemBW failed with error: %v", err) _ = p.emitter.StoreInt64(util.MetricNameGetNUMAAllocatedMemBWFailed, 1, metrics.MetricTypeNameRaw) } else { - p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, hints[string(v1.ResourceCPU)].Hints, + p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, availableNumaHints, reqInt, numaAllocatedMemBW, req, numaExclusive) } } + hints := map[string]*pluginapi.ListOfTopologyHints{ + string(v1.ResourceCPU): { + Hints: availableNumaHints, + }, + } + return hints, nil } @@ -633,7 +645,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context, general.Infof("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, try to migrate it to new NUMA", req.PodNamespace, req.PodName, req.ContainerName) var calculateErr error - hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req.Annotations) + hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req) if calculateErr != nil { general.Errorf("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, failed to migrate it to new NUMA", req.PodNamespace, req.PodName, req.ContainerName) @@ -642,7 +654,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context, } else { general.Errorf("pod: %s/%s, container: %s request inplace update resize, but no enough resource for it 
in current NUMA", req.PodNamespace, req.PodName, req.ContainerName) - return nil, fmt.Errorf("inplace update resize scale out failed with no enough resource") + return nil, errNoAvailableCPUHints } } else { general.Infof("pod: %s/%s, container: %s request inplace update resize, there is enough resource for it in current NUMA", @@ -650,7 +662,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context, } } else if hints == nil { var calculateErr error - hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req.Annotations) + hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req) if calculateErr != nil { return nil, fmt.Errorf("calculateHintsForNUMABindingSharedCores failed with error: %v", calculateErr) } @@ -780,12 +792,13 @@ func (p *DynamicPolicy) filterNUMANodesByNonBindingSharedRequestedQuantity(nonBi func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podEntries state.PodEntries, machineState state.NUMANodeMap, - reqAnnotations map[string]string, + req *pluginapi.ResourceRequest, ) (map[string]*pluginapi.ListOfTopologyHints, error) { nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckNUMABinding).Size() nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding) nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries, nil, p.getContainerRequestedCores) + reqAnnotations := req.Annotations numaNodes := p.filterNUMANodesByNonBindingSharedRequestedQuantity(nonBindingSharedRequestedQuantity, nonBindingNUMAsCPUQuantity, nonBindingNUMAs, machineState, machineState.GetFilteredNUMASetWithAnnotations(state.CheckNUMABindingSharedCoresAntiAffinity, reqAnnotations).ToSliceInt()) @@ -826,6 +839,15 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, reqInt) } + // NOTE: because grpc is inability to distinguish between an empty array and nil, + // we return an error instead of an empty array. + // we should resolve this issue if we need manage multi resource in one plugin. 
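+	// returning the sentinel errNoAvailableCPUHints instead of an empty hint list also lets callers and
+	// tests match this case explicitly (the unit tests assert on its message via ErrorContains).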
+ if len(hints[string(v1.ResourceCPU)].Hints) == 0 { + general.Warningf("calculateHints got no available memory hints for snb pod: %s/%s, container: %s", + req.PodNamespace, req.PodName, req.ContainerName) + return nil, errNoAvailableCPUHints + } + return hints, nil } diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index 79c0db98f..1969df599 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -5291,10 +5291,8 @@ func TestSNBAdmitWithSidecarReallocate(t *testing.T) { } // pod aggregated size is 8, the new container request is 4, 8 + 4 > 11 (share-NUMA0 size) - res, err = dynamicPolicy.GetTopologyHints(context.Background(), anotherReq) - as.Nil(err) - as.NotNil(res.ResourceHints[string(v1.ResourceCPU)]) - as.Equal(0, len(res.ResourceHints[string(v1.ResourceCPU)].Hints)) + _, err = dynamicPolicy.GetTopologyHints(context.Background(), anotherReq) + as.ErrorContains(err, errNoAvailableCPUHints.Error()) // reallocate sidecar _, err = dynamicPolicy.Allocate(context.Background(), sidecarReq) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go index 6f95e0195..d901e3e7b 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go @@ -708,7 +708,7 @@ func TestNormalShareVPA(t *testing.T) { } _, err = dynamicPolicy.GetTopologyHints(context.Background(), resizeReq) - as.ErrorContains(err, "no enough") + as.ErrorContains(err, errNoAvailableCPUHints.Error()) resizeReq1 := &pluginapi.ResourceRequest{ PodUid: req.PodUid, diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go index d4475a86a..c441125fd 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go @@ -33,6 +33,8 @@ import ( qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos" ) +var errNoAvailableMemoryHints = fmt.Errorf("no available memory hints") + func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context, req *pluginapi.ResourceRequest, ) (*pluginapi.ResourceHintsResponse, error) { @@ -59,7 +61,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context, "podName": req.PodName, "containerName": req.ContainerName, })...) 
- return nil, fmt.Errorf("no enough memory resource") + return nil, errNoAvailableMemoryHints } } @@ -197,7 +199,7 @@ func (p *DynamicPolicy) numaBindingHintHandler(_ context.Context, var calculateErr error // recalculate hints for the whole pod - hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req.Annotations) + hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req) if calculateErr != nil { general.Errorf("failed to calculate hints for pod: %s/%s, container: %s, error: %v", req.PodNamespace, req.PodName, req.ContainerName, calculateErr) @@ -213,7 +215,7 @@ func (p *DynamicPolicy) numaBindingHintHandler(_ context.Context, // otherwise, calculate hint for container without allocated memory var calculateErr error // calculate hint for container without allocated memory - hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req.Annotations) + hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req) if calculateErr != nil { general.Errorf("failed to calculate hints for pod: %s/%s, container: %s, error: %v", req.PodNamespace, req.PodName, req.ContainerName, calculateErr) @@ -256,8 +258,9 @@ func (p *DynamicPolicy) dedicatedCoresWithoutNUMABindingHintHandler(_ context.Co // calculateHints is a helper function to calculate the topology hints // with the given container requests. -func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState state.NUMANodeResourcesMap, - reqAnnotations map[string]string, +func (p *DynamicPolicy) calculateHints(reqInt uint64, + resourcesMachineState state.NUMANodeResourcesMap, + req *pluginapi.ResourceRequest, ) (map[string]*pluginapi.ListOfTopologyHints, error) { machineState := resourcesMachineState[v1.ResourceMemory] @@ -271,12 +274,6 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat } sort.Ints(numaNodes) - hints := map[string]*pluginapi.ListOfTopologyHints{ - string(v1.ResourceMemory): { - Hints: []*pluginapi.TopologyHint{}, - }, - } - bytesPerNUMA, err := machineState.BytesPerNUMA() if err != nil { return nil, fmt.Errorf("getBytesPerNUMAFromMachineState failed with error: %v", err) @@ -286,7 +283,7 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat if err != nil { return nil, fmt.Errorf("GetNUMANodesCountToFitMemoryReq failed with error: %v", err) } - + reqAnnotations := req.Annotations numaBinding := qosutil.AnnotationsIndicateNUMABinding(reqAnnotations) numaExclusive := qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) @@ -328,6 +325,7 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat numaBound = minNUMAsCountNeeded + 1 } + var availableNumaHints []*pluginapi.TopologyHint machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) { maskCount := mask.Count() if maskCount < minNUMAsCountNeeded { @@ -357,13 +355,26 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat return } - hints[string(v1.ResourceMemory)].Hints = append(hints[string(v1.ResourceMemory)].Hints, &pluginapi.TopologyHint{ + availableNumaHints = append(availableNumaHints, &pluginapi.TopologyHint{ Nodes: machine.MaskToUInt64Array(mask), Preferred: len(maskBits) == minNUMAsCountNeeded, }) }) - return hints, nil + // NOTE: because grpc is inability to distinguish between an empty array and nil, + // we return an error instead of an empty array. 
+	// we should resolve this issue if we need to manage multiple resources in one plugin.
+	if len(availableNumaHints) == 0 {
+		general.Warningf("calculateHints got no available memory hints for pod: %s/%s, container: %s",
+			req.PodNamespace, req.PodName, req.ContainerName)
+		return nil, errNoAvailableMemoryHints
+	}
+
+	return map[string]*pluginapi.ListOfTopologyHints{
+		string(v1.ResourceMemory): {
+			Hints: availableNumaHints,
+		},
+	}, nil
 }
 
 // regenerateHints regenerates hints for container that'd already been allocated memory,