feat(sysadvisor): support share cores with NUMA binding
nightmeng committed May 6, 2024
1 parent 0b45afa commit 004c298
Showing 25 changed files with 880 additions and 124 deletions.
16 changes: 12 additions & 4 deletions cmd/katalyst-agent/app/options/sysadvisor/sysadvisor_base.go
@@ -32,15 +32,19 @@ import (
 
 // GenericSysAdvisorOptions holds the configurations for sysadvisor
 type GenericSysAdvisorOptions struct {
-    SysAdvisorPlugins  []string
-    StateFileDirectory string
+    SysAdvisorPlugins            []string
+    StateFileDirectory           string
+    ClearStateFileDirectory      bool
+    DisableShareCoresNumaBinding bool
 }
 
 // NewGenericSysAdvisorOptions creates a new Options with a default config.
 func NewGenericSysAdvisorOptions() *GenericSysAdvisorOptions {
     return &GenericSysAdvisorOptions{
-        SysAdvisorPlugins:  []string{"*"},
-        StateFileDirectory: "/var/lib/katalyst/sys_advisor/",
+        SysAdvisorPlugins:            []string{"*"},
+        StateFileDirectory:           "/var/lib/katalyst/sys_advisor/",
+        ClearStateFileDirectory:      false,
+        DisableShareCoresNumaBinding: false,
     }
 }

@@ -52,12 +56,16 @@ func (o *GenericSysAdvisorOptions) AddFlags(fss *cliflag.NamedFlagSets) {
         "A list of sysadvisor plugins to enable. '*' enables all on-by-default sysadvisor plugins, 'foo' enables the sysadvisor plugin "+
         "named 'foo', '-foo' disables the sysadvisor plugin named 'foo'"))
     fs.StringVar(&o.StateFileDirectory, "state-dir", o.StateFileDirectory, "directory for sys advisor to store state file")
+    fs.BoolVar(&o.ClearStateFileDirectory, "clear-state-dir", o.ClearStateFileDirectory, "clear state file when starting up (only for rollback)")
+    fs.BoolVar(&o.DisableShareCoresNumaBinding, "disable-share-cores-numa-binding", o.DisableShareCoresNumaBinding, "disable share cores with NUMA binding feature")
 }
 
 // ApplyTo fills up config with options
 func (o *GenericSysAdvisorOptions) ApplyTo(c *sysadvisor.GenericSysAdvisorConfiguration) error {
     c.SysAdvisorPlugins = o.SysAdvisorPlugins
     c.StateFileDirectory = o.StateFileDirectory
+    c.ClearStateFileDirectory = o.ClearStateFileDirectory
+    c.DisableShareCoresNumaBinding = o.DisableShareCoresNumaBinding
     return nil
 }
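For readers who want to try the new switches without tracing the cliflag plumbing, here is a minimal, self-contained sketch that mirrors the two added flags with the standard library flag package. It is illustrative only; the real binary registers these on a cliflag.NamedFlagSets as shown above.

package main

import (
    "flag"
    "fmt"
)

func main() {
    // Mirrors the defaults above: state is kept, and share cores
    // with NUMA binding stays enabled unless explicitly disabled.
    clearStateDir := flag.Bool("clear-state-dir", false,
        "clear state file when starting up (only for rollback)")
    disableShareCoresNumaBinding := flag.Bool("disable-share-cores-numa-binding", false,
        "disable share cores with NUMA binding feature")
    flag.Parse()

    fmt.Println("clear-state-dir:", *clearStateDir)
    fmt.Println("disable-share-cores-numa-binding:", *disableShareCoresNumaBinding)
}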

@@ -91,19 +91,21 @@ type CPUPressureLoadEviction struct {
     poolMetricCollectHandlers map[string]PoolMetricCollectHandler
 
     systemReservedCPUs machine.CPUSet
+    configTranslator   *general.CommonSuffixTranslator
 }
 
 func NewCPUPressureLoadEviction(emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer,
     conf *config.Configuration, state state.ReadonlyState) (CPUPressureEviction, error) {
     plugin := &CPUPressureLoadEviction{
-        state:          state,
-        emitter:        emitter,
-        metaServer:     metaServer,
-        metricsHistory: make(map[string]Entries),
-        qosConf:        conf.QoSConfiguration,
-        dynamicConf:    conf.DynamicAgentConfiguration,
-        skipPools:      sets.NewString(conf.LoadPressureEvictionSkipPools...),
-        syncPeriod:     conf.LoadEvictionSyncPeriod,
+        state:            state,
+        emitter:          emitter,
+        metaServer:       metaServer,
+        metricsHistory:   make(map[string]Entries),
+        qosConf:          conf.QoSConfiguration,
+        dynamicConf:      conf.DynamicAgentConfiguration,
+        skipPools:        sets.NewString(conf.LoadPressureEvictionSkipPools...),
+        syncPeriod:       conf.LoadEvictionSyncPeriod,
+        configTranslator: general.NewCommonSuffixTranslator("-NUMA"),
     }
 
     systemReservedCores, reserveErr := cpuutil.GetCoresReservedForSystem(conf, metaServer, metaServer.KatalystMachineInfo, metaServer.CPUDetails.CPUs().Clone())
@@ -151,7 +153,7 @@ func (p *CPUPressureLoadEviction) ThresholdMet(_ context.Context,
 
     var softThresholdMetPoolName string
     for poolName, entries := range p.metricsHistory[consts.MetricLoad1MinContainer] {
-        if !entries.IsPoolEntry() || p.skipPools.Has(poolName) || state.IsIsolationPool(poolName) {
+        if !entries.IsPoolEntry() || p.skipPools.Has(p.configTranslator.Translate(poolName)) || state.IsIsolationPool(poolName) {
             continue
         }
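Each skip-pool lookup in this file now passes the pool name through the translator built with NewCommonSuffixTranslator("-NUMA"), so a NUMA-bound share pool can still match its base name in the configured skip list. A rough sketch of the assumed translation rule follows; the exact naming scheme (e.g. "share-NUMA0") and trimming behavior are assumptions, not shown in this diff.

package main

import (
    "fmt"
    "strings"
)

// translate is an illustrative stand-in for general.CommonSuffixTranslator:
// it maps a NUMA-suffixed pool name back to its base pool name so that
// configuration written against "share" also covers "share-NUMA0".
func translate(poolName string) string {
    if idx := strings.LastIndex(poolName, "-NUMA"); idx >= 0 {
        return poolName[:idx]
    }
    return poolName
}

func main() {
    fmt.Println(translate("share-NUMA0")) // share
    fmt.Println(translate("share"))       // share (unchanged)
}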

@@ -333,7 +335,7 @@ func (p *CPUPressureLoadEviction) collectMetrics(_ context.Context) {
         for containerName, containerEntry := range entry {
             if containerEntry == nil || containerEntry.IsPool {
                 continue
-            } else if containerEntry.OwnerPool == advisorapi.EmptyOwnerPoolName || p.skipPools.Has(containerEntry.OwnerPool) {
+            } else if containerEntry.OwnerPool == advisorapi.EmptyOwnerPoolName || p.skipPools.Has(p.configTranslator.Translate(containerEntry.OwnerPool)) {
                 general.Infof("skip collecting metric for pod: %s, container: %s with owner pool name: %s",
                     podUID, containerName, containerEntry.OwnerPool)
                 continue
@@ -372,7 +374,7 @@ func (p *CPUPressureLoadEviction) collectMetrics(_ context.Context) {
     }
 
     for _, poolEntry := range entry {
-        if poolEntry == nil || !poolEntry.IsPool || p.skipPools.Has(poolName) {
+        if poolEntry == nil || !poolEntry.IsPool || p.skipPools.Has(p.configTranslator.Translate(poolName)) {
             continue
         }

@@ -401,7 +403,7 @@ func (p *CPUPressureLoadEviction) checkSharedPressureByPoolSize(pod2Pool PodPool
     }
 
     for _, containerEntry := range entry {
-        if !containerEntry.IsPool || p.skipPools.Has(poolName) || entry[advisorapi.FakedContainerName] == nil {
+        if !containerEntry.IsPool || p.skipPools.Has(p.configTranslator.Translate(poolName)) || entry[advisorapi.FakedContainerName] == nil {
             continue
         }
         poolSizeSum += containerEntry.PoolSize
8 changes: 8 additions & 0 deletions pkg/agent/sysadvisor/metacache/metacache.go
@@ -18,6 +18,7 @@ package metacache
 
 import (
     "fmt"
+    "os"
     "reflect"
     "sync"
     "time"
@@ -156,6 +157,13 @@ var _ MetaCache = &MetaCacheImp{}
 
 // NewMetaCacheImp returns the single instance of MetaCacheImp
 func NewMetaCacheImp(conf *config.Configuration, emitterPool metricspool.MetricsEmitterPool, metricsReader metrictypes.MetricsReader) (*MetaCacheImp, error) {
+    if conf.GenericSysAdvisorConfiguration.ClearStateFileDirectory {
+        if err := os.RemoveAll(conf.GenericSysAdvisorConfiguration.StateFileDirectory); err != nil {
+            if !os.IsNotExist(err) {
+                return nil, fmt.Errorf("failed to clear state file dir")
+            }
+        }
+    }
     stateFileDir := conf.GenericSysAdvisorConfiguration.StateFileDirectory
     checkpointManager, err := checkpointmanager.NewCheckpointManager(stateFileDir)
     if err != nil {
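One detail worth noting in the cleanup block above: os.RemoveAll already returns nil when the target path does not exist, so the os.IsNotExist guard is defensive rather than load-bearing. A quick standalone check:

package main

import (
    "fmt"
    "os"
)

func main() {
    // Removing a path that was never created is not an error,
    // so startup cleanup is safe on a fresh node as well.
    err := os.RemoveAll("/tmp/nonexistent-sys-advisor-state-dir")
    fmt.Println(err) // <nil>
}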
@@ -149,7 +149,7 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met
             return
         }
 
-        if containerInfo.QoSLevel == apiconsts.PodAnnotationQoSLevelSharedCores || containerInfo.IsNumaExclusive() {
+        if containerInfo.QoSLevel == apiconsts.PodAnnotationQoSLevelSharedCores || containerInfo.IsDedicatedNumaExclusive() {
             requestContainers = append(requestContainers, containerInfo.Clone())
         }
 
2 changes: 1 addition & 1 deletion pkg/agent/sysadvisor/plugin/metacache/metacache.go
@@ -95,7 +95,7 @@ func (mcp *MetaCachePlugin) periodicWork(_ context.Context) {
         // For these containers do not belong to NumaExclusive, assign the actual value to CPURequest of them.
         // Because CPURequest of containerInfo would be assigned as math.Ceil(Actual CPURequest).
         // As for NumaExclusive containers, the "math.Ceil(Actual CPURequest)" is acceptable.
-        if ci.CPURequest <= 0 || !ci.IsNumaExclusive() {
+        if ci.CPURequest <= 0 || !ci.IsDedicatedNumaExclusive() {
             ci.CPURequest = spec.Resources.Requests.Cpu().AsApproximateFloat64()
         }
         if ci.CPULimit <= 0 {
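The comment in the hunk above distinguishes the rounded-up request stored on containerInfo from the actual fractional request. A small illustration with the Kubernetes resource API (the 250m value is an arbitrary example):

package main

import (
    "fmt"
    "math"

    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    req := resource.MustParse("250m") // a 0.25-core CPU request

    actual := req.AsApproximateFloat64() // 0.25, what non-NUMA-exclusive containers now keep
    rounded := math.Ceil(actual)         // 1, acceptable only for NUMA-exclusive containers

    fmt.Println(actual, rounded)
}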
147 changes: 91 additions & 56 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
@@ -30,6 +30,7 @@ import (
     "k8s.io/klog/v2"
 
     "github.com/kubewharf/katalyst-api/pkg/consts"
+    "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor"
     "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
     "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
     "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/assembler/headroomassembler"
@@ -425,7 +426,31 @@ func (cra *cpuResourceAdvisor) assignToRegions(ci *types.ContainerInfo) ([]regio
         return nil, fmt.Errorf("container info is nil")
     }
 
-    if ci.QoSLevel == consts.PodAnnotationQoSLevelSharedCores {
+    switch ci.QoSLevel {
+    case consts.PodAnnotationQoSLevelSharedCores:
+        return cra.assignShareContainerToRegions(ci)
+    case consts.PodAnnotationQoSLevelDedicatedCores:
+        return cra.assignDedicatedContainerToRegions(ci)
+    default:
+        return nil, nil
+    }
+}
+
+func (cra *cpuResourceAdvisor) assignShareContainerToRegions(ci *types.ContainerInfo) ([]region.QoSRegion, error) {
+    numaID := cpuadvisor.FakedNUMAID
+    if !cra.conf.GenericSysAdvisorConfiguration.DisableShareCoresNumaBinding && ci.IsNumaBinding() {
+        if ci.OwnerPoolName == "" {
+            return nil, fmt.Errorf("empty owner pool name, %v/%v", ci.PodUID, ci.ContainerName)
+        }
+
+        if len(ci.TopologyAwareAssignments) != 1 {
+            return nil, fmt.Errorf("invalid share pool topology aware assignments")
+        }
+
+        for key := range ci.TopologyAwareAssignments {
+            numaID = key
+        }
+    } else {
         // do not assign shared container to region when ramping up because its owner pool name is empty
         if ci.RampUp {
             return nil, nil
@@ -435,70 +460,76 @@ func (cra *cpuResourceAdvisor) assignToRegions(ci *types.ContainerInfo) ([]regio
         if !ci.RampUp && ci.OwnerPoolName == "" {
             return nil, fmt.Errorf("empty owner pool name, %v/%v", ci.PodUID, ci.ContainerName)
         }
+    }
 
-        // assign isolated container
-        if ci.Isolated || cra.conf.IsolationForceEnablePools.Has(ci.OriginOwnerPoolName) {
-            regionName := ""
-            if cra.conf.IsolationNonExclusivePools.Has(ci.OriginOwnerPoolName) {
-                // use origin owner pool name as region name, because all the container in this pool
-                // share only one region which is non-exclusive
-                regionName = ci.OriginOwnerPoolName
-
-                // if there already exists a non-exclusive isolation region for this pod, just reuse it
-                regions := cra.getPoolRegions(regionName)
-                if len(regions) > 0 {
-                    return regions, nil
-                }
-
-                // if there already exists a region with same name as this region, just reuse it
-                regions = cra.getRegionsByRegionNames(sets.NewString(regionName))
-                if len(regions) > 0 {
-                    return regions, nil
-                }
-            } else {
-                // if there already exists an isolation region for this pod, just reuse it
-                regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeIsolation)
-                if err != nil {
-                    return nil, err
-                } else if len(regions) > 0 {
-                    return regions, nil
-                }
-            }
-
-            r := region.NewQoSRegionIsolation(ci, regionName, cra.conf, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter)
-            klog.Infof("create a new isolation region (%s/%s) for container %s/%s", r.OwnerPoolName(), r.Name(), ci.PodUID, ci.ContainerName)
-            return []region.QoSRegion{r}, nil
-        }
-
-        // assign shared cores container. focus on pool.
-        regions := cra.getPoolRegions(ci.OriginOwnerPoolName)
-        if len(regions) > 0 {
-            return regions, nil
-        }
-
-        // create one region by owner pool name
-        r := region.NewQoSRegionShare(ci, cra.conf, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter)
-        klog.Infof("create a new share region (%s/%s) for container %s/%s", r.OwnerPoolName(), r.Name(), ci.PodUID, ci.ContainerName)
-        return []region.QoSRegion{r}, nil
-
-    } else if ci.IsNumaBinding() {
-        // assign dedicated cores numa exclusive containers. focus on container.
-        regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeDedicatedNumaExclusive)
-        if err != nil {
-            return nil, err
-        } else if len(regions) > 0 {
-            return regions, nil
-        }
-
-        // create regions by numa node
-        for numaID := range ci.TopologyAwareAssignments {
-            r := region.NewQoSRegionDedicatedNumaExclusive(ci, cra.conf, numaID, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter)
-            regions = append(regions, r)
-        }
-
-        return regions, nil
-    }
-
-    return nil, nil
+    // assign isolated container
+    if ci.Isolated || cra.conf.IsolationForceEnablePools.Has(ci.OriginOwnerPoolName) {
+        regionName := ""
+        if cra.conf.IsolationNonExclusivePools.Has(ci.OriginOwnerPoolName) {
+            // use origin owner pool name as region name, because all the container in this pool
+            // share only one region which is non-exclusive
+            regionName = ci.OriginOwnerPoolName
+
+            // if there already exists a non-exclusive isolation region for this pod, just reuse it
+            regions := cra.getPoolRegions(regionName)
+            if len(regions) > 0 {
+                return regions, nil
+            }
+
+            // if there already exists a region with same name as this region, just reuse it
+            regions = cra.getRegionsByRegionNames(sets.NewString(regionName))
+            if len(regions) > 0 {
+                return regions, nil
+            }
+        } else {
+            // if there already exists an isolation region for this pod, just reuse it
+            regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeIsolation)
+            if err != nil {
+                return nil, err
+            } else if len(regions) > 0 {
+                return regions, nil
+            }
+        }
+
+        r := region.NewQoSRegionIsolation(ci, regionName, cra.conf, cra.extraConf, numaID, cra.metaCache, cra.metaServer, cra.emitter)
+        klog.Infof("create a new isolation region (%s/%s) for container %s/%s", r.OwnerPoolName(), r.Name(), ci.PodUID, ci.ContainerName)
+        return []region.QoSRegion{r}, nil
+    }
+
+    // assign shared cores container. focus on pool.
+    // Why OriginOwnerPoolName ?
+    // Case 1: a new container
+    //   OriginOwnerPoolName == OwnerPoolName
+    // Case 2: put the isolation container back to share pool
+    //   OriginOwnerPoolName != OwnerPoolName
+    // Case others:
+    //   OriginOwnerPoolName == OwnerPoolName
+    regions := cra.getPoolRegions(ci.OriginOwnerPoolName)
+    if len(regions) > 0 {
+        return regions, nil
+    }
+
+    // create one region by owner pool name
+    r := region.NewQoSRegionShare(ci, cra.conf, cra.extraConf, numaID, cra.metaCache, cra.metaServer, cra.emitter)
+    klog.Infof("create a new share region (%s/%s) for container %s/%s", r.OwnerPoolName(), r.Name(), ci.PodUID, ci.ContainerName)
+    return []region.QoSRegion{r}, nil
+}
+
+func (cra *cpuResourceAdvisor) assignDedicatedContainerToRegions(ci *types.ContainerInfo) ([]region.QoSRegion, error) {
+    // assign dedicated cores numa exclusive containers. focus on container.
+    regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeDedicatedNumaExclusive)
+    if err != nil {
+        return nil, err
+    } else if len(regions) > 0 {
+        return regions, nil
+    }
+
+    // create regions by numa node
+    for numaID := range ci.TopologyAwareAssignments {
+        r := region.NewQoSRegionDedicatedNumaExclusive(ci, cra.conf, numaID, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter)
+        regions = append(regions, r)
+    }
+    return regions, nil
+}
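The share path above pins a region to one NUMA node only when NUMA binding is in effect and the container has exactly one topology-aware assignment; otherwise it falls back to cpuadvisor.FakedNUMAID. The following stripped-down sketch restates that selection rule; the -1 value for the faked ID and the simplified assignment type are assumptions for illustration.

package main

import (
    "errors"
    "fmt"
)

const fakedNUMAID = -1 // stand-in for cpuadvisor.FakedNUMAID (actual value assumed)

// shareRegionNUMA mirrors assignShareContainerToRegions: a NUMA-binding
// share container must occupy exactly one NUMA node, which becomes the
// region's NUMA ID; everything else gets the faked (unbound) ID.
func shareRegionNUMA(numaBinding bool, assignments map[int][]int) (int, error) {
    if !numaBinding {
        return fakedNUMAID, nil
    }
    if len(assignments) != 1 {
        return 0, errors.New("invalid share pool topology aware assignments")
    }
    for numaID := range assignments {
        return numaID, nil
    }
    return 0, errors.New("unreachable")
}

func main() {
    id, _ := shareRegionNUMA(true, map[int][]int{1: {8, 9, 10, 11}})
    fmt.Println(id) // 1
}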

// gcRegionMap deletes empty regions in region map
@@ -520,7 +551,11 @@ func (cra *cpuResourceAdvisor) updateAdvisorEssentials() {
 
     // update non-binding numas
     for _, r := range cra.regionMap {
-        if r.Type() == types.QoSRegionTypeDedicatedNumaExclusive {
+        if !r.IsNumaBinding() {
+            continue
+        }
+        // ignore isolation region
+        if r.Type() == types.QoSRegionTypeDedicatedNumaExclusive || r.Type() == types.QoSRegionTypeShare {
             cra.nonBindingNumas = cra.nonBindingNumas.Difference(r.GetBindingNumas())
         }
     }
@@ -532,7 +567,7 @@ func (cra *cpuResourceAdvisor) updateAdvisorEssentials() {
 
     for _, r := range cra.regionMap {
         // set binding numas for non numa binding regions
-        if r.Type() == types.QoSRegionTypeShare {
+        if !r.IsNumaBinding() && r.Type() == types.QoSRegionTypeShare {
             r.SetBindingNumas(cra.nonBindingNumas)
         }

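To make the bookkeeping above concrete: every NUMA-binding region (dedicated NUMA-exclusive, and now NUMA-bound share) subtracts its nodes from the candidate set, and whatever remains is handed to the unbound share regions. A compact sketch with a plain map-backed set; the CPUSet-style types in the real code are richer.

package main

import "fmt"

// nonBindingNumas removes every node claimed by a NUMA-binding region,
// mirroring the Difference() loop in updateAdvisorEssentials.
func nonBindingNumas(allNumas []int, bindings [][]int) []int {
    remaining := make(map[int]bool, len(allNumas))
    for _, n := range allNumas {
        remaining[n] = true
    }
    for _, binding := range bindings {
        for _, n := range binding {
            delete(remaining, n)
        }
    }
    var result []int
    for _, n := range allNumas { // preserve input order
        if remaining[n] {
            result = append(result, n)
        }
    }
    return result
}

func main() {
    // e.g. a dedicated region on NUMA 0 and a share-NUMA region on NUMA 2
    fmt.Println(nonBindingNumas([]int{0, 1, 2, 3}, [][]int{{0}, {2}})) // [1 3]
}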
@@ -1149,20 +1149,20 @@ func TestGetIsolatedContainerRegions(t *testing.T) {
 
     r1 := &region.QoSRegionShare{
         QoSRegionBase: region.NewQoSRegionBase("r1", "", types.QoSRegionTypeIsolation,
-            conf, struct{}{}, nil, nil, nil),
+            conf, struct{}{}, false, nil, nil, nil),
     }
     _ = r1.AddContainer(c1_1)
     _ = r1.AddContainer(c1_2)
 
     r2 := &region.QoSRegionShare{
         QoSRegionBase: region.NewQoSRegionBase("r2", "", types.QoSRegionTypeShare,
-            conf, struct{}{}, nil, nil, nil),
+            conf, struct{}{}, false, nil, nil, nil),
     }
     _ = r2.AddContainer(c2)
 
     r3 := &region.QoSRegionShare{
         QoSRegionBase: region.NewQoSRegionBase("r3", "", types.QoSRegionTypeDedicatedNumaExclusive,
-            conf, struct{}{}, nil, nil, nil),
+            conf, struct{}{}, false, nil, nil, nil),
     }
     _ = r3.AddContainer(c3_1)
     _ = r3.AddContainer(c3_2)
