Skip to content

Commit

Permalink
Merge pull request #686 from nightmeng/dev/fix-cpu-admit-failed.hints
Browse files Browse the repository at this point in the history
feat(qrm): optimize admit logic
  • Loading branch information
luomingmeng committed Aug 30, 2024
2 parents 8f6cbb5 + 58d14b3 commit 2333cae
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 108 deletions.
21 changes: 1 addition & 20 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package dynamicpolicy
import (
"context"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -222,8 +221,6 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresHintHandler,
}

state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores)

if err := policyImplement.cleanPools(); err != nil {
return false, agent.ComponentStub{}, fmt.Errorf("cleanPools failed with error: %v", err)
}
Expand Down Expand Up @@ -1181,28 +1178,12 @@ func (p *DynamicPolicy) checkNormalShareCoresCpuResource(req *pluginapi.Resource
return false, fmt.Errorf("GetQuantityFromResourceReq failed with error: %v", err)
}

shareCoresAllocated := reqFloat64
podEntries := p.state.GetPodEntries()
for podUid, podEntry := range podEntries {
if podEntry.IsPoolEntry() {
continue
}
if podUid == req.PodUid {
continue
}
for _, allocation := range podEntry {
// shareCoresAllocated should involve both main and sidecar containers
if state.CheckShared(allocation) && !state.CheckNUMABinding(allocation) {
shareCoresAllocated += p.getContainerRequestedCores(allocation)
}
}
}
shareCoresAllocatedInt := state.GetNonBindingSharedRequestedQuantityFromPodEntries(p.state.GetPodEntries(), map[string]float64{req.PodUid: reqFloat64}, p.getContainerRequestedCores)

machineState := p.state.GetMachineState()
pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
state.CheckDedicated, state.CheckNUMABinding)

shareCoresAllocatedInt := int(math.Ceil(shareCoresAllocated))
general.Infof("[checkNormalShareCoresCpuResource] node cpu allocated: %d, allocatable: %d", shareCoresAllocatedInt, pooledCPUs.Size())
if shareCoresAllocatedInt > pooledCPUs.Size() {
general.Warningf("[checkNormalShareCoresCpuResource] no enough cpu resource for normal share cores pod: %s/%s, container: %s (request: %.02f, node allocated: %d, node allocatable: %d)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
}
newEntries[podUID][containerName] = allocationInfo.Clone()
// adapt to old checkpoint without RequestQuantity property
newEntries[podUID][containerName].RequestQuantity = state.GetContainerRequestedCores()(allocationInfo)
newEntries[podUID][containerName].RequestQuantity = p.getContainerRequestedCores(allocationInfo)

switch allocationInfo.QoSLevel {
case consts.PodAnnotationQoSLevelDedicatedCores:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,15 +726,15 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesResizeAware(orig
return fmt.Errorf("pool %s cross NUMA: %+v", poolName, poolsQuantityMap[poolName])
}
} else if incrByReq {
err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap)
err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap, p.getContainerRequestedCores)
if err != nil {
return fmt.Errorf("CountAllocationInfosToPoolsQuantityMap failed with error: %v", err)
}
}
} else {
// else we do sum(containers req) for each pool to get pools ratio
var err error
poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, allocationInfos)
poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, allocationInfos, p.getContainerRequestedCores)
if err != nil {
return fmt.Errorf("GetSharedQuantityMapFromPodEntries failed with error: %v", err)
}
Expand All @@ -745,14 +745,14 @@ func (p *DynamicPolicy) putAllocationsAndAdjustAllocationEntriesResizeAware(orig
allocationInfos[0].PodNamespace, allocationInfos[0].PodName, allocationInfos[0].ContainerName)
}
// if advisor is disabled, qrm can re-calc the pool size exactly. we don't need to adjust the pool size.
err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap)
err := state.CountAllocationInfosToPoolsQuantityMap(allocationInfos, poolsQuantityMap, p.getContainerRequestedCores)
if err != nil {
return fmt.Errorf("CountAllocationInfosToPoolsQuantityMap failed with error: %v", err)
}
}
}

isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, allocationInfos)
isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, allocationInfos, p.getContainerRequestedCores)
err := p.adjustPoolsAndIsolatedEntries(poolsQuantityMap, isolatedQuantityMap,
entries, machineState)
if err != nil {
Expand Down Expand Up @@ -867,12 +867,12 @@ func (p *DynamicPolicy) adjustAllocationEntries() error {
poolsQuantityMap = machine.ParseCPUAssignmentQuantityMap(poolsCPUSetMap)
} else {
var err error
poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, nil)
poolsQuantityMap, err = state.GetSharedQuantityMapFromPodEntries(entries, nil, p.getContainerRequestedCores)
if err != nil {
return fmt.Errorf("GetSharedQuantityMapFromPodEntries failed with error: %v", err)
}
}
isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, nil)
isolatedQuantityMap := state.GetIsolatedQuantityMapFromPodEntries(entries, nil, p.getContainerRequestedCores)

err := p.adjustPoolsAndIsolatedEntries(poolsQuantityMap, isolatedQuantityMap, entries, machineState)
if err != nil {
Expand Down Expand Up @@ -1095,7 +1095,7 @@ func (p *DynamicPolicy) applyPoolsAndIsolatedInfo(poolsCPUSet map[string]machine

newPodEntries[podUID][containerName] = allocationInfo.Clone()
// adapt to old checkpoint without RequestQuantity property
newPodEntries[podUID][containerName].RequestQuantity = state.GetContainerRequestedCores()(allocationInfo)
newPodEntries[podUID][containerName].RequestQuantity = p.getContainerRequestedCores(allocationInfo)
switch allocationInfo.QoSLevel {
case apiconsts.PodAnnotationQoSLevelDedicatedCores:
newPodEntries[podUID][containerName].OwnerPoolName = allocationInfo.GetPoolName()
Expand Down
58 changes: 40 additions & 18 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
qosutil "github.com/kubewharf/katalyst-core/pkg/util/qos"
)

// errNoAvailableCPUHints is the sentinel error returned by the CPU hint
// calculation paths in this file when no usable topology hint can be produced
// for a request. An error is returned rather than an empty hint list because
// an empty array and nil cannot be told apart once they are sent over gRPC.
var errNoAvailableCPUHints = fmt.Errorf("no available cpu hints")

type memBWHintUpdate struct {
updatedPreferrence bool
leftAllocatable int
Expand Down Expand Up @@ -73,7 +75,7 @@ func (p *DynamicPolicy) sharedCoresHintHandler(ctx context.Context,
"podName": req.PodName,
"containerName": req.ContainerName,
})...)
return nil, fmt.Errorf("no enough cpu resource")
return nil, errNoAvailableCPUHints
}
}

Expand Down Expand Up @@ -209,12 +211,6 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
}
sort.Ints(numaNodes)

hints := map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceCPU): {
Hints: []*pluginapi.TopologyHint{},
},
}

minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology)
if err != nil {
return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err)
Expand Down Expand Up @@ -262,6 +258,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
}

preferredHintIndexes := []int{}
var availableNumaHints []*pluginapi.TopologyHint
machine.IterateBitMasks(numaNodes, numaBound, func(mask machine.BitMask) {
maskCount := mask.Count()
if maskCount < minNUMAsCountNeeded {
Expand Down Expand Up @@ -292,18 +289,27 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
}

preferred := maskCount == minNUMAsCountNeeded
hints[string(v1.ResourceCPU)].Hints = append(hints[string(v1.ResourceCPU)].Hints, &pluginapi.TopologyHint{
availableNumaHints = append(availableNumaHints, &pluginapi.TopologyHint{
Nodes: machine.MaskToUInt64Array(mask),
Preferred: preferred,
})

if preferred {
preferredHintIndexes = append(preferredHintIndexes, len(hints[string(v1.ResourceCPU)].Hints)-1)
preferredHintIndexes = append(preferredHintIndexes, len(availableNumaHints)-1)
}
})

// NOTE: because gRPC is unable to distinguish between an empty array and nil,
// we return an error instead of an empty array.
// We should resolve this issue if we need to manage multiple resources in one plugin.
if len(availableNumaHints) == 0 {
general.Warningf("calculateHints got no available cpu hints for pod: %s/%s, container: %s",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, errNoAvailableCPUHints
}

if numaBound > machine.MBWNUMAsPoint {
numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer)
numaAllocatedMemBW, err := getNUMAAllocatedMemBW(machineState, p.metaServer, p.getContainerRequestedCores)

general.InfoS("getNUMAAllocatedMemBW",
"podNamespace", req.PodNamespace,
Expand All @@ -314,15 +320,21 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
general.Errorf("getNUMAAllocatedMemBW failed with error: %v", err)
_ = p.emitter.StoreInt64(util.MetricNameGetNUMAAllocatedMemBWFailed, 1, metrics.MetricTypeNameRaw)
} else {
p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, hints[string(v1.ResourceCPU)].Hints,
p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, availableNumaHints,
reqInt, numaAllocatedMemBW, req, numaExclusive)
}
}

hints := map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceCPU): {
Hints: availableNumaHints,
},
}

return hints, nil
}

