Skip to content

Commit

Permalink
feat(qrm): return errors instead of empty array in hints
Browse files Browse the repository at this point in the history
  • Loading branch information
nightmeng committed Aug 30, 2024
1 parent 8453174 commit 58d14b3
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 33 deletions.
50 changes: 36 additions & 14 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos"
)

var errNoAvailableCPUHints = fmt.Errorf("no available cpu hints")

type memBWHintUpdate struct {
updatedPreferrence bool
leftAllocatable int
Expand Down Expand Up @@ -73,7 +75,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
"podName": req.PodName,
"containerName": req.ContainerName,
})...)
return nil, fmt.Errorf("no enough cpu resource")
return nil, errNoAvailableCPUHints
}
}

Expand Down Expand Up @@ -209,12 +211,6 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
}
sort.Ints(numaNodes)

hints := map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceCPU): {
Hints: []*pluginapi.TopologyHint{},
},
}

minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology)
if err != nil {
return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err)
Expand Down Expand Up @@ -262,6 +258,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
}

preferredHintIndexes := []int{}
var availableNumaHints []*pluginapi.TopologyHint
machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) {
maskCount := mask.Count()
if maskCount < minNUMAsCountNeeded {
Expand Down Expand Up @@ -292,16 +289,25 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
}

preferred := maskCount == minNUMAsCountNeeded
hints[string(v1.ResourceCPU)].Hints = append(hints[string(v1.ResourceCPU)].Hints, &pluginapi.TopologyHint{
availableNumaHints = append(availableNumaHints, &pluginapi.TopologyHint{
Nodes: machine.MaskToUInt64Array(mask),
Preferred: preferred,
})

if preferred {
preferredHintIndexes = append(preferredHintIndexes, len(hints[string(v1.ResourceCPU)].Hints)-1)
preferredHintIndexes = append(preferredHintIndexes, len(availableNumaHints)-1)
}
})

// NOTE: because grpc is inability to distinguish between an empty array and nil,
// we return an error instead of an empty array.
// we should resolve this issue if we need manage multi resource in one plugin.
if len(availableNumaHints) == 0 {
general.Warningf("calculateHints got no available cpu hints for pod: %s/%s, container: %s",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, errNoAvailableCPUHints
}

if numaBound > machine.MBWNUMAsPoint {
numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer, p.getContainerRequestedCores)

Expand All @@ -314,11 +320,17 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
general.Errorf("getNUMAAllocatedMemBW failed with error: %v", err)
_ = p.emitter.StoreInt64(util.MetricNameGetNUMAAllocatedMemBWFailed, 1, metrics.MetricTypeNameRaw)
} else {
p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, hints[string(v1.ResourceCPU)].Hints,
p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, availableNumaHints,
reqInt, numaAllocatedMemBW, req, numaExclusive)
}
}

hints := map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceCPU): {
Hints: availableNumaHints,
},
}

return hints, nil
}

Expand Down Expand Up @@ -633,7 +645,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
general.Infof("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, try to migrate it to new NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
var calculateErr error
hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req.Annotations)
hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req)
if calculateErr != nil {
general.Errorf("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, failed to migrate it to new NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
Expand All @@ -642,15 +654,15 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
} else {
general.Errorf("pod: %s/%s, container: %s request inplace update resize, but no enough resource for it in current NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, fmt.Errorf("inplace update resize scale out failed with no enough resource")
return nil, errNoAvailableCPUHints
}
} else {
general.Infof("pod: %s/%s, container: %s request inplace update resize, there is enough resource for it in current NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
}
} else if hints == nil {
var calculateErr error
hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req.Annotations)
hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req)
if calculateErr != nil {
return nil, fmt.Errorf("calculateHintsForNUMABindingSharedCores failed with error: %v", calculateErr)
}
Expand Down Expand Up @@ -780,12 +792,13 @@ func (p *DynamicPolicy) filterNUMANodesByNonBindingSharedRequestedQuantity(nonBi

func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podEntries state.PodEntries,
machineState state.NUMANodeMap,
reqAnnotations map[string]string,
req *pluginapi.ResourceRequest,
) (map[string]*pluginapi.ListOfTopologyHints, error) {
nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckNUMABinding).Size()
nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding)
nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries, nil, p.getContainerRequestedCores)

reqAnnotations := req.Annotations
numaNodes := p.filterNUMANodesByNonBindingSharedRequestedQuantity(nonBindingSharedRequestedQuantity,
nonBindingNUMAsCPUQuantity, nonBindingNUMAs, machineState,
machineState.GetFilteredNUMASetWithAnnotations(state.CheckNUMABindingSharedCoresAntiAffinity, reqAnnotations).ToSliceInt())
Expand Down Expand Up @@ -826,6 +839,15 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE
p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, reqInt)
}

