From 58d14b3c707f112644918604decccfe5f8382c6f Mon Sep 17 00:00:00 2001 From: lilianrong Date: Thu, 29 Aug 2024 22:10:47 +0800 Subject: [PATCH] feat(qrm): return errors instead of empty array in hints --- .../cpu/dynamicpolicy/policy_hint_handlers.go | 50 +++++++++++++------ .../cpu/dynamicpolicy/policy_test.go | 6 +-- .../qrm-plugins/cpu/dynamicpolicy/vpa_test.go | 2 +- .../dynamicpolicy/policy_hint_handlers.go | 39 +++++++++------ 4 files changed, 64 insertions(+), 33 deletions(-) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go index 57bfea0ef..e7efe2095 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go @@ -41,6 +41,8 @@ import ( qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos" ) +var errNoAvailableCPUHints = fmt.Errorf("no available cpu hints") + type memBWHintUpdate struct { updatedPreferrence bool leftAllocatable int @@ -73,7 +75,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context, "podName": req.PodName, "containerName": req.ContainerName, })...) - return nil, fmt.Errorf("no enough cpu resource") + return nil, errNoAvailableCPUHints } } @@ -209,12 +211,6 @@ func (p *DynamicPolicy) calculateHints(reqInt int, } sort.Ints(numaNodes) - hints := map[string]*pluginapi.ListOfTopologyHints{ - string(v1.ResourceCPU): { - Hints: []*pluginapi.TopologyHint{}, - }, - } - minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology) if err != nil { return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err) @@ -262,6 +258,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int, } preferredHintIndexes := []int{} + var availableNumaHints []*pluginapi.TopologyHint machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) { maskCount := mask.Count() if maskCount < minNUMAsCountNeeded { @@ -292,16 +289,25 @@ func (p *DynamicPolicy) calculateHints(reqInt int, } preferred := maskCount == minNUMAsCountNeeded - hints[string(v1.ResourceCPU)].Hints = append(hints[string(v1.ResourceCPU)].Hints, &pluginapi.TopologyHint{ + availableNumaHints = append(availableNumaHints, &pluginapi.TopologyHint{ Nodes: machine.MaskToUInt64Array(mask), Preferred: preferred, }) if preferred { - preferredHintIndexes = append(preferredHintIndexes, len(hints[string(v1.ResourceCPU)].Hints)-1) + preferredHintIndexes = append(preferredHintIndexes, len(availableNumaHints)-1) } }) + // NOTE: because grpc is inability to distinguish between an empty array and nil, + // we return an error instead of an empty array. + // we should resolve this issue if we need manage multi resource in one plugin. + if len(availableNumaHints) == 0 { + general.Warningf("calculateHints got no available cpu hints for pod: %s/%s, container: %s", + req.PodNamespace, req.PodName, req.ContainerName) + return nil, errNoAvailableCPUHints + } + if numaBound > machine.MBWNUMAsPoint { numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer, p.getContainerRequestedCores) @@ -314,11 +320,17 @@ func (p *DynamicPolicy) calculateHints(reqInt int, general.Errorf("getNUMAAllocatedMemBW failed with error: %v", err) _ = p.emitter.StoreInt64(util.MetricNameGetNUMAAllocatedMemBWFailed, 1, metrics.MetricTypeNameRaw) } else { - p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, hints[string(v1.ResourceCPU)].Hints, + p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, availableNumaHints, reqInt, numaAllocatedMemBW, req, numaExclusive) } } + hints := map[string]*pluginapi.ListOfTopologyHints{ + string(v1.ResourceCPU): { + Hints: availableNumaHints, + }, + } + return hints, nil } @@ -633,7 +645,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context, general.Infof("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, try to migrate it to new NUMA", req.PodNamespace, req.PodName, req.ContainerName) var calculateErr error - hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req.Annotations) + hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req) if calculateErr != nil { general.Errorf("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, failed to migrate it to new NUMA", req.PodNamespace, req.PodName, req.ContainerName) @@ -642,7 +654,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context, } else { general.Errorf("pod: %s/%s, container: %s request inplace update resize, but no enough resource for it in current NUMA", req.PodNamespace, req.PodName, req.ContainerName) - return nil, fmt.Errorf("inplace update resize scale out failed with no enough resource") + return nil, errNoAvailableCPUHints } } else { general.Infof("pod: %s/%s, container: %s request inplace update resize, there is enough resource for it in current NUMA", @@ -650,7 +662,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context, } } else if hints == nil { var calculateErr error - hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req.Annotations) + hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req) if calculateErr != nil { return nil, fmt.Errorf("calculateHintsForNUMABindingSharedCores failed with error: %v", calculateErr) } @@ -780,12 +792,13 @@ func (p *DynamicPolicy) filterNUMANodesByNonBindingSharedRequestedQuantity(nonBi func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podEntries state.PodEntries, machineState state.NUMANodeMap, - reqAnnotations map[string]string, + req *pluginapi.ResourceRequest, ) (map[string]*pluginapi.ListOfTopologyHints, error) { nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckNUMABinding).Size() nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding) nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries, nil, p.getContainerRequestedCores) + reqAnnotations := req.Annotations numaNodes := p.filterNUMANodesByNonBindingSharedRequestedQuantity(nonBindingSharedRequestedQuantity, nonBindingNUMAsCPUQuantity, nonBindingNUMAs, machineState, machineState.GetFilteredNUMASetWithAnnotations(state.CheckNUMABindingSharedCoresAntiAffinity, reqAnnotations).ToSliceInt()) @@ -826,6 +839,15 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, reqInt) } + // NOTE: because grpc is inability to distinguish between an empty array and nil, + // we return an error instead of an empty array. + // we should resolve this issue if we need manage multi resource in one plugin. + if len(hints[string(v1.ResourceCPU)].Hints) == 0 { + general.Warningf("calculateHints got no available memory hints for snb pod: %s/%s, container: %s", + req.PodNamespace, req.PodName, req.ContainerName) + return nil, errNoAvailableCPUHints + } + return hints, nil } diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go index 79c0db98f..1969df599 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go @@ -5291,10 +5291,8 @@ func TestSNBAdmitWithSidecarReallocate(t *testing.T) { } // pod aggregated size is 8, the new container request is 4, 8 + 4 > 11 (share-NUMA0 size) - res, err = dynamicPolicy.GetTopologyHints(context.Background(), anotherReq) - as.Nil(err) - as.NotNil(res.ResourceHints[string(v1.ResourceCPU)]) - as.Equal(0, len(res.ResourceHints[string(v1.ResourceCPU)].Hints)) + _, err = dynamicPolicy.GetTopologyHints(context.Background(), anotherReq) + as.ErrorContains(err, errNoAvailableCPUHints.Error()) // reallocate sidecar _, err = dynamicPolicy.Allocate(context.Background(), sidecarReq) diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go index 6f95e0195..d901e3e7b 100644 --- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go +++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go @@ -708,7 +708,7 @@ func TestNormalShareVPA(t *testing.T) { } _, err = dynamicPolicy.GetTopologyHints(context.Background(), resizeReq) - as.ErrorContains(err, "no enough") + as.ErrorContains(err, errNoAvailableCPUHints.Error()) resizeReq1 := &pluginapi.ResourceRequest{ PodUid: req.PodUid, diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go index d4475a86a..c441125fd 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go @@ -33,6 +33,8 @@ import ( qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos" ) +var errNoAvailableMemoryHints = fmt.Errorf("no available memory hints") + func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context, req *pluginapi.ResourceRequest, ) (*pluginapi.ResourceHintsResponse, error) { @@ -59,7 +61,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context, "podName": req.PodName, "containerName": req.ContainerName, })...) - return nil, fmt.Errorf("no enough memory resource") + return nil, errNoAvailableMemoryHints } } @@ -197,7 +199,7 @@ func (p *DynamicPolicy) numaBindingHintHandler(_ context.Context, var calculateErr error // recalculate hints for the whole pod - hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req.Annotations) + hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req) if calculateErr != nil { general.Errorf("failed to calculate hints for pod: %s/%s, container: %s, error: %v", req.PodNamespace, req.PodName, req.ContainerName, calculateErr) @@ -213,7 +215,7 @@ func (p *DynamicPolicy) numaBindingHintHandler(_ context.Context, // otherwise, calculate hint for container without allocated memory var calculateErr error // calculate hint for container without allocated memory - hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req.Annotations) + hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req) if calculateErr != nil { general.Errorf("failed to calculate hints for pod: %s/%s, container: %s, error: %v", req.PodNamespace, req.PodName, req.ContainerName, calculateErr) @@ -256,8 +258,9 @@ func (p *DynamicPolicy) dedicatedCoresWithoutNUMABindingHintHandler(_ context.Co // calculateHints is a helper function to calculate the topology hints // with the given container requests. -func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState state.NUMANodeResourcesMap, - reqAnnotations map[string]string, +func (p *DynamicPolicy) calculateHints(reqInt uint64, + resourcesMachineState state.NUMANodeResourcesMap, + req *pluginapi.ResourceRequest, ) (map[string]*pluginapi.ListOfTopologyHints, error) { machineState := resourcesMachineState[v1.ResourceMemory] @@ -271,12 +274,6 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat } sort.Ints(numaNodes) - hints := map[string]*pluginapi.ListOfTopologyHints{ - string(v1.ResourceMemory): { - Hints: []*pluginapi.TopologyHint{}, - }, - } - bytesPerNUMA, err := machineState.BytesPerNUMA() if err != nil { return nil, fmt.Errorf("getBytesPerNUMAFromMachineState failed with error: %v", err) @@ -286,7 +283,7 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat if err != nil { return nil, fmt.Errorf("GetNUMANodesCountToFitMemoryReq failed with error: %v", err) } - + reqAnnotations := req.Annotations numaBinding := qosutil.AnnotationsIndicateNUMABinding(reqAnnotations) numaExclusive := qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations) @@ -328,6 +325,7 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat numaBound = minNUMAsCountNeeded + 1 } + var availableNumaHints []*pluginapi.TopologyHint machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) { maskCount := mask.Count() if maskCount < minNUMAsCountNeeded { @@ -357,13 +355,26 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat return } - hints[string(v1.ResourceMemory)].Hints = append(hints[string(v1.ResourceMemory)].Hints, &pluginapi.TopologyHint{ + availableNumaHints = append(availableNumaHints, &pluginapi.TopologyHint{ Nodes: machine.MaskToUInt64Array(mask), Preferred: len(maskBits) == minNUMAsCountNeeded, }) }) - return hints, nil + // NOTE: because grpc is inability to distinguish between an empty array and nil, + // we return an error instead of an empty array. + // we should resolve this issue if we need manage multi resource in one plugin. + if len(availableNumaHints) == 0 { + general.Warningf("calculateHints got no available memory hints for pod: %s/%s, container: %s", + req.PodNamespace, req.PodName, req.ContainerName) + return nil, errNoAvailableMemoryHints + } + + return map[string]*pluginapi.ListOfTopologyHints{ + string(v1.ResourceMemory): { + Hints: availableNumaHints, + }, + }, nil } // regenerateHints regenerates hints for container that'd already been allocated memory,