Skip to content

Commit

Permalink
feat(qrm): support CPUNUMAHintPreferPolicyDynamicPacking
Browse files Browse the repository at this point in the history
  • Loading branch information
csfldf committed May 6, 2024
1 parent 86120e6 commit 17410b8
Show file tree
Hide file tree
Showing 6 changed files with 626 additions and 22 deletions.
4 changes: 4 additions & 0 deletions cmd/katalyst-agent/app/options/qrm/cpu_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type CPUDynamicPolicyOptions struct {
EnableSyncingCPUIdle bool
EnableCPUIdle bool
CPUNUMAHintPreferPolicy string
CPUNUMAHintPreferLowThreshold float64
}

type CPUNativePolicyOptions struct {
Expand Down Expand Up @@ -93,6 +94,8 @@ func (o *CPUOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"specific cgroup paths and it requires --enable-syncing-cpu-idle=true to make effect")
fs.StringVar(&o.CPUNUMAHintPreferPolicy, "cpu-numa-hint-prefer-policy", o.CPUNUMAHintPreferPolicy,
"it decides hint preference calculation strategy")
fs.Float64Var(&o.CPUNUMAHintPreferLowThreshold, "cpu-numa-hint-prefer-low-threshold", o.CPUNUMAHintPreferLowThreshold,
"it indicates threshold to apply CPUNUMAHintPreferPolicy dynamically, and it's working when CPUNUMAHintPreferPolicy is set to dynamic_packing")
fs.StringVar(&o.CPUAllocationOption, "cpu-allocation-option",
o.CPUAllocationOption, "The allocation option of cpu (packed/distributed). The default value is packed."+
"in cases where more than one NUMA node is required to satisfy the allocation.")
Expand All @@ -113,5 +116,6 @@ func (o *CPUOptions) ApplyTo(conf *qrmconfig.CPUQRMPluginConfig) error {
conf.EnableFullPhysicalCPUsOnly = o.EnableFullPhysicalCPUsOnly
conf.CPUAllocationOption = o.CPUAllocationOption
conf.CPUNUMAHintPreferPolicy = o.CPUNUMAHintPreferPolicy
conf.CPUNUMAHintPreferLowThreshold = o.CPUNUMAHintPreferLowThreshold
return nil
}
3 changes: 3 additions & 0 deletions pkg/agent/qrm-plugins/cpu/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,7 @@ const (
CPUNUMAHintPreferPolicyPacking = "packing"
// spreading: tries to distributing containers across multiple nodes. Aiming to balance the load by avoiding overloading individual nodes.
CPUNUMAHintPreferPolicySpreading = "spreading"
// dynamic_packing: refers to the strategy of putting as many containers as possible onto a single NUMA node until the node hits configurable threshold.
// if all nodes hit configurable threshold, use spreading policy instead.
CPUNUMAHintPreferPolicyDynamicPacking = "dynamic_packing"
)
2 changes: 2 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type DynamicPolicy struct {
podDebugAnnoKeys []string
transitionPeriod time.Duration
cpuNUMAHintPreferPolicy string
cpuNUMAHintPreferLowThreshold float64
}

func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
Expand Down Expand Up @@ -192,6 +193,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
cpuPluginSocketAbsPath: conf.CPUPluginSocketAbsPath,
enableCPUAdvisor: conf.CPUQRMPluginConfig.EnableCPUAdvisor,
cpuNUMAHintPreferPolicy: conf.CPUQRMPluginConfig.CPUNUMAHintPreferPolicy,
cpuNUMAHintPreferLowThreshold: conf.CPUQRMPluginConfig.CPUNUMAHintPreferLowThreshold,
reservedCPUs: reservedCPUs,
extraStateFileAbsPath: conf.ExtraStateFileAbsPath,
enableSyncingCPUIdle: conf.CPUQRMPluginConfig.EnableSyncingCPUIdle,
Expand Down
88 changes: 66 additions & 22 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,28 +284,8 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
return util.PackResourceHintsResponse(req, string(v1.ResourceCPU), hints)
}

