Skip to content

Commit

Permalink
support numa_binding enhancement for other than shared or dedicated qos
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Sep 12, 2024
1 parent d43eaca commit 95cedbc
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (p *CPUPressureLoadEviction) checkSharedPressureByPoolSize(pod2Pool PodPool
// accumulateSharedPoolsLimit calculates the cpu core limit used by shared core pool,
// and it equals: machine-core - cores-for-dedicated-pods - reserved-cores-reclaim-pods - reserved-cores-system-pods.
func (p *CPUPressureLoadEviction) accumulateSharedPoolsLimit() int {
availableCPUSet := p.state.GetMachineState().GetFilteredAvailableCPUSet(p.systemReservedCPUs, nil, state.CheckNUMABinding)
availableCPUSet := p.state.GetMachineState().GetFilteredAvailableCPUSet(p.systemReservedCPUs, nil, state.CheckSharedOrDedicatedNUMABinding)

coreNumReservedForReclaim := p.dynamicConf.GetDynamicConfiguration().MinReclaimedResourceForAllocate[v1.ResourceCPU]
if coreNumReservedForReclaim.Value() > int64(p.metaServer.NumCPUs) {
Expand All @@ -434,7 +434,7 @@ func (p *CPUPressureLoadEviction) accumulateSharedPoolsLimit() int {
reservedForReclaim := machine.GetCoreNumReservedForReclaim(int(coreNumReservedForReclaim.Value()), p.metaServer.NumNUMANodes)

reservedForReclaimInSharedNuma := 0
sharedCoresNUMAs := p.state.GetMachineState().GetFilteredNUMASet(state.CheckNUMABinding)
sharedCoresNUMAs := p.state.GetMachineState().GetFilteredNUMASet(state.CheckSharedOrDedicatedNUMABinding)
for _, numaID := range sharedCoresNUMAs.ToSliceInt() {
reservedForReclaimInSharedNuma += reservedForReclaim[numaID]
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
// pooledCPUs is the total available cpu cores minus those that are reserved
pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
func(ai *state.AllocationInfo) bool {
return state.CheckDedicated(ai) || state.CheckNUMABinding(ai)
return state.CheckDedicated(ai) || state.CheckSharedNUMABinding(ai)
},
state.CheckDedicatedNUMABinding)
pooledCPUsTopologyAwareAssignments, err := machine.GetNumaAwareAssignments(p.machineInfo.CPUTopology, pooledCPUs)
Expand Down Expand Up @@ -1069,7 +1069,7 @@ func (p *DynamicPolicy) initReclaimPool() error {
machineState := p.state.GetMachineState()
availableCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
func(ai *state.AllocationInfo) bool {
return state.CheckDedicated(ai) || state.CheckNUMABinding(ai)
return state.CheckDedicated(ai) || state.CheckSharedNUMABinding(ai)
},
state.CheckDedicatedNUMABinding).Difference(noneResidentCPUs)

Expand Down Expand Up @@ -1184,7 +1184,7 @@ func (p *DynamicPolicy) checkNormalShareCoresCpuResource(req *pluginapi.Resource

machineState := p.state.GetMachineState()
pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
state.CheckDedicated, state.CheckNUMABinding)
state.CheckDedicated, state.CheckSharedOrDedicatedNUMABinding)

general.Infof("[checkNormalShareCoresCpuResource] node cpu allocated: %d, allocatable: %d", shareCoresAllocatedInt, pooledCPUs.Size())
if shareCoresAllocatedInt > pooledCPUs.Size() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (p *DynamicPolicy) sharedCoresWithoutNUMABindingAllocationHandler(_ context

machineState := p.state.GetMachineState()
pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
state.CheckDedicated, state.CheckNUMABinding)
state.CheckDedicated, state.CheckSharedOrDedicatedNUMABinding)

if pooledCPUs.IsEmpty() {
general.Errorf("pod: %s/%s, container: %s get empty pooledCPUs", req.PodNamespace, req.PodName, req.ContainerName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (p *DynamicPolicy) dedicatedCoresWithNUMABindingHintHandler(_ context.Conte

// if hints exists in extra state-file, prefer to use them
if hints == nil {
availableNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding)
availableNUMAs := machineState.GetFilteredNUMASet(state.CheckSharedOrDedicatedNUMABinding)

var extraErr error
hints, extraErr = util.GetHintsFromExtraStateFile(req.PodName, string(v1.ResourceCPU), p.extraStateFileAbsPath, availableNUMAs)
Expand Down Expand Up @@ -794,8 +794,8 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE
machineState state.NUMANodeMap,
req *pluginapi.ResourceRequest,
) (map[string]*pluginapi.ListOfTopologyHints, error) {
nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckNUMABinding).Size()
nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding)
nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckSharedOrDedicatedNUMABinding).Size()
nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckSharedOrDedicatedNUMABinding)
nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries, nil, p.getContainerRequestedCores)

reqAnnotations := req.Annotations
Expand Down
10 changes: 10 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,16 @@ func CheckSharedNUMABinding(ai *AllocationInfo) bool {
return CheckShared(ai) && CheckNUMABinding(ai)
}

// CheckSharedOrDedicatedNUMABinding returns true if the AllocationInfo is for pod with
// shared-qos or dedicated-qos and numa-binding enhancement
func CheckSharedOrDedicatedNUMABinding(ai *AllocationInfo) bool {
if ai == nil {
return false
}

return CheckSharedNUMABinding(ai) || CheckDedicatedNUMABinding(ai)
}

// CheckDedicatedPool returns true if the AllocationInfo is for a container in the dedicated pool
func CheckDedicatedPool(ai *AllocationInfo) bool {
if ai == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,10 @@ func GenerateMachineStateFromPodEntries(topology *machine.CPUTopology, podEntrie

switch policyName {
case consts.CPUResourcePluginPolicyNameDynamic:
// only modify allocated and default properties in NUMA node state if the policy is dynamic and the entry indicates numa_binding.
// only modify allocated and default properties in NUMA node state if the policy is dynamic and the dedicated qos entry indicates numa_binding.
// shared_cores with numa_binding also contributes to numaNodeState.AllocatedCPUSet,
// it's convenient that we can skip NUMA with AllocatedCPUSet > 0 when allocating CPUs for dedicated_cores with numa_binding.
if CheckNUMABinding(allocationInfo) {
if CheckSharedOrDedicatedNUMABinding(allocationInfo) {
allocatedCPUsInNumaNode = allocatedCPUsInNumaNode.Union(allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)])
}
case consts.CPUResourcePluginPolicyNameNative:
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ func (p *DynamicPolicy) checkNormalShareCoresResource(req *pluginapi.ResourceReq

machineState := p.state.GetMachineState()
resourceState := machineState[v1.ResourceMemory]
numaWithoutNUMABindingPods := resourceState.GetNUMANodesWithoutNUMABindingPods()
numaWithoutNUMABindingPods := resourceState.GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods()
numaAllocatableWithoutNUMABindingPods := uint64(0)
for _, numaID := range numaWithoutNUMABindingPods.ToSliceInt() {
numaAllocatableWithoutNUMABindingPods += resourceState[numaID].Allocatable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (p *DynamicPolicy) allocateNUMAsWithoutNUMABindingPods(_ context.Context,

machineState := p.state.GetMachineState()
resourceState := machineState[v1.ResourceMemory]
numaWithoutNUMABindingPods := resourceState.GetNUMANodesWithoutNUMABindingPods()
numaWithoutNUMABindingPods := resourceState.GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods()

allocationInfo := p.state.GetAllocationInfo(v1.ResourceMemory, req.PodUid, req.ContainerName)
if allocationInfo != nil {
Expand Down Expand Up @@ -672,7 +672,7 @@ func packAllocationResponse(allocationInfo *state.AllocationInfo, req *pluginapi
func (p *DynamicPolicy) adjustAllocationEntriesForSharedCores(numaSetChangedContainers map[string]map[string]*state.AllocationInfo,
podEntries state.PodEntries, machineState state.NUMANodeMap,
) {
numaWithoutNUMABindingPods := machineState.GetNUMANodesWithoutNUMABindingPods()
numaWithoutNUMABindingPods := machineState.GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods()
general.Infof("numaWithoutNUMABindingPods: %s", numaWithoutNUMABindingPods.String())

for podUID, containerEntries := range podEntries {
Expand Down Expand Up @@ -722,7 +722,7 @@ func (p *DynamicPolicy) adjustAllocationEntriesForSharedCores(numaSetChangedCont
func (p *DynamicPolicy) adjustAllocationEntriesForDedicatedCores(numaSetChangedContainers map[string]map[string]*state.AllocationInfo,
podEntries state.PodEntries, machineState state.NUMANodeMap,
) {
numaWithoutNUMABindingPods := machineState.GetNUMANodesWithoutNUMABindingPods()
numaWithoutNUMABindingPods := machineState.GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods()
general.Infof("numaWithoutNUMABindingPods: %s", numaWithoutNUMABindingPods.String())

for podUID, containerEntries := range podEntries {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (p *DynamicPolicy) checkMemorySet(_ *coreconfig.Configuration,
general.Warningf("skip memset checking for pod: %s/%s container: %s with zero memory request",
allocationInfo.PodNamespace, allocationInfo.PodName, containerName)
continue
} else if allocationInfo.CheckNumaBinding() {
} else if allocationInfo.CheckSharedOrDedicatedNUMABinding() {
unionNUMABindingStateMemorySet = unionNUMABindingStateMemorySet.Union(allocationInfo.NumaAllocationResult)
}

Expand Down Expand Up @@ -373,7 +373,7 @@ func (p *DynamicPolicy) checkMemorySet(_ *coreconfig.Configuration,
}

machineState := p.state.GetMachineState()[v1.ResourceMemory]
notAssignedMemSet := machineState.GetNUMANodesWithoutNUMABindingPods()
notAssignedMemSet := machineState.GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods()
if !unionNUMABindingStateMemorySet.Union(notAssignedMemSet).Equals(p.topology.CPUDetails.NUMANodes()) {
general.Infof("found node memset invalid. unionNUMABindingStateMemorySet: %s, notAssignedMemSet: %s, topology: %s",
unionNUMABindingStateMemorySet.String(), notAssignedMemSet.String(), p.topology.CPUDetails.NUMANodes().String())
Expand Down Expand Up @@ -476,7 +476,7 @@ func (p *DynamicPolicy) clearResidualState(_ *coreconfig.Configuration,
}

// setMemoryMigrate is used to calculate and set memory migrate configuration, notice that
// 1. not to set memory migrate for NUMA binding containers
// 1. not to set memory migrate for shared or dedicated NUMA binding containers
// 2. for a certain given pod/container, only one setting action is on the flight
// 3. the setting action is done asynchronously to avoid hang
func (p *DynamicPolicy) setMemoryMigrate() {
Expand All @@ -496,7 +496,7 @@ func (p *DynamicPolicy) setMemoryMigrate() {
} else if containerName == "" {
general.Errorf("pod: %s has empty containerName entry", podUID)
continue
} else if allocationInfo.CheckNumaBinding() {
} else if allocationInfo.CheckSharedOrDedicatedNUMABinding() {
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (p *DynamicPolicy) numaBindingHintHandler(_ context.Context,

// if hints exists in extra state-file, prefer to use them
if hints == nil {
availableNUMAs := resourcesMachineState[v1.ResourceMemory].GetNUMANodesWithoutNUMABindingPods()
availableNUMAs := resourcesMachineState[v1.ResourceMemory].GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods()

var extraErr error
hints, extraErr = util.GetHintsFromExtraStateFile(req.PodName, string(v1.ResourceMemory),
Expand Down
25 changes: 18 additions & 7 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ func (ai *AllocationInfo) CheckNumaExclusive() bool {
consts.PodAnnotationMemoryEnhancementNumaExclusiveEnable
}

// CheckSharedOrDedicatedNUMABinding returns true if the AllocationInfo is for pod with
// shared-qos or dedicated-qos and numa-binding enhancement
func (ai *AllocationInfo) CheckSharedOrDedicatedNUMABinding() bool {
if ai == nil {
return false
}

return (ai.QoSLevel == consts.PodAnnotationQoSLevelSharedCores && ai.CheckNumaBinding()) ||
(ai.QoSLevel == consts.PodAnnotationQoSLevelDedicatedCores && ai.CheckNumaBinding())
}

// CheckMainContainer returns true if the AllocationInfo is for main container
func (ai *AllocationInfo) CheckMainContainer() bool {
return ai.ContainerType == pluginapi.ContainerType_MAIN.String()
Expand Down Expand Up @@ -273,15 +284,15 @@ func (ns *NUMANodeState) Clone() *NUMANodeState {
}
}

// HasNUMABindingPods returns true if any AllocationInfo in this NUMANodeState is for numa-binding
func (ns *NUMANodeState) HasNUMABindingPods() bool {
// HasSharedOrDedicatedNUMABindingPods returns true if any AllocationInfo in this NUMANodeState is for shared or dedicated numa-binding
func (ns *NUMANodeState) HasSharedOrDedicatedNUMABindingPods() bool {
if ns == nil {
return false
}

for _, containerEntries := range ns.PodEntries {
for _, allocationInfo := range containerEntries {
if allocationInfo != nil && allocationInfo.CheckNumaBinding() {
if allocationInfo != nil && allocationInfo.CheckSharedOrDedicatedNUMABinding() {
return true
}
}
Expand Down Expand Up @@ -350,12 +361,12 @@ func (nm NUMANodeMap) BytesPerNUMA() (uint64, error) {
return 0, fmt.Errorf("getBytesPerNUMAFromMachineState doesn't get valid numaState")
}

// GetNUMANodesWithoutNUMABindingPods returns a set of numa nodes; for
// those numa nodes, they all don't contain numa-binding pods
func (nm NUMANodeMap) GetNUMANodesWithoutNUMABindingPods() machine.CPUSet {
// GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods returns a set of numa nodes; for
// those numa nodes, they all don't contain shared or dedicated numa-binding pods
func (nm NUMANodeMap) GetNUMANodesWithoutSharedOrDedicatedNUMABindingPods() machine.CPUSet {
res := machine.NewCPUSet()
for numaId, numaNodeState := range nm {
if numaNodeState != nil && !numaNodeState.HasNUMABindingPods() {
if numaNodeState != nil && !numaNodeState.HasSharedOrDedicatedNUMABindingPods() {
res = res.Union(machine.NewCPUSet(numaId))
}
}
Expand Down

0 comments on commit 95cedbc

Please sign in to comment.