Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

system_cores support set specific pool #680

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,10 @@ func (p *CPUPressureLoadEviction) collectMetrics(_ context.Context) {
for containerName, containerEntry := range entry {
if containerEntry == nil || containerEntry.IsPool {
continue
} else if containerEntry.OwnerPool == state.EmptyOwnerPoolName || p.skipPools.Has(p.configTranslator.Translate(containerEntry.OwnerPool)) {
} else if containerEntry.OwnerPool == state.EmptyOwnerPoolName ||
p.skipPools.Has(p.configTranslator.Translate(containerEntry.OwnerPool)) ||
// skip pod with system pool
state.IsSystemPool(containerEntry.OwnerPool) {
general.Infof("skip collecting metric for pod: %s, container: %s with owner pool name: %s",
podUID, containerName, containerEntry.OwnerPool)
continue
Expand Down Expand Up @@ -422,7 +425,7 @@ func (p *CPUPressureLoadEviction) checkSharedPressureByPoolSize(pod2Pool PodPool
// accumulateSharedPoolsLimit calculates the cpu core limit used by shared core pool,
// and it equals: machine-core - cores-for-dedicated-pods - reserved-cores-reclaim-pods - reserved-cores-system-pods.
func (p *CPUPressureLoadEviction) accumulateSharedPoolsLimit() int {
availableCPUSet := p.state.GetMachineState().GetFilteredAvailableCPUSet(p.systemReservedCPUs, nil, state.CheckNUMABinding)
availableCPUSet := p.state.GetMachineState().GetFilteredAvailableCPUSet(p.systemReservedCPUs, nil, state.CheckSharedOrDedicatedNUMABinding)

coreNumReservedForReclaim := p.dynamicConf.GetDynamicConfiguration().MinReclaimedResourceForAllocate[v1.ResourceCPU]
if coreNumReservedForReclaim.Value() > int64(p.metaServer.NumCPUs) {
Expand All @@ -431,7 +434,7 @@ func (p *CPUPressureLoadEviction) accumulateSharedPoolsLimit() int {
reservedForReclaim := machine.GetCoreNumReservedForReclaim(int(coreNumReservedForReclaim.Value()), p.metaServer.NumNUMANodes)

reservedForReclaimInSharedNuma := 0
sharedCoresNUMAs := p.state.GetMachineState().GetFilteredNUMASet(state.CheckNUMABinding)
sharedCoresNUMAs := p.state.GetMachineState().GetFilteredNUMASet(state.CheckSharedOrDedicatedNUMABinding)
for _, numaID := range sharedCoresNUMAs.ToSliceInt() {
reservedForReclaimInSharedNuma += reservedForReclaim[numaID]
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,15 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
consts.PodAnnotationQoSLevelSharedCores: policyImplement.sharedCoresAllocationHandler,
consts.PodAnnotationQoSLevelDedicatedCores: policyImplement.dedicatedCoresAllocationHandler,
consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresAllocationHandler,
consts.PodAnnotationQoSLevelSystemCores: policyImplement.systemCoresAllocationHandler,
}

// register hint providers for pods with different QoS level
policyImplement.hintHandlers = map[string]util.HintHandler{
consts.PodAnnotationQoSLevelSharedCores: policyImplement.sharedCoresHintHandler,
consts.PodAnnotationQoSLevelDedicatedCores: policyImplement.dedicatedCoresHintHandler,
consts.PodAnnotationQoSLevelReclaimedCores: policyImplement.reclaimedCoresHintHandler,
consts.PodAnnotationQoSLevelSystemCores: policyImplement.systemCoresHintHandler,
}

if err := policyImplement.cleanPools(); err != nil {
Expand Down Expand Up @@ -426,7 +428,7 @@ func (p *DynamicPolicy) GetResourcesAllocation(_ context.Context,
// pooledCPUs is the total available cpu cores minus those that are reserved
pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
func(ai *state.AllocationInfo) bool {
return state.CheckDedicated(ai) || state.CheckNUMABinding(ai)
return state.CheckDedicated(ai) || state.CheckSharedNUMABinding(ai)
},
state.CheckDedicatedNUMABinding)
pooledCPUsTopologyAwareAssignments, err := machine.GetNumaAwareAssignments(p.machineInfo.CPUTopology, pooledCPUs)
Expand Down Expand Up @@ -1067,7 +1069,7 @@ func (p *DynamicPolicy) initReclaimPool() error {
machineState := p.state.GetMachineState()
availableCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
func(ai *state.AllocationInfo) bool {
return state.CheckDedicated(ai) || state.CheckNUMABinding(ai)
return state.CheckDedicated(ai) || state.CheckSharedNUMABinding(ai)
},
state.CheckDedicatedNUMABinding).Difference(noneResidentCPUs)

Expand Down Expand Up @@ -1182,7 +1184,7 @@ func (p *DynamicPolicy) checkNormalShareCoresCpuResource(req *pluginapi.Resource

machineState := p.state.GetMachineState()
pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
state.CheckDedicated, state.CheckNUMABinding)
state.CheckDedicated, state.CheckSharedOrDedicatedNUMABinding)

general.Infof("[checkNormalShareCoresCpuResource] node cpu allocated: %d, allocatable: %d", shareCoresAllocatedInt, pooledCPUs.Size())
if shareCoresAllocatedInt > pooledCPUs.Size() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,19 @@ func (p *DynamicPolicy) applyBlocks(blockCPUSet advisorapi.BlockCPUSet, resp *ad
allocationInfo.PodNamespace, allocationInfo.PodName, allocationInfo.ContainerName)
general.Errorf(errMsg)
return fmt.Errorf(errMsg)
case consts.PodAnnotationQoSLevelSystemCores:
poolCPUSet, topologyAwareAssignments, err := p.getSystemPoolCPUSetAndNumaAwareAssignments(newEntries, allocationInfo)
if err != nil {
return fmt.Errorf("pod: %s/%s, container: %s is system_cores, "+
"getSystemPoolCPUSetAndNumaAwareAssignments failed with error: %v",
allocationInfo.PodNamespace, allocationInfo.PodName,
allocationInfo.ContainerName, err)
}

newEntries[podUID][containerName].AllocationResult = poolCPUSet
newEntries[podUID][containerName].OriginalAllocationResult = poolCPUSet.Clone()
newEntries[podUID][containerName].TopologyAwareAssignments = topologyAwareAssignments
newEntries[podUID][containerName].OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(topologyAwareAssignments)
case consts.PodAnnotationQoSLevelSharedCores, consts.PodAnnotationQoSLevelReclaimedCores:
ownerPoolName := allocationInfo.GetOwnerPoolName()
if calculationInfo, ok := resp.GetCalculationInfo(podUID, containerName); ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sort"
"strings"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (p *DynamicPolicy) sharedCoresWithoutNUMABindingAllocationHandler(_ context

machineState := p.state.GetMachineState()
pooledCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs,
state.CheckDedicated, state.CheckNUMABinding)
state.CheckDedicated, state.CheckSharedOrDedicatedNUMABinding)

if pooledCPUs.IsEmpty() {
general.Errorf("pod: %s/%s, container: %s get empty pooledCPUs", req.PodNamespace, req.PodName, req.ContainerName)
Expand Down Expand Up @@ -1021,7 +1022,7 @@ func (p *DynamicPolicy) applyPoolsAndIsolatedInfo(poolsCPUSet map[string]machine

// 2. construct entries for all pools
if poolsCPUSet[state.PoolNameReclaim].IsEmpty() {
return fmt.Errorf("entry: %s is empty", state.PoolNameShare)
return fmt.Errorf("entry: %s is empty", state.PoolNameReclaim)
}

for poolName, cset := range poolsCPUSet {
Expand Down Expand Up @@ -1116,16 +1117,30 @@ func (p *DynamicPolicy) applyPoolsAndIsolatedInfo(poolsCPUSet map[string]machine
newPodEntries[podUID][containerName].TopologyAwareAssignments = machine.DeepcopyCPUAssignment(rampUpCPUsTopologyAwareAssignments)
newPodEntries[podUID][containerName].OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(rampUpCPUsTopologyAwareAssignments)

case apiconsts.PodAnnotationQoSLevelSystemCores:
poolCPUSet, topologyAwareAssignments, err := p.getSystemPoolCPUSetAndNumaAwareAssignments(newPodEntries, allocationInfo)
if err != nil {
return fmt.Errorf("pod: %s/%s, container: %s is system_cores, "+
"getSystemPoolCPUSetAndNumaAwareAssignments failed with error: %v",
allocationInfo.PodNamespace, allocationInfo.PodName,
allocationInfo.ContainerName, err)
}

newPodEntries[podUID][containerName].AllocationResult = poolCPUSet
newPodEntries[podUID][containerName].OriginalAllocationResult = poolCPUSet.Clone()
newPodEntries[podUID][containerName].TopologyAwareAssignments = topologyAwareAssignments
newPodEntries[podUID][containerName].OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(topologyAwareAssignments)

case apiconsts.PodAnnotationQoSLevelSharedCores, apiconsts.PodAnnotationQoSLevelReclaimedCores:
var ownerPoolName string
if state.CheckSharedNUMABinding(allocationInfo) {
ownerPoolName = allocationInfo.GetOwnerPoolName()

if ownerPoolName == state.EmptyOwnerPoolName {
var err error
// why do we itegrate GetOwnerPoolName + GetSpecifiedNUMABindingPoolName into GetPoolName for SharedNUMABinding containers?
// why do we integrate GetOwnerPoolName + GetSpecifiedNUMABindingPoolName into GetPoolName for SharedNUMABinding containers?
// it's because we rely on GetSpecifiedPoolName (in GetPoolName) when calling CheckNUMABindingSharedCoresAntiAffinity,
// At that time, NUMA hint for the candicate container isn't confirmed, so we can't implement NUMA hint aware logic in GetSpecifiedPoolName.
// At that time, NUMA hint for the candidate container isn't confirmed, so we can't implement NUMA hint aware logic in GetSpecifiedPoolName.
ownerPoolName, err = allocationInfo.GetSpecifiedNUMABindingPoolName()
if err != nil {
return fmt.Errorf("pod: %s/%s, container: %s is shared_cores with numa_binding, "+
Expand Down Expand Up @@ -1744,3 +1759,129 @@ func (p *DynamicPolicy) getReclaimOverlapShareRatio(entries state.PodEntries) (m

return reclaimOverlapShareRatio, nil
}

// systemCoresHintHandler produces the topology hints response for
// system_cores pods. A nil hint list for the CPU resource indicates that
// these pods carry no NUMA preference.
func (p *DynamicPolicy) systemCoresHintHandler(_ context.Context, request *pluginapi.ResourceRequest) (*pluginapi.ResourceHintsResponse, error) {
	// nil entry => no numa preference for this resource
	hints := map[string]*pluginapi.ListOfTopologyHints{
		string(v1.ResourceCPU): nil,
	}
	return util.PackResourceHintsResponse(request, string(v1.ResourceCPU), hints)
}

// systemCoresAllocationHandler allocates cpus for a system_cores pod by
// binding it to the cpuset of its specified system pool. Sidecar containers
// are delegated to the generic sidecar handler so they follow the main
// container's allocation. On success the allocation is persisted into the
// plugin state and the machine state is regenerated from the pod entries.
func (p *DynamicPolicy) systemCoresAllocationHandler(ctx context.Context, req *pluginapi.ResourceRequest) (*pluginapi.ResourceAllocationResponse, error) {
	if req.ContainerType == pluginapi.ContainerType_SIDECAR {
		return p.allocationSidecarHandler(ctx, req, apiconsts.PodAnnotationQoSLevelSystemCores)
	}

	allocationInfo := &state.AllocationInfo{
		PodUid:         req.PodUid,
		PodNamespace:   req.PodNamespace,
		PodName:        req.PodName,
		ContainerName:  req.ContainerName,
		ContainerType:  req.ContainerType.String(),
		ContainerIndex: req.ContainerIndex,
		OwnerPoolName:  state.EmptyOwnerPoolName,
		PodRole:        req.PodRole,
		PodType:        req.PodType,
		InitTimestamp:  time.Now().Format(util.QRMTimeFormat),
		Labels:         general.DeepCopyMap(req.Labels),
		Annotations:    general.DeepCopyMap(req.Annotations),
		QoSLevel:       apiconsts.PodAnnotationQoSLevelSystemCores,
	}

	// resolve the cpuset (and its per-NUMA breakdown) this pod should run on
	poolCPUSet, topologyAwareAssignments, err := p.getSystemPoolCPUSetAndNumaAwareAssignments(p.state.GetPodEntries(), allocationInfo)
	if err != nil {
		general.ErrorS(err, "unable to get system pool cpuset and topologyAwareAssignments",
			"podNamespace", req.PodNamespace,
			"podName", req.PodName,
			"containerName", req.ContainerName)
		return nil, err
	}

	systemPoolName, err := allocationInfo.GetSpecifiedSystemPoolName()
	if err != nil {
		return nil, err
	}

	general.InfoS("allocate system pool cpuset successfully",
		"podNamespace", req.PodNamespace,
		"podName", req.PodName,
		"containerName", req.ContainerName,
		"poolName", systemPoolName,
		"result", poolCPUSet.String(),
		"topologyAwareAssignments", topologyAwareAssignments)

	allocationInfo.OwnerPoolName = systemPoolName
	allocationInfo.AllocationResult = poolCPUSet
	allocationInfo.OriginalAllocationResult = poolCPUSet.Clone()
	allocationInfo.TopologyAwareAssignments = topologyAwareAssignments
	allocationInfo.OriginalTopologyAwareAssignments = machine.DeepcopyCPUAssignment(topologyAwareAssignments)

	// persist the allocation, then regenerate machine state from the
	// updated pod entries so both views stay consistent
	p.state.SetAllocationInfo(allocationInfo.PodUid, allocationInfo.ContainerName, allocationInfo)
	podEntries := p.state.GetPodEntries()

	updatedMachineState, err := generateMachineStateFromPodEntries(p.machineInfo.CPUTopology, podEntries)
	if err != nil {
		general.Errorf("pod: %s/%s, container: %s generateMachineStateFromPodEntries failed with error: %v",
			req.PodNamespace, req.PodName, req.ContainerName, err)
		return nil, fmt.Errorf("generateMachineStateFromPodEntries failed with error: %v", err)
	}
	p.state.SetMachineState(updatedMachineState)

	resp, err := cpuutil.PackAllocationResponse(allocationInfo, string(v1.ResourceCPU), util.OCIPropertyNameCPUSetCPUs, false, true, req)
	if err != nil {
		// NOTE: the message previously referenced a different helper name;
		// keep it aligned with the function actually being called.
		general.Errorf("pod: %s/%s, container: %s PackAllocationResponse failed with error: %v",
			req.PodNamespace, req.PodName, req.ContainerName, err)
		return nil, fmt.Errorf("PackAllocationResponse failed with error: %v", err)
	}
	return resp, nil
}

// getSystemPoolCPUSetAndNumaAwareAssignments resolves the cpuset a
// system_cores pod should run on, together with its NUMA-aware breakdown.
//
// Resolution order:
//  1. the union of all pool entries whose name equals — or is prefixed by —
//     the pod's specified pool name;
//  2. if that is empty and the pod is numa_binding, the machine's available
//     cpuset excluding the reserved cpus;
//  3. otherwise, all cpus on the machine.
//
// An error is returned when allocationInfo is nil, when no cpuset can be
// resolved at all, or when the NUMA-aware assignment computation fails.
func (p *DynamicPolicy) getSystemPoolCPUSetAndNumaAwareAssignments(podEntries state.PodEntries,
	allocationInfo *state.AllocationInfo,
) (machine.CPUSet, map[int]machine.CPUSet, error) {
	if allocationInfo == nil {
		return machine.CPUSet{}, nil, fmt.Errorf("allocationInfo is nil")
	}

	poolCPUSet := machine.NewCPUSet()
	specifiedPoolName := allocationInfo.GetSpecifiedPoolName()
	if specifiedPoolName != state.EmptyOwnerPoolName {
		for pool, entries := range podEntries {
			if !entries.IsPoolEntry() {
				continue
			}

			// HasPrefix subsumes the exact-match case (HasPrefix(x, x) is
			// true), so a separate equality check is unnecessary; subpools
			// sharing the specified pool's name prefix are aggregated too.
			if strings.HasPrefix(pool, specifiedPoolName) {
				poolCPUSet = poolCPUSet.Union(entries.GetPoolEntry().AllocationResult)
				general.Infof("pod: %s/%s, container: %s get system pool cpuset from pool: %s, cpuset: %s", allocationInfo.PodNamespace, allocationInfo.PodName,
					allocationInfo.ContainerName, pool, entries.GetPoolEntry().AllocationResult.String())
			}
		}
	}

	// if pool set is empty, try to get default cpuset
	if poolCPUSet.IsEmpty() {
		// if the pod is numa binding, get the default cpuset from machine state
		if state.CheckNUMABinding(allocationInfo) {
			poolCPUSet = p.state.GetMachineState().GetAvailableCPUSet(p.reservedCPUs)
		}

		// if the default cpuset is empty or no numa binding, use all cpuset as default cpuset
		if poolCPUSet.IsEmpty() {
			poolCPUSet = p.machineInfo.CPUDetails.CPUs()
		}
		general.Infof("pod: %s/%s, container: %s get system pool cpuset from default cpuset: %s", allocationInfo.PodNamespace, allocationInfo.PodName,
			allocationInfo.ContainerName, poolCPUSet.String())
	}

	if poolCPUSet.IsEmpty() {
		return machine.CPUSet{}, nil, fmt.Errorf("no system pool cpuset for pool %s", specifiedPoolName)
	}

	topologyAwareAssignments, err := machine.GetNumaAwareAssignments(p.machineInfo.CPUTopology, poolCPUSet)
	if err != nil {
		return machine.CPUSet{}, nil, fmt.Errorf("unable to get numa aware assignments: %v", err)
	}

	return poolCPUSet, topologyAwareAssignments, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (p *DynamicPolicy) dedicatedCoresWithNUMABindingHintHandler(_ context.Conte

// if hints exists in extra state-file, prefer to use them
if hints == nil {
availableNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding)
availableNUMAs := machineState.GetFilteredNUMASet(state.CheckSharedOrDedicatedNUMABinding)

var extraErr error
hints, extraErr = util.GetHintsFromExtraStateFile(req.PodName, string(v1.ResourceCPU), p.extraStateFileAbsPath, availableNUMAs)
Expand Down Expand Up @@ -794,8 +794,8 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE
machineState state.NUMANodeMap,
req *pluginapi.ResourceRequest,
) (map[string]*pluginapi.ListOfTopologyHints, error) {
nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckNUMABinding).Size()
nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckNUMABinding)
nonBindingNUMAsCPUQuantity := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckSharedOrDedicatedNUMABinding).Size()
nonBindingNUMAs := machineState.GetFilteredNUMASet(state.CheckSharedOrDedicatedNUMABinding)
nonBindingSharedRequestedQuantity := state.GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries, nil, p.getContainerRequestedCores)

reqAnnotations := req.Annotations
Expand Down
Loading
Loading