// NOTE: because grpc is inability to distinguish between an empty array and nil,
// we return an error instead of an empty array.
// we should resolve this issue if we need manage multi resource in one plugin.
if len(hints[string(v1.ResourceCPU)].Hints) == 0 {
general.Warningf("calculateHints got no available memory hints for snb pod: %s/%s, container: %s",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, errNoAvailableCPUHints
}

return hints, nil
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5291,10 +5291,8 @@ func TestSNBAdmitWithSidecarReallocate(t *testing.T) {
}

// pod aggregated size is 8, the new container request is 4, 8 + 4 > 11 (share-NUMA0 size)
res, err = dynamicPolicy.GetTopologyHints(context.Background(), anotherReq)
as.Nil(err)
as.NotNil(res.ResourceHints[string(v1.ResourceCPU)])
as.Equal(0, len(res.ResourceHints[string(v1.ResourceCPU)].Hints))
_, err = dynamicPolicy.GetTopologyHints(context.Background(), anotherReq)
as.ErrorContains(err, errNoAvailableCPUHints.Error())

// reallocate sidecar
_, err = dynamicPolicy.Allocate(context.Background(), sidecarReq)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/vpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ func TestNormalShareVPA(t *testing.T) {
}

_, err = dynamicPolicy.GetTopologyHints(context.Background(), resizeReq)
as.ErrorContains(err, "no enough")
as.ErrorContains(err, errNoAvailableCPUHints.Error())

resizeReq1 := &pluginapi.ResourceRequest{
PodUid: req.PodUid,
Expand Down
39 changes: 25 additions & 14 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_hint_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos"
)

var errNoAvailableMemoryHints = fmt.Errorf("no available memory hints")

func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
req *pluginapi.ResourceRequest,
) (*pluginapi.ResourceHintsResponse, error) {
Expand All @@ -59,7 +61,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
"podName": req.PodName,
"containerName": req.ContainerName,
})...)
return nil, fmt.Errorf("no enough memory resource")
return nil, errNoAvailableMemoryHints
}
}

Expand Down Expand Up @@ -197,7 +199,7 @@ func (p *DynamicPolicy) numaBindingHintHandler(_ context.Context,

var calculateErr error
// recalculate hints for the whole pod
hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req.Annotations)
hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req)
if calculateErr != nil {
general.Errorf("failed to calculate hints for pod: %s/%s, container: %s, error: %v",
req.PodNamespace, req.PodName, req.ContainerName, calculateErr)
Expand All @@ -213,7 +215,7 @@ func (p *DynamicPolicy) numaBindingHintHandler(_ context.Context,
// otherwise, calculate hint for container without allocated memory
var calculateErr error
// calculate hint for container without allocated memory
hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req.Annotations)
hints, calculateErr = p.calculateHints(uint64(podAggregatedRequest), resourcesMachineState, req)
if calculateErr != nil {
general.Errorf("failed to calculate hints for pod: %s/%s, container: %s, error: %v",
req.PodNamespace, req.PodName, req.ContainerName, calculateErr)
Expand Down Expand Up @@ -256,8 +258,9 @@ func (p *DynamicPolicy) dedicatedCoresWithoutNUMABindingHintHandler(_ context.Co

// calculateHints is a helper function to calculate the topology hints
// with the given container requests.
func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState state.NUMANodeResourcesMap,
reqAnnotations map[string]string,
func (p *DynamicPolicy) calculateHints(reqInt uint64,
resourcesMachineState state.NUMANodeResourcesMap,
req *pluginapi.ResourceRequest,
) (map[string]*pluginapi.ListOfTopologyHints, error) {
machineState := resourcesMachineState[v1.ResourceMemory]

Expand All @@ -271,12 +274,6 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat
}
sort.Ints(numaNodes)

hints := map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceMemory): {
Hints: []*pluginapi.TopologyHint{},
},
}

bytesPerNUMA, err := machineState.BytesPerNUMA()
if err != nil {
return nil, fmt.Errorf("getBytesPerNUMAFromMachineState failed with error: %v", err)
Expand All @@ -286,7 +283,7 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat
if err != nil {
return nil, fmt.Errorf("GetNUMANodesCountToFitMemoryReq failed with error: %v", err)
}

reqAnnotations := req.Annotations
numaBinding := qosutil.AnnotationsIndicateNUMABinding(reqAnnotations)
numaExclusive := qosutil.AnnotationsIndicateNUMAExclusive(reqAnnotations)

Expand Down Expand Up @@ -328,6 +325,7 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat
numaBound = minNUMAsCountNeeded + 1
}

var availableNumaHints []*pluginapi.TopologyHint
machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) {
maskCount := mask.Count()
if maskCount < minNUMAsCountNeeded {
Expand Down Expand Up @@ -357,13 +355,26 @@ func (p *DynamicPolicy) calculateHints(reqInt uint64, resourcesMachineState stat
return
}

hints[string(v1.ResourceMemory)].Hints = append(hints[string(v1.ResourceMemory)].Hints, &pluginapi.TopologyHint{
availableNumaHints = append(availableNumaHints, &pluginapi.TopologyHint{
Nodes: machine.MaskToUInt64Array(mask),
Preferred: len(maskBits) == minNUMAsCountNeeded,
})
})

return hints, nil
// NOTE: because grpc is inability to distinguish between an empty array and nil,
// we return an error instead of an empty array.
// we should resolve this issue if we need manage multi resource in one plugin.
if len(availableNumaHints) == 0 {
general.Warningf("calculateHints got no available memory hints for pod: %s/%s, container: %s",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, errNoAvailableMemoryHints
}

return map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceMemory): {
Hints: availableNumaHints,
},
}, nil
}

// regenerateHints regenerates hints for container that'd already been allocated memory,
Expand Down

0 comments on commit 58d14b3

Please sign in to comment.