Skip to content

Commit

Permalink
Merge pull request #680 from luomingmeng/dev/system-cores-support-cpu…
Browse files Browse the repository at this point in the history
…-set-pool-enhancement

system_cores support set specific pool
  • Loading branch information
luomingmeng authored Sep 26, 2024
2 parents 34749d8 + 098b202 commit f6623f4
Show file tree
Hide file tree
Showing 16 changed files with 758 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,10 @@ func (p *CPUPressureLoadEviction) collectMetrics(_ context.Context) {
for containerName, containerEntry := range entry {
if containerEntry == nil || containerEntry.IsPool {
continue
} else if containerEntry.OwnerPool == state.EmptyOwnerPoolName || p.skipPools.Has(p.configTranslator.Translate(containerEntry.OwnerPool)) {
} else if containerEntry.OwnerPool == state.EmptyOwnerPoolName ||
p.skipPools.Has(p.configTranslator.Translate(containerEntry.OwnerPool)) ||
// skip pod with system pool
state.IsSystemPool(containerEntry.OwnerPool) {
general.Infof("skip collecting metric for pod: %s, container: %s with owner pool name: %s",
podUID, containerName, containerEntry.OwnerPool)
continue
Expand Down Expand Up @@ -422,7 +425,7 @@ func (p *CPUPressureLoadEviction) checkSharedPressureByPoolSize(pod2Pool PodPool
// accumulateSharedPoolsLimit calculates the cpu core limit used by shared core pool,
// and it equals: machine-core - cores-for-dedicated-pods - reserved-cores-reclaim-pods - reserved-cores-system-pods.
func (p *CPUPressureLoadEviction) accumulateSharedPoolsLimit() int {
availableCPUSet := p.state.GetMachineState().GetFilteredAvailableCPUSet(p.systemReservedCPUs, nil, state.CheckNUMABinding)
availableCPUSet := p.state.GetMachineState().GetFilteredAvailableCPUSet(p.systemReservedCPUs, nil, state.CheckSharedOrDedicatedNUMABinding)

coreNumReservedForReclaim := p.dynamicConf.GetDynamicConfiguration().MinReclaimedResourceForAllocate[v1.ResourceCPU]
if coreNumReservedForReclaim.Value() > int64(p.metaServer.NumCPUs) {
Expand All @@ -431,7 +434,7 @@ func (p *CPUPressureLoadEviction) accumulateSharedPoolsLimit() int {
reservedForReclaim := machine.GetCoreNumReservedForReclaim(int(coreNumReservedForReclaim.Value()), p.metaServer.NumNUMANodes)

reservedForReclaimInSharedNuma := 0
sharedCoresNUMAs := p.state.GetMachineState().GetFilteredNUMASet(state.CheckNUMABinding)
sharedCoresNUMAs := p.state.GetMachineState().GetFilteredNUMASet(state.CheckSharedOrDedicatedNUMABinding)
for _, numaID := range sharedCoresNUMAs.ToSliceInt() {
reservedForReclaimInSharedNuma += reservedForReclaim[numaID]
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,15 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
consts.PodAnnotationQoSLevelSharedCores: policyImplement.sharedCoresAllocationHandler,
consts.PodAnnotationQoSLevelDedicatedCores: policyImplement.dedicatedCoresAllocationHandler,
consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresAllocationHandler,
consts.PodAnnotationQoSLevelSystemCores: policyImplement.systemCoresAllocationHandler,
}

// register hint providers for pods with different QoS level
policyImplement.hintHandlers = map[string]util.HintHandler{
consts.PodAnnotationQoSLevelSharedCores: policyImplement.sharedCoresHintHandler,
consts.PodAnnotationQoSLevelDedicatedCores: policyImplement.dedicatedCoresHintHandler,
consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresHintHandler,
consts.PodAnnotationQoSLevelSystemCores: policyImplement.systemCoresHintHandler,
}

if err := policyImplement.cleanPools(); err != nil {
Expand Down Expand Up @@ -426,7 +428,7 @@ func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
// pooledCPUs is the total available cpu cores minus those that are reserved
pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
func(ai *state.AllocationInfo) bool {
return state.CheckDedicated(ai) || state.CheckNUMABinding(ai)
return state.CheckDedicated(ai) || state.CheckSharedNUMABinding(ai)
},
state.CheckDedicatedNUMABinding)
pooledCPUsTopologyAwareAssignments, err := machine.GetNumaAwareAssignments(p.machineInfo.CPUTopology, pooledCPUs)
Expand Down Expand Up @@ -1067,7 +1069,7 @@ func (p *DynamicPolicy) initReclaimPool() error {
machineState := p.state.GetMachineState()
availableCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
func(ai *state.AllocationInfo) bool {
return state.CheckDedicated(ai) || state.CheckNUMABinding(ai)
return state.CheckDedicated(ai) || state.CheckSharedNUMABinding(ai)
},
state.CheckDedicatedNUMABinding).Difference(noneResidentCPUs)

Expand Down Expand Up @@ -1182,7 +1184,7 @@ func (p *DynamicPolicy) checkNormalShareCoresCpuResource(req *pluginapi.Resource

machineState := p.state.GetMachineState()
pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
state.CheckDedicated, state.CheckNUMABinding)
state.CheckDedicated, state.CheckSharedOrDedicatedNUMABinding)

general.Infof("[checkNormalShareCoresCpuResource] node cpu allocated: %d, allocatable: %d", shareCoresAllocatedInt, pooledCPUs.Size())
if shareCoresAllocatedInt > pooledCPUs.Size() {
Expand Down
13 changes: 13 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_advisor_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,19 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
general.Errorf(errMsg)
return fmt.Errorf(errMsg)
case consts.PodAnnotationQoSLevelSystemCores:
poolCPUSet, topologyAwareAssignments, err := p.getSystemPoolCPUSetAndNumaAwareAssignments(newEntries, allocationInfo)
if err != nil {
return fmt.Errorf("pod: %s/%s, container: %s is system_cores, "+
"getSystemPoolCPUSetAndNumaAwareAssignments failed with error: %v",
allocationInfo.PodNamespace, allocationInfo.PodName,
allocationInfo.ContainerName, err)
}

newEntries[podUID][containerName].AllocationResult = poolCPUSet
newEntries[podUID][containerName].OriginalAllocationResult = poolCPUSet.Clone()
newEntries[podUID][containerName].TopologyAwareAssignments = topologyAwareAssignments
newEntries[podUID][containerName].OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(topologyAwareAssignments)
case consts.PodAnnotationQoSLevelSharedCores, consts.PodAnnotationQoSLevelReclaimedCores:
ownerPoolName := allocationInfo.GetOwnerPoolName()
if calculationInfo, ok := resp.GetCalculationInfo(podUID, containerName); ok {
Expand Down
149 changes: 145 additions & 4 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_allocation_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sort"
"strings"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (p *DynamicPolicy) sharedCoresWithoutNUMABindingAllocationHandler(_ context

machineState := p.state.GetMachineState()
pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
state.CheckDedicated, state.CheckNUMABinding)
state.CheckDedicated, state.CheckSharedOrDedicatedNUMABinding)

if pooledCPUs.IsEmpty() {
general.Errorf("pod: %s/%s, container: %s get empty pooledCPUs", req.PodNamespace, req.PodName, req.ContainerName)
Expand Down Expand Up @@ -1021,7 +1022,7 @@ func (p *DynamicPolicy) applyPoolsAndIsolatedInfo(poolsCPUSet map[string]machine

// 2. construct entries for all pools
if poolsCPUSet[state.PoolNameReclaim].IsEmpty() {
return fmt.Errorf("entry: %s is empty", state.PoolNameShare)
return fmt.Errorf("entry: %s is empty", state.PoolNameReclaim)
}

for poolName, cset := range poolsCPUSet {
Expand Down Expand Up @@ -1116,16 +1117,30 @@ func (p *DynamicPolicy) applyPoolsAndIsolatedInfo(poolsCPUSet map[string]machine
newPodEntries[podUID][containerName].TopologyAwareAssignments = machine.DeepcopyCPUAssignment(rampUpCPUsTopologyAwareAssignments)
newPodEntries[podUID][containerName].OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(rampUpCPUsTopologyAwareAssignments)

case apiconsts.PodAnnotationQoSLevelSystemCores:
poolCPUSet, topologyAwareAssignments, err := p.getSystemPoolCPUSetAndNumaAwareAssignments(newPodEntries, allocationInfo)
if err != nil {
return fmt.Errorf("pod: %s/%s, container: %s is system_cores, "+
"getSystemPoolCPUSetAndNumaAwareAssignments failed with error: %v",
allocationInfo.PodNamespace, allocationInfo.PodName,
allocationInfo.ContainerName, err)
}

newPodEntries[podUID][containerName].AllocationResult = poolCPUSet
newPodEntries[podUID][containerName].OriginalAllocationResult = poolCPUSet.Clone()
newPodEntries[podUID][containerName].TopologyAwareAssignments = topologyAwareAssignments
newPodEntries[podUID][containerName].OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(topologyAwareAssignments)

case apiconsts.PodAnnotationQoSLevelSharedCores, apiconsts.PodAnnotationQoSLevelReclaimedCores:
var ownerPoolName string
if state.CheckSharedNUMABinding(allocationInfo) {
ownerPoolName = allocationInfo.GetOwnerPoolName()

if ownerPoolName == state.EmptyOwnerPoolName {
var err error
// why do we itegrate GetOwnerPoolName + GetSpecifiedNUMABindingPoolName into GetPoolName for SharedNUMABinding containers?
// why do we integrate GetOwnerPoolName + GetSpecifiedNUMABindingPoolName into GetPoolName for SharedNUMABinding containers?
// it's because we reply on GetSpecifiedPoolName (in GetPoolName) when calling CheckNUMABindingSharedCoresAntiAffinity,
// At that time, NUMA hint for the candicate container isn't confirmed, so we can't implement NUMA hint aware logic in GetSpecifiedPoolName.
// At that time, NUMA hint for the candidate container isn't confirmed, so we can't implement NUMA hint aware logic in GetSpecifiedPoolName.
ownerPoolName, err = allocationInfo.GetSpecifiedNUMABindingPoolName()
if err != nil {
return fmt.Errorf("pod: %s/%s, container: %s is shared_cores with numa_binding, "+
Expand Down Expand Up @@ -1744,3 +1759,129 @@ func (p *DynamicPolicy) getReclaimOverlapShareRatio(entries state.PodEntries) (m

return reclaimOverlapShareRatio, nil
}

// systemCoresHintHandler produces topology hints for system_cores pods.
// It maps the CPU resource to a nil hint list, which tells the topology
// manager that system_cores pods have no NUMA placement preference.
func (p *DynamicPolicy) systemCoresHintHandler(_ context.Context, request *pluginapi.ResourceRequest) (*pluginapi.ResourceHintsResponse, error) {
	// A nil ListOfTopologyHints for a resource means "no NUMA preference".
	noPreferenceHints := map[string]*pluginapi.ListOfTopologyHints{
		string(v1.ResourceCPU): nil,
	}
	return util.PackResourceHintsResponse(request, string(v1.ResourceCPU), noPreferenceHints)
}

// systemCoresAllocationHandler allocates cpuset for system_cores pods.
// Sidecar containers are delegated to the generic sidecar handler; for main
// containers it resolves the cpuset of the pod's specified system pool (or a
// default cpuset when no pool matches), records the allocation in the state,
// regenerates the machine state from the updated pod entries, and packs the
// allocation response.
func (p *DynamicPolicy) systemCoresAllocationHandler(ctx context.Context, req *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) {
	if req.ContainerType == pluginapi.ContainerType_SIDECAR {
		return p.allocationSidecarHandler(ctx, req, apiconsts.PodAnnotationQoSLevelSystemCores)
	}

	// Build the allocation record first; the owner pool is filled in below
	// once the specified system pool name has been resolved.
	allocationInfo := &state.AllocationInfo{
		PodUid:         req.PodUid,
		PodNamespace:   req.PodNamespace,
		PodName:        req.PodName,
		ContainerName:  req.ContainerName,
		ContainerType:  req.ContainerType.String(),
		ContainerIndex: req.ContainerIndex,
		OwnerPoolName:  state.EmptyOwnerPoolName,
		PodRole:        req.PodRole,
		PodType:        req.PodType,
		InitTimestamp:  time.Now().Format(util.QRMTimeFormat),
		Labels:         general.DeepCopyMap(req.Labels),
		Annotations:    general.DeepCopyMap(req.Annotations),
		QoSLevel:       apiconsts.PodAnnotationQoSLevelSystemCores,
	}

	poolCPUSet, topologyAwareAssignments, err := p.getSystemPoolCPUSetAndNumaAwareAssignments(p.state.GetPodEntries(), allocationInfo)
	if err != nil {
		general.ErrorS(err, "unable to get system pool cpuset and topologyAwareAssignments",
			"podNamespace", req.PodNamespace,
			"podName", req.PodName,
			"containerName", req.ContainerName)
		return nil, err
	}

	systemPoolName, err := allocationInfo.GetSpecifiedSystemPoolName()
	if err != nil {
		return nil, err
	}

	general.InfoS("allocate system pool cpuset successfully",
		"podNamespace", req.PodNamespace,
		"podName", req.PodName,
		"containerName", req.ContainerName,
		"poolName", systemPoolName,
		"result", poolCPUSet.String(),
		"topologyAwareAssignments", topologyAwareAssignments)

	allocationInfo.OwnerPoolName = systemPoolName
	allocationInfo.AllocationResult = poolCPUSet
	allocationInfo.OriginalAllocationResult = poolCPUSet.Clone()
	allocationInfo.TopologyAwareAssignments = topologyAwareAssignments
	allocationInfo.OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(topologyAwareAssignments)

	// Persist the allocation, then rebuild the machine state so subsequent
	// allocations observe this container's cpuset.
	p.state.SetAllocationInfo(allocationInfo.PodUid, allocationInfo.ContainerName, allocationInfo)
	podEntries := p.state.GetPodEntries()

	updatedMachineState, err := generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
	if err != nil {
		general.Errorf("pod: %s/%s, container: %s generateMachineStateFromPodEntries failed with error: %v",
			req.PodNamespace, req.PodName, req.ContainerName, err)
		return nil, fmt.Errorf("generateMachineStateFromPodEntries failed with error: %v", err)
	}
	p.state.SetMachineState(updatedMachineState)

	resp, err := cpuutil.PackAllocationResponse(allocationInfo, string(v1.ResourceCPU), util.OCIPropertyNameCPUSetCPUs, false, true, req)
	if err != nil {
		// NOTE: message previously referenced PackResourceAllocationResponseByAllocationInfo,
		// which is not the function being called here.
		general.Errorf("pod: %s/%s, container: %s PackAllocationResponse failed with error: %v",
			req.PodNamespace, req.PodName, req.ContainerName, err)
		return nil, fmt.Errorf("PackAllocationResponse failed with error: %v", err)
	}
	return resp, nil
}

// getSystemPoolCPUSetAndNumaAwareAssignments resolves the cpuset a
// system_cores container should run on, together with its per-NUMA
// assignments.
//
// Resolution order:
//  1. the union of all pool entries whose name matches the container's
//     specified pool (prefix match, to also pick up derived sub-pools);
//  2. if that yields nothing and the container is NUMA-binding, the machine
//     state's available cpuset minus reserved CPUs;
//  3. otherwise, all CPUs on the machine.
//
// It returns an error when allocationInfo is nil, when no non-empty cpuset
// can be resolved, or when NUMA-aware assignment computation fails.
func (p *DynamicPolicy) getSystemPoolCPUSetAndNumaAwareAssignments(podEntries state.PodEntries,
	allocationInfo *state.AllocationInfo,
) (machine.CPUSet, map[int]machine.CPUSet, error) {
	if allocationInfo == nil {
		return machine.CPUSet{}, nil, fmt.Errorf("allocationInfo is nil")
	}

	poolCPUSet := machine.NewCPUSet()
	specifiedPoolName := allocationInfo.GetSpecifiedPoolName()
	if specifiedPoolName != state.EmptyOwnerPoolName {
		for pool, entries := range podEntries {
			if !entries.IsPoolEntry() {
				continue
			}

			// HasPrefix already matches the exact pool name, so no separate
			// equality check is needed. The prefix match presumably exists to
			// include derived sub-pools (e.g. NUMA-suffixed variants) of the
			// specified pool — TODO(review): confirm it cannot over-match an
			// unrelated pool whose name merely starts with specifiedPoolName.
			if strings.HasPrefix(pool, specifiedPoolName) {
				poolCPUSet = poolCPUSet.Union(entries.GetPoolEntry().AllocationResult)
				general.Infof("pod: %s/%s, container: %s get system pool cpuset from pool: %s, cpuset: %s", allocationInfo.PodNamespace, allocationInfo.PodName,
					allocationInfo.ContainerName, pool, entries.GetPoolEntry().AllocationResult.String())
			}
		}
	}

	// if pool set is empty, try to get default cpuset
	if poolCPUSet.IsEmpty() {
		// if the pod is numa binding, get the default cpuset from machine state
		if state.CheckNUMABinding(allocationInfo) {
			poolCPUSet = p.state.GetMachineState().GetAvailableCPUSet(p.reservedCPUs)
		}

		// if the default cpuset is empty or no numa binding, use all cpuset as default cpuset
		if poolCPUSet.IsEmpty() {
			poolCPUSet = p.machineInfo.CPUDetails.CPUs()
		}
		general.Infof("pod: %s/%s, container: %s get system pool cpuset from default cpuset: %s", allocationInfo.PodNamespace, allocationInfo.PodName,
			allocationInfo.ContainerName, poolCPUSet.String())
	}

	if poolCPUSet.IsEmpty() {
		return machine.CPUSet{}, nil, fmt.Errorf("no system pool cpuset for pool %s", specifiedPoolName)
	}

	topologyAwareAssignments, err := machine.GetNumaAwareAssignments(p.machineInfo.CPUTopology, poolCPUSet)
	if err != nil {
		return machine.CPUSet{}, nil, fmt.Errorf("unable to get numa aware assignments: %v", err)
	}

	return poolCPUSet, topologyAwareAssignments, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (p *DynamicPolicy) dedicatedCoresWithNUMABindingHintHandler(_ context.Conte

// if hints exists in extra state-file, prefer to use them
if hints == nil {
availableNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding)
availableNUMAs := machineState.GetFilteredNUMASet(state.CheckSharedOrDedicatedNUMABinding)

var extraErr error
hints, extraErr = util.GetHintsFromExtraStateFile(req.PodName, string(v1.ResourceCPU), p.extraStateFileAbsPath, availableNUMAs)
Expand Down Expand Up @@ -794,8 +794,8 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE
machineState state.NUMANodeMap,
req *pluginapi.ResourceRequest,
) (map[string]*pluginapi.ListOfTopologyHints, error) {
nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckNUMABinding).Size()
nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding)
nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckSharedOrDedicatedNUMABinding).Size()
nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckSharedOrDedicatedNUMABinding)
nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries, nil, p.getContainerRequestedCores)

reqAnnotations := req.Annotations
Expand Down
Loading

0 comments on commit f6623f4

Please sign in to comment.