func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, machineState state.NUMANodeMap,
reqAnnotations map[string]string) (map[string]*pluginapi.ListOfTopologyHints, error) {

numaNodes := machineState.GetFilteredNUMASetWithAnnotations(
state.CheckNUMABindingSharedCoresAntiAffinity, reqAnnotations).ToSliceInt()

hints := map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceCPU): {
Hints: []*pluginapi.TopologyHint{},
},
}

minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology)
if err != nil {
return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err)
}

// if a numa_binding shared_cores has request larger than 1 NUMA,
// its performance may degrade to be like normal shared_cores
if minNUMAsCountNeeded > 1 {
return nil, fmt.Errorf("numa_binding shared_cores container has request larger than 1 NUMA")
}
func (p *DynamicPolicy) populateHintsByPreferPolicy(numaNodes []int, preferPolicy string,
hints map[string]*pluginapi.ListOfTopologyHints, machineState state.NUMANodeMap, reqInt int) {

preferIndexes, maxLeft, minLeft := []int{}, -1, math.MaxInt

Expand Down Expand Up @@ -346,6 +326,70 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, mach
hints[string(v1.ResourceCPU)].Hints[preferIndex].Preferred = true
}
}
}

func (p *DynamicPolicy) filterNUMANodesByHintPreferLowThreshold(reqInt int,
machineState state.NUMANodeMap, numaNodes []int) []int {

filteredNUMANodes := make([]int, 0, len(numaNodes))

for _, nodeID := range numaNodes {
availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
allocatableCPUQuantity := machineState[nodeID].GetFilteredDefaultCPUSet(nil, nil).Difference(p.reservedCPUs).Size()

if allocatableCPUQuantity == 0 {
general.Warningf("numa: %d allocatable cpu quantity is zero", nodeID)
continue
}

if float64(availableCPUQuantity)/float64(allocatableCPUQuantity) >= p.cpuNUMAHintPreferLowThreshold {
filteredNUMANodes = append(filteredNUMANodes, nodeID)
}
}

return filteredNUMANodes
}

func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, machineState state.NUMANodeMap,
reqAnnotations map[string]string) (map[string]*pluginapi.ListOfTopologyHints, error) {

numaNodes := machineState.GetFilteredNUMASetWithAnnotations(
state.CheckNUMABindingSharedCoresAntiAffinity, reqAnnotations).ToSliceInt()

hints := map[string]*pluginapi.ListOfTopologyHints{
string(v1.ResourceCPU): {
Hints: []*pluginapi.TopologyHint{},
},
}

minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology)
if err != nil {
return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err)
}

// if a numa_binding shared_cores has request larger than 1 NUMA,
// its performance may degrade to be like normal shared_cores
if minNUMAsCountNeeded > 1 {
return nil, fmt.Errorf("numa_binding shared_cores container has request larger than 1 NUMA")
}
switch p.cpuNUMAHintPreferPolicy {
case cpuconsts.CPUNUMAHintPreferPolicyPacking, cpuconsts.CPUNUMAHintPreferPolicySpreading:
general.Infof("apply %s policy on NUMAs: %+v", p.cpuNUMAHintPreferPolicy, numaNodes)
p.populateHintsByPreferPolicy(numaNodes, p.cpuNUMAHintPreferPolicy, hints, machineState, reqInt)
case cpuconsts.CPUNUMAHintPreferPolicyDynamicPacking:
compactNUMANodes := p.filterNUMANodesByHintPreferLowThreshold(reqInt, machineState, numaNodes)

if len(compactNUMANodes) > 0 {
general.Infof("dynamically apply packing policy on NUMAs: %+v", compactNUMANodes)
p.populateHintsByPreferPolicy(compactNUMANodes, cpuconsts.CPUNUMAHintPreferPolicyPacking, hints, machineState, reqInt)
} else {
general.Infof("empty compactNUMANodes, dynamically apply spreading policy on NUMAs: %+v", numaNodes)
p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicyPacking, hints, machineState, reqInt)
}
default:
general.Infof("unknown policy: %s, apply default spreading policy on NUMAs: %+v", p.cpuNUMAHintPreferPolicy, numaNodes)
p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, reqInt)
}

return hints, nil
}
Loading

0 comments on commit 17410b8

Please sign in to comment.