support share cores with numa binding #554

Merged
18 changes: 15 additions & 3 deletions cmd/katalyst-agent/app/options/sysadvisor/sysadvisor_base.go
@@ -33,8 +33,11 @@ import (

// GenericSysAdvisorOptions holds the configurations for sysadvisor
type GenericSysAdvisorOptions struct {
SysAdvisorPlugins []string
StateFileDirectory string
SysAdvisorPlugins []string
StateFileDirectory string
ClearStateFileDirectory bool
EnableShareCoresNumaBinding bool
SkipStateCorruption bool
}

// NewGenericSysAdvisorOptions creates a new Options with a default config.
@@ -46,7 +49,10 @@ func NewGenericSysAdvisorOptions() *GenericSysAdvisorOptions {
types.AdvisorPluginNameMetricEmitter,
types.AdvisorPluginNameInference,
},
StateFileDirectory: "/var/lib/katalyst/sys_advisor/",
StateFileDirectory: "/var/lib/katalyst/sys_advisor/",
ClearStateFileDirectory: false,
EnableShareCoresNumaBinding: true,
SkipStateCorruption: false,
}
}

@@ -58,12 +64,18 @@ func (o *GenericSysAdvisorOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"A list of sysadvisor plugins to enable. '*' enables all on-by-default sysadvisor plugins, 'foo' enables the sysadvisor plugin "+
"named 'foo', '-foo' disables the sysadvisor plugin named 'foo'"))
fs.StringVar(&o.StateFileDirectory, "state-dir", o.StateFileDirectory, "directory for sys advisor to store state file")
fs.BoolVar(&o.ClearStateFileDirectory, "clear-state-dir", o.ClearStateFileDirectory, "clear state file when starting up (only for rollback)")
fs.BoolVar(&o.EnableShareCoresNumaBinding, "enable-share-cores-numa-binding", o.EnableShareCoresNumaBinding, "enable share cores with NUMA binding feature")
fs.BoolVar(&o.SkipStateCorruption, "skip-state-corruption", o.SkipStateCorruption, "skip meta cache state corruption")
}

// ApplyTo fills up config with options
func (o *GenericSysAdvisorOptions) ApplyTo(c *sysadvisor.GenericSysAdvisorConfiguration) error {
c.SysAdvisorPlugins = o.SysAdvisorPlugins
c.StateFileDirectory = o.StateFileDirectory
c.ClearStateFileDirectory = o.ClearStateFileDirectory
c.EnableShareCoresNumaBinding = o.EnableShareCoresNumaBinding
c.SkipStateCorruption = o.SkipStateCorruption
return nil
}
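For orientation, here is a minimal wiring sketch (not part of this diff) showing how the new options reach the sysadvisor configuration. The import paths and the standalone `main` are assumptions made for illustration; `NewGenericSysAdvisorOptions`, `AddFlags`, and `ApplyTo` are the functions changed above.

```go
package main

import (
	"fmt"

	cliflag "k8s.io/component-base/cli/flag"

	// Assumed import paths, inferred from the file locations in this diff.
	sysadvisoroptions "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/sysadvisor"
	sysadvisorconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor"
)

func main() {
	// Defaults from this diff: EnableShareCoresNumaBinding is on, the other two are off.
	o := sysadvisoroptions.NewGenericSysAdvisorOptions()

	// Registers --state-dir, --clear-state-dir, --enable-share-cores-numa-binding
	// and --skip-state-corruption on the named flag sets.
	fss := &cliflag.NamedFlagSets{}
	o.AddFlags(fss)

	// ApplyTo copies the (possibly flag-overridden) options into the config object
	// that the sysadvisor plugins and the metacache read at startup.
	c := &sysadvisorconfig.GenericSysAdvisorConfiguration{}
	if err := o.ApplyTo(c); err != nil {
		panic(err)
	}
	fmt.Println(c.EnableShareCoresNumaBinding, c.ClearStateFileDirectory, c.SkipStateCorruption)
}
```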

@@ -88,20 +88,22 @@ type CPUPressureLoadEviction struct {
poolMetricCollectHandlers map[string]PoolMetricCollectHandler

systemReservedCPUs machine.CPUSet
configTranslator *general.CommonSuffixTranslator
}

func NewCPUPressureLoadEviction(emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer,
conf *config.Configuration, state state.ReadonlyState,
conf *config.Configuration, readonlyState state.ReadonlyState,
) (CPUPressureEviction, error) {
plugin := &CPUPressureLoadEviction{
state: state,
emitter: emitter,
metaServer: metaServer,
metricsHistory: make(map[string]Entries),
qosConf: conf.QoSConfiguration,
dynamicConf: conf.DynamicAgentConfiguration,
skipPools: sets.NewString(conf.LoadPressureEvictionSkipPools...),
syncPeriod: conf.LoadEvictionSyncPeriod,
state: readonlyState,
emitter: emitter,
metaServer: metaServer,
metricsHistory: make(map[string]Entries),
qosConf: conf.QoSConfiguration,
dynamicConf: conf.DynamicAgentConfiguration,
skipPools: sets.NewString(conf.LoadPressureEvictionSkipPools...),
syncPeriod: conf.LoadEvictionSyncPeriod,
configTranslator: general.NewCommonSuffixTranslator(state.NUMAPoolInfix),
}

systemReservedCores, reserveErr := cpuutil.GetCoresReservedForSystem(conf, metaServer, metaServer.KatalystMachineInfo, metaServer.CPUDetails.CPUs().Clone())
@@ -150,7 +152,7 @@ func (p *CPUPressureLoadEviction) ThresholdMet(_ context.Context,

var softThresholdMetPoolName string
for poolName, entries := range p.metricsHistory[consts.MetricLoad1MinContainer] {
if !entries.IsPoolEntry() || p.skipPools.Has(poolName) || state.IsIsolationPool(poolName) {
if !entries.IsPoolEntry() || p.skipPools.Has(p.configTranslator.Translate(poolName)) || state.IsIsolationPool(poolName) {
continue
}

@@ -333,7 +335,7 @@ func (p *CPUPressureLoadEviction) collectMetrics(_ context.Context) {
for containerName, containerEntry := range entry {
if containerEntry == nil || containerEntry.IsPool {
continue
} else if containerEntry.OwnerPool == state.EmptyOwnerPoolName || p.skipPools.Has(containerEntry.OwnerPool) {
} else if containerEntry.OwnerPool == state.EmptyOwnerPoolName || p.skipPools.Has(p.configTranslator.Translate(containerEntry.OwnerPool)) {
general.Infof("skip collecting metric for pod: %s, container: %s with owner pool name: %s",
podUID, containerName, containerEntry.OwnerPool)
continue
@@ -372,7 +374,7 @@ func (p *CPUPressureLoadEviction) collectMetrics(_ context.Context) {
}

for _, poolEntry := range entry {
if poolEntry == nil || !poolEntry.IsPool || p.skipPools.Has(poolName) {
if poolEntry == nil || !poolEntry.IsPool || p.skipPools.Has(p.configTranslator.Translate(poolName)) {
continue
}

@@ -401,7 +403,7 @@ func (p *CPUPressureLoadEviction) checkSharedPressureByPoolSize(pod2Pool PodPool
}

for _, containerEntry := range entry {
if !containerEntry.IsPool || p.skipPools.Has(poolName) || entry[state.FakedContainerName] == nil {
if !containerEntry.IsPool || p.skipPools.Has(p.configTranslator.Translate(poolName)) || entry[state.FakedContainerName] == nil {
continue
}
poolSizeSum += containerEntry.PoolSize
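The recurring change in this file is that every skip-pool lookup now runs the pool name through a `general.CommonSuffixTranslator` built from `state.NUMAPoolInfix`. Below is an illustrative re-implementation of that idea (not the real translator; the `-NUMA` infix and the names are assumptions): with share cores NUMA binding, pool entries can carry a per-NUMA suffix, and translating them back to the base pool name keeps the existing skip-pool list effective.

```go
package main

import (
	"fmt"
	"strings"
)

// translate strips everything from the first occurrence of infix onward, so a
// NUMA-scoped pool name maps back to its base pool name before the skip check.
// This mimics the intent of CommonSuffixTranslator without being its real code.
func translate(poolName, infix string) string {
	if idx := strings.Index(poolName, infix); idx >= 0 {
		return poolName[:idx]
	}
	return poolName
}

func main() {
	skipPools := map[string]bool{"share": true}

	// Assumption for this sketch: NUMA-bound share pools are suffixed like "share-NUMA1".
	name := "share-NUMA1"
	fmt.Println(skipPools[name])                     // false: the raw name misses the skip list
	fmt.Println(skipPools[translate(name, "-NUMA")]) // true: the translated name matches
}
```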
33 changes: 31 additions & 2 deletions pkg/agent/sysadvisor/metacache/metacache.go
@@ -18,13 +18,15 @@ package metacache

import (
"fmt"
"os"
"reflect"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
@@ -132,6 +134,8 @@ type MetaCacheImp struct {
type MetaCacheImp struct {
metrictypes.MetricsReader

skipStateCorruption bool

podEntries types.PodEntries
podMutex sync.RWMutex

@@ -156,6 +160,13 @@ var _ MetaCache = &MetaCacheImp{}

// NewMetaCacheImp returns the single instance of MetaCacheImp
func NewMetaCacheImp(conf *config.Configuration, emitterPool metricspool.MetricsEmitterPool, metricsReader metrictypes.MetricsReader) (*MetaCacheImp, error) {
if conf.GenericSysAdvisorConfiguration.ClearStateFileDirectory {
if err := os.RemoveAll(conf.GenericSysAdvisorConfiguration.StateFileDirectory); err != nil {
if !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to clear state file dir")
}
}
}
stateFileDir := conf.GenericSysAdvisorConfiguration.StateFileDirectory
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateFileDir)
if err != nil {
@@ -165,6 +176,7 @@ func NewMetaCacheImp(conf *config.Configuration, emitterPool metricspool.Metrics

mc := &MetaCacheImp{
MetricsReader: metricsReader,
skipStateCorruption: conf.SkipStateCorruption,
podEntries: make(types.PodEntries),
poolEntries: make(types.PoolEntries),
regionEntries: make(types.RegionEntries),
@@ -556,15 +568,32 @@ func (mc *MetaCacheImp) storeState() error {
func (mc *MetaCacheImp) restoreState() error {
checkpoint := NewMetaCacheCheckpoint()

foundAndSkippedStateCorruption := false
if err := mc.checkpointManager.GetCheckpoint(mc.checkpointName, checkpoint); err != nil {
klog.Infof("[metacache] checkpoint %v err %v, create it", mc.checkpointName, err)
return mc.storeState()
if err == errors.ErrCheckpointNotFound {
// create a new store state
klog.Infof("[metacache] checkpoint %v doesn't exist, create it", mc.checkpointName, err)
return mc.storeState()
} else if err == errors.ErrCorruptCheckpoint {
if !mc.skipStateCorruption {
return err
}

foundAndSkippedStateCorruption = true
} else {
return err
}
}

mc.podEntries = checkpoint.PodEntries
mc.poolEntries = checkpoint.PoolEntries
mc.regionEntries = checkpoint.RegionEntries

if foundAndSkippedStateCorruption {
klog.Infof("[metacache] checkpoint %v recovery corrupt, create it", mc.checkpointName)
return mc.storeState()
}

klog.Infof("[metacache] restore state succeeded")

return nil
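To summarize the new restore behavior, here is a hedged sketch of the three-way decision above (an illustrative helper, not the actual method): a missing checkpoint is simply recreated, a corrupt checkpoint is fatal unless --skip-state-corruption is set, and every other error still fails startup.

```go
package sketch

import "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"

// restoreDecision mirrors the branching in restoreState: recreate==true means
// "start from an empty cache and write a fresh checkpoint via storeState".
func restoreDecision(err error, skipCorruption bool) (recreate bool, fatal error) {
	switch err {
	case nil:
		return false, nil // checkpoint loaded, nothing to do
	case errors.ErrCheckpointNotFound:
		return true, nil // first start (or cleared state dir)
	case errors.ErrCorruptCheckpoint:
		if skipCorruption {
			return true, nil // tolerate corruption and rebuild the checkpoint
		}
		return false, err
	default:
		return false, err
	}
}
```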
@@ -151,7 +151,7 @@ func (bmrf *BorweinModelResultFetcher) FetchModelResult(ctx context.Context, met
return
}

if containerInfo.QoSLevel == apiconsts.PodAnnotationQoSLevelSharedCores || containerInfo.IsNumaExclusive() {
if containerInfo.QoSLevel == apiconsts.PodAnnotationQoSLevelSharedCores || containerInfo.IsDedicatedNumaExclusive() {
requestContainers = append(requestContainers, containerInfo.Clone())
}

2 changes: 1 addition & 1 deletion pkg/agent/sysadvisor/plugin/metacache/metacache.go
@@ -95,7 +95,7 @@ func (mcp *MetaCachePlugin) periodicWork(_ context.Context) {
// For containers that do not belong to NumaExclusive, assign the actual value to their CPURequest.
// Because CPURequest of containerInfo would be assigned as math.Ceil(Actual CPURequest).
// As for NumaExclusive containers, the "math.Ceil(Actual CPURequest)" is acceptable.
if ci.CPURequest <= 0 || !ci.IsNumaExclusive() {
if ci.CPURequest <= 0 || !ci.IsDedicatedNumaExclusive() {
ci.CPURequest = spec.Resources.Requests.Cpu().AsApproximateFloat64()
}
if ci.CPULimit <= 0 {
146 changes: 90 additions & 56 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
@@ -424,7 +424,31 @@ func (cra *cpuResourceAdvisor) assignToRegions(ci *types.ContainerInfo) ([]regio
return nil, fmt.Errorf("container info is nil")
}

if ci.QoSLevel == consts.PodAnnotationQoSLevelSharedCores {
switch ci.QoSLevel {
case consts.PodAnnotationQoSLevelSharedCores:
return cra.assignShareContainerToRegions(ci)
case consts.PodAnnotationQoSLevelDedicatedCores:
return cra.assignDedicatedContainerToRegions(ci)
default:
return nil, nil
}
}

func (cra *cpuResourceAdvisor) assignShareContainerToRegions(ci *types.ContainerInfo) ([]region.QoSRegion, error) {
numaID := state.FakedNUMAID
if cra.conf.GenericSysAdvisorConfiguration.EnableShareCoresNumaBinding && ci.IsNumaBinding() {
if ci.OwnerPoolName == "" {
return nil, fmt.Errorf("empty owner pool name, %v/%v", ci.PodUID, ci.ContainerName)
}

if len(ci.TopologyAwareAssignments) != 1 {
return nil, fmt.Errorf("invalid share pool topology aware assignments")
}

for key := range ci.TopologyAwareAssignments {
numaID = key
}
} else {
// do not assign shared container to region when ramping up because its owner pool name is empty
if ci.RampUp {
return nil, nil
@@ -439,70 +463,76 @@
if !ci.RampUp && ci.OwnerPoolName == "" {
return nil, fmt.Errorf("empty owner pool name, %v/%v", ci.PodUID, ci.ContainerName)
}
}

// assign isolated container
if ci.Isolated || cra.conf.IsolationForceEnablePools.Has(ci.OriginOwnerPoolName) {
regionName := ""
if cra.conf.IsolationNonExclusivePools.Has(ci.OriginOwnerPoolName) {
// use origin owner pool name as region name, because all the container in this pool
// share only one region which is non-exclusive
regionName = ci.OriginOwnerPoolName

// if there already exists a non-exclusive isolation region for this pod, just reuse it
regions := cra.getPoolRegions(regionName)
if len(regions) > 0 {
return regions, nil
}

// if there already exists a region with same name as this region, just reuse it
regions = cra.getRegionsByRegionNames(sets.NewString(regionName))
if len(regions) > 0 {
return regions, nil
}
} else {
// if there already exists an isolation region for this pod, just reuse it
regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeIsolation)
if err != nil {
return nil, err
} else if len(regions) > 0 {
return regions, nil
}
// assign isolated container
if ci.Isolated || cra.conf.IsolationForceEnablePools.Has(ci.OriginOwnerPoolName) {
regionName := ""
if cra.conf.IsolationNonExclusivePools.Has(ci.OriginOwnerPoolName) {
// use origin owner pool name as region name, because all the container in this pool
// share only one region which is non-exclusive
regionName = ci.OriginOwnerPoolName

// if there already exists a non-exclusive isolation region for this pod, just reuse it
regions := cra.getPoolRegions(regionName)
if len(regions) > 0 {
return regions, nil
}

r := region.NewQoSRegionIsolation(ci, regionName, cra.conf, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter)
klog.Infof("create a new isolation region (%s/%s) for container %s/%s", r.OwnerPoolName(), r.Name(), ci.PodUID, ci.ContainerName)
return []region.QoSRegion{r}, nil
}

// assign shared cores container. focus on pool.
regions := cra.getPoolRegions(ci.OriginOwnerPoolName)
if len(regions) > 0 {
return regions, nil
// if there already exists a region with same name as this region, just reuse it
regions = cra.getRegionsByRegionNames(sets.NewString(regionName))
if len(regions) > 0 {
return regions, nil
}
} else {
// if there already exists an isolation region for this pod, just reuse it
regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeIsolation)
if err != nil {
return nil, err
} else if len(regions) > 0 {
return regions, nil
}
}

// create one region by owner pool name
r := region.NewQoSRegionShare(ci, cra.conf, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter)
klog.Infof("create a new share region (%s/%s) for container %s/%s", r.OwnerPoolName(), r.Name(), ci.PodUID, ci.ContainerName)
r := region.NewQoSRegionIsolation(ci, regionName, cra.conf, cra.extraConf, numaID, cra.metaCache, cra.metaServer, cra.emitter)
klog.Infof("create a new isolation region (%s/%s) for container %s/%s", r.OwnerPoolName(), r.Name(), ci.PodUID, ci.ContainerName)
return []region.QoSRegion{r}, nil
}

} else if ci.IsNumaBinding() {
// assign dedicated cores numa exclusive containers. focus on container.
regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeDedicatedNumaExclusive)
if err != nil {
return nil, err
} else if len(regions) > 0 {
return regions, nil
}
// assign shared cores container. focus on pool.
// Why OriginOwnerPoolName ?
// Case 1: a new container
// OriginOwnerPoolName == OwnerPoolName
// Case 2: put the isolation container back to share pool
// OriginOwnerPoolName != OwnerPoolName:
// Case others:
// OriginOwnerPoolName == OwnerPoolName
regions := cra.getPoolRegions(ci.OriginOwnerPoolName)
if len(regions) > 0 {
return regions, nil
}

// create regions by numa node
for numaID := range ci.TopologyAwareAssignments {
r := region.NewQoSRegionDedicatedNumaExclusive(ci, cra.conf, numaID, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter)
regions = append(regions, r)
}
// create one region by owner pool name
r := region.NewQoSRegionShare(ci, cra.conf, cra.extraConf, numaID, cra.metaCache, cra.metaServer, cra.emitter)
klog.Infof("create a new share region (%s/%s) for container %s/%s", r.OwnerPoolName(), r.Name(), ci.PodUID, ci.ContainerName)
return []region.QoSRegion{r}, nil
}

func (cra *cpuResourceAdvisor) assignDedicatedContainerToRegions(ci *types.ContainerInfo) ([]region.QoSRegion, error) {
// assign dedicated cores numa exclusive containers. focus on container.
regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeDedicatedNumaExclusive)
if err != nil {
return nil, err
} else if len(regions) > 0 {
return regions, nil
}

return nil, nil
// create regions by numa node
for numaID := range ci.TopologyAwareAssignments {
r := region.NewQoSRegionDedicatedNumaExclusive(ci, cra.conf, numaID, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter)
regions = append(regions, r)
}
return regions, nil
}
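The heart of the feature is in assignShareContainerToRegions above: when EnableShareCoresNumaBinding is enabled and the container asks for NUMA binding, its share region is pinned to the single NUMA node present in TopologyAwareAssignments; otherwise the faked NUMA ID preserves today's node-wide share-pool behavior. A simplified sketch of that derivation follows (the map type and the -1 sentinel are stand-ins for katalyst-core's real types, used here only for illustration):

```go
package main

import "fmt"

// fakedNUMAID stands in for state.FakedNUMAID: "not pinned to any NUMA node".
const fakedNUMAID = -1

// shareRegionNUMAID mirrors the check above: a NUMA-binding share container must
// be assigned to exactly one NUMA node, and that node becomes the region's NUMA ID.
func shareRegionNUMAID(topologyAwareAssignments map[int][]int, numaBinding bool) (int, error) {
	if !numaBinding {
		return fakedNUMAID, nil
	}
	if len(topologyAwareAssignments) != 1 {
		return fakedNUMAID, fmt.Errorf("invalid share pool topology aware assignments")
	}
	for numaID := range topologyAwareAssignments {
		return numaID, nil
	}
	return fakedNUMAID, nil // unreachable: the map has exactly one entry
}

func main() {
	id, err := shareRegionNUMAID(map[int][]int{1: {8, 9, 10, 11}}, true)
	fmt.Println(id, err) // 1 <nil>
}
```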

// gcRegionMap deletes empty regions in region map
Expand All @@ -524,7 +554,11 @@ func (cra *cpuResourceAdvisor) updateAdvisorEssentials() {

// update non-binding numas
for _, r := range cra.regionMap {
if r.Type() == types.QoSRegionTypeDedicatedNumaExclusive {
if !r.IsNumaBinding() {
continue
}
// ignore isolation region
if r.Type() == types.QoSRegionTypeDedicatedNumaExclusive || r.Type() == types.QoSRegionTypeShare {
cra.nonBindingNumas = cra.nonBindingNumas.Difference(r.GetBindingNumas())
}
}
@@ -536,7 +570,7 @@ func (cra *cpuResourceAdvisor) updateAdvisorEssentials() {

for _, r := range cra.regionMap {
// set binding numas for non numa binding regions
if r.Type() == types.QoSRegionTypeShare {
if !r.IsNumaBinding() && r.Type() == types.QoSRegionTypeShare {
r.SetBindingNumas(cra.nonBindingNumas)
}
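Finally, the updateAdvisorEssentials change means any NUMA-binding region, whether dedicated NUMA-exclusive or a NUMA-bound share region, now claims its NUMA nodes out of the non-binding set, and only share regions without NUMA binding receive the leftover nodes. A small set-arithmetic sketch of that bookkeeping (plain int sets instead of the real NUMA/CPUSet types; the real loop additionally skips isolation regions):

```go
package sketch

// numaSet is a stand-in for katalyst-core's NUMA set type, for illustration only.
type numaSet map[int]struct{}

func difference(a, b numaSet) numaSet {
	out := numaSet{}
	for n := range a {
		if _, ok := b[n]; !ok {
			out[n] = struct{}{}
		}
	}
	return out
}

type regionSketch struct {
	numaBinding  bool    // r.IsNumaBinding()
	bindingNumas numaSet // r.GetBindingNumas()
}

// nonBindingNumas mirrors the first loop in updateAdvisorEssentials: every
// NUMA-binding region removes its nodes, and the remainder is later handed to
// share regions that are not NUMA-binding via SetBindingNumas.
func nonBindingNumas(allNumas numaSet, regions []regionSketch) numaSet {
	remaining := allNumas
	for _, r := range regions {
		if !r.numaBinding {
			continue
		}
		remaining = difference(remaining, r.bindingNumas)
	}
	return remaining
}
```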