func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserver.MetaServer) (map[int]int, error) {
func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserver.MetaServer, getContainerRequestedCores state.GetContainerRequestedCoresFunc) (map[int]int, error) {
numaAllocatedMemBW := make(map[int]int)
podUIDToMemBWReq := make(map[string]int)
podUIDToBindingNUMAs := make(map[string]sets.Int)
Expand Down Expand Up @@ -350,7 +362,7 @@ func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserve
Name: allocationInfo.PodName,
Labels: allocationInfo.Labels,
Annotations: allocationInfo.Annotations,
}, int(math.Ceil(state.GetContainerRequestedCores()(allocationInfo))))
}, int(math.Ceil(getContainerRequestedCores(allocationInfo))))
if err != nil {
return nil, fmt.Errorf("GetContainerMemoryBandwidthRequest for pod: %s/%s, container: %s failed with error: %v",
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName, err)
Expand Down Expand Up @@ -633,7 +645,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
general.Infof("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, try to migrate it to new NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
var calculateErr error
hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req.Annotations)
hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req)
if calculateErr != nil {
general.Errorf("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, failed to migrate it to new NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
Expand All @@ -642,15 +654,15 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
} else {
general.Errorf("pod: %s/%s, container: %s request inplace update resize, but no enough resource for it in current NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, fmt.Errorf("inplace update resize scale out failed with no enough resource")
return nil, errNoAvailableCPUHints
}
} else {
general.Infof("pod: %s/%s, container: %s request inplace update resize, there is enough resource for it in current NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
}
} else if hints == nil {
var calculateErr error
hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req.Annotations)
hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req)
if calculateErr != nil {
return nil, fmt.Errorf("calculateHintsForNUMABindingSharedCores failed with error: %v", calculateErr)
}
Expand Down Expand Up @@ -780,12 +792,13 @@ func (p *DynamicPolicy) filterNUMANodesByNonBindingSharedRequestedQuantity(nonBi

func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podEntries state.PodEntries,
machineState state.NUMANodeMap,
reqAnnotations map[string]string,
req *pluginapi.ResourceRequest,
) (map[string]*pluginapi.ListOfTopologyHints, error) {
nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckNUMABinding).Size()
nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding)
nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries)
nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries, nil, p.getContainerRequestedCores)

reqAnnotations := req.Annotations
numaNodes := p.filterNUMANodesByNonBindingSharedRequestedQuantity(nonBindingSharedRequestedQuantity,
nonBindingNUMAsCPUQuantity, nonBindingNUMAs, machineState,
machineState.GetFilteredNUMASetWithAnnotations(state.CheckNUMABindingSharedCoresAntiAffinity, reqAnnotations).ToSliceInt())
Expand Down Expand Up @@ -826,6 +839,15 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE
p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, reqInt)
}

// NOTE: because gRPC is unable to distinguish between an empty array and nil,
// we return an error instead of an empty array.
// We should resolve this issue if we need to manage multiple resources in one plugin.
if len(hints[string(v1.ResourceCPU)].Hints) == 0 {
general.Warningf("calculateHints got no available memory hints for snb pod: %s/%s, container: %s",
req.PodNamespace, req.PodName, req.ContainerName)
return nil, errNoAvailableCPUHints
}

return hints, nil
}

Expand Down
11 changes: 3 additions & 8 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ func getTestDynamicPolicyWithoutInitialization(topology *machine.CPUTopology, st
podDebugAnnoKeys: []string{podDebugAnnoKey},
}

state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores)

// register allocation behaviors for pods with different QoS level
policyImplement.allocationHandlers = map[string]util.AllocationHandler{
consts.PodAnnotationQoSLevelSharedCores: policyImplement.sharedCoresAllocationHandler,
Expand Down Expand Up @@ -4923,7 +4921,6 @@ func Test_getNUMAAllocatedMemBW(t *testing.T) {
as := require.New(t)

policyImplement := &DynamicPolicy{}
state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores)

testName := "test"
highDensityCPUTopology, err := machine.GenerateDummyCPUTopology(384, 2, 12)
Expand Down Expand Up @@ -5168,7 +5165,7 @@ func Test_getNUMAAllocatedMemBW(t *testing.T) {
curTT := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
got, err := getNUMAAllocatedMemBW(curTT.args.machineState, curTT.args.metaServer)
got, err := getNUMAAllocatedMemBW(curTT.args.machineState, curTT.args.metaServer, policyImplement.getContainerRequestedCores)
if (err != nil) != curTT.wantErr {
t.Errorf("getNUMAAllocatedMemBW() error = %v, wantErr %v", err, curTT.wantErr)
return
Expand Down Expand Up @@ -5294,10 +5291,8 @@ func TestSNBAdmitWithSidecarReallocate(t *testing.T) {
}

// pod aggregated size is 8, the new container request is 4, 8 + 4 > 11 (share-NUMA0 size)
res, err = dynamicPolicy.GetTopologyHints(context.Background(), anotherReq)
as.Nil(err)
as.NotNil(res.ResourceHints[string(v1.ResourceCPU)])
as.Equal(0, len(res.ResourceHints[string(v1.ResourceCPU)].Hints))
_, err = dynamicPolicy.GetTopologyHints(context.Background(), anotherReq)
as.ErrorContains(err, errNoAvailableCPUHints.Error())

// reallocate sidecar
_, err = dynamicPolicy.Allocate(context.Background(), sidecarReq)
Expand Down
Loading

0 comments on commit 2333cae

Please sign in to comment.