Skip to content

Commit

Permalink
Merge pull request #549 from cheney-lin/dev/tmo
Browse files Browse the repository at this point in the history
feat: Add TransparentMemoryOffloading plugin to trigger memory offloading
  • Loading branch information
nightmeng committed Apr 30, 2024
2 parents f39c3e0 + 7a1e3f1 commit 8475c45
Show file tree
Hide file tree
Showing 30 changed files with 1,543 additions and 36 deletions.
7 changes: 6 additions & 1 deletion cmd/katalyst-agent/app/options/dynamic/dynamic_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,30 @@ import (
cliflag "k8s.io/component-base/cli/flag"

"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/adminqos"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/tmo"
"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
)

// DynamicOptions bundles the command-line option groups for the agent's
// dynamic configuration. Each group is embedded by pointer and is populated
// by NewDynamicOptions.
type DynamicOptions struct {
// Option group for admin QoS.
*adminqos.AdminQoSOptions
// Option group for transparent memory offloading (TMO).
*tmo.TransparentMemoryOffloadingOptions
}

// NewDynamicOptions returns a DynamicOptions whose embedded option groups are
// each built by their own constructor.
//
// Each field is set exactly once: a repeated field name in a struct literal
// is rejected by the Go compiler.
func NewDynamicOptions() *DynamicOptions {
	return &DynamicOptions{
		AdminQoSOptions:                    adminqos.NewAdminQoSOptions(),
		TransparentMemoryOffloadingOptions: tmo.NewTransparentMemoryOffloadingOptions(),
	}
}

// AddFlags forwards fss to the AddFlags method of every embedded option
// group, admin QoS first and transparent memory offloading second.
func (o *DynamicOptions) AddFlags(fss *cliflag.NamedFlagSets) {
	adminQoS, offloading := o.AdminQoSOptions, o.TransparentMemoryOffloadingOptions

	adminQoS.AddFlags(fss)
	offloading.AddFlags(fss)
}

// ApplyTo copies every embedded option group into the matching section of c.
// The per-group results are handed to errors.NewAggregate, in the same order
// the groups are applied, and the aggregate is returned.
func (o *DynamicOptions) ApplyTo(c *dynamic.Configuration) error {
	return errors.NewAggregate([]error{
		o.AdminQoSOptions.ApplyTo(c.AdminQoSConfiguration),
		o.TransparentMemoryOffloadingOptions.ApplyTo(c.TransparentMemoryOffloadingConfiguration),
	})
}
45 changes: 45 additions & 0 deletions cmd/katalyst-agent/app/options/dynamic/tmo/tmo_base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tmo

import (
"k8s.io/apimachinery/pkg/util/errors"
cliflag "k8s.io/component-base/cli/flag"

"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/tmo/tmodefault"
tmodynamicconf "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo"
)

// TransparentMemoryOffloadingOptions is the option group for transparent
// memory offloading (TMO). It currently consists only of the embedded
// default options.
type TransparentMemoryOffloadingOptions struct {
// Default TMO settings exposed as command-line flags.
*tmodefault.DefaultOptions
}

// NewTransparentMemoryOffloadingOptions returns a
// TransparentMemoryOffloadingOptions whose embedded DefaultOptions comes from
// tmodefault.NewDefaultOptions.
func NewTransparentMemoryOffloadingOptions() *TransparentMemoryOffloadingOptions {
	opts := &TransparentMemoryOffloadingOptions{}
	opts.DefaultOptions = tmodefault.NewDefaultOptions()
	return opts
}

// AddFlags forwards fss to the embedded DefaultOptions so that its flags are
// registered.
func (o *TransparentMemoryOffloadingOptions) AddFlags(fss *cliflag.NamedFlagSets) {
	defaults := o.DefaultOptions
	defaults.AddFlags(fss)
}

// ApplyTo copies the embedded default options into c.DefaultConfigurations
// and returns the result wrapped by errors.NewAggregate.
func (o *TransparentMemoryOffloadingOptions) ApplyTo(c *tmodynamicconf.TransparentMemoryOffloadingConfiguration) error {
	return errors.NewAggregate([]error{
		o.DefaultOptions.ApplyTo(c.DefaultConfigurations),
	})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tmodefault

import (
"time"

cliflag "k8s.io/component-base/cli/flag"

"github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1"
tmodynamicconf "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo"
)

// DefaultOptions holds the default transparent memory offloading (TMO)
// settings that can be set from the command line. The field descriptions
// below mirror the help text of the flags registered in AddFlags.
type DefaultOptions struct {
// Whether TMO is enabled by default.
DefaultEnableTMO bool
// Whether swap (offloading rss) is enabled in TMO by default.
DefaultEnableSwap bool
// Default minimum interval between TMO triggers on each container or cgroup.
DefaultTMOInterval time.Duration
// Default policy used to calculate the memory offloading size.
DefaultTMOPolicyName string
// Default maximum ratio of memory usage that may be offloaded in one cycle.
DefaultTMOMaxProbe float64
// Default memory pressure threshold; offloading is paused when the observed
// pressure exceeds it.
DefaultTMOPSIPolicyPSIAvg60Threshold float64
// Default desired accuracy of offloaded pages for the refault policy.
DefaultTMORefaultPolicyReclaimAccuracyTarget float64
// Default desired efficiency of scanning for offloadable pages for the
// refault policy.
DefaultTMORefaultPolicyReclaimScanEfficiencyTarget float64
}

// NewDefaultOptions returns a DefaultOptions in which every field is
// initialised from the same-named default exported by the tmo dynamic config
// package.
func NewDefaultOptions() *DefaultOptions {
	opts := new(DefaultOptions)

	// Switches and scheduling.
	opts.DefaultEnableTMO = tmodynamicconf.DefaultEnableTMO
	opts.DefaultEnableSwap = tmodynamicconf.DefaultEnableSwap
	opts.DefaultTMOInterval = tmodynamicconf.DefaultTMOInterval

	// Policy selection; the option stores the policy name as a plain string.
	opts.DefaultTMOPolicyName = string(tmodynamicconf.DefaultTMOPolicyName)
	opts.DefaultTMOMaxProbe = tmodynamicconf.DefaultTMOMaxProbe

	// Per-policy tuning values.
	opts.DefaultTMOPSIPolicyPSIAvg60Threshold = tmodynamicconf.DefaultTMOPSIPolicyPSIAvg60Threshold
	opts.DefaultTMORefaultPolicyReclaimAccuracyTarget = tmodynamicconf.DefaultTMORefaultPolicyReclaimAccuracyTarget
	opts.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget = tmodynamicconf.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget

	return opts
}

// AddFlags registers one flag per DefaultOptions field in the
// "tmo-default-config" flag set of fss. Each flag's default is the value the
// field currently holds.
func (o *DefaultOptions) AddFlags(fss *cliflag.NamedFlagSets) {
	flagSet := fss.FlagSet("tmo-default-config")

	// Switches.
	flagSet.BoolVar(&o.DefaultEnableTMO, "default-enable-tmo", o.DefaultEnableTMO,
		"whether to enable transparent memory offloading by default")
	flagSet.BoolVar(&o.DefaultEnableSwap, "default-enable-swap", o.DefaultEnableSwap,
		"whether to enable swap (offload rss) in TMO by default")

	// Scheduling and policy selection.
	flagSet.DurationVar(&o.DefaultTMOInterval, "default-tmo-min-interval", o.DefaultTMOInterval,
		"default minimum interval to trigger TMO on each container or cgroup")
	flagSet.StringVar(&o.DefaultTMOPolicyName, "default-tmo-policy-name", o.DefaultTMOPolicyName,
		"default policy used to calculate memory offloading size")
	flagSet.Float64Var(&o.DefaultTMOMaxProbe, "default-max-probe", o.DefaultTMOMaxProbe,
		"default maximum ratio of memory usage could be offloaded in one cycle")

	// Per-policy tuning values.
	flagSet.Float64Var(&o.DefaultTMOPSIPolicyPSIAvg60Threshold, "default-psi-policy-psi-avg60-threshold", o.DefaultTMOPSIPolicyPSIAvg60Threshold,
		"indicates the default threshold of memory pressure. If observed pressure exceeds this threshold, memory offloading will be paused.")
	flagSet.Float64Var(&o.DefaultTMORefaultPolicyReclaimAccuracyTarget, "default-refault-policy-reclaim-accuracy-target", o.DefaultTMORefaultPolicyReclaimAccuracyTarget,
		"indicates the default desired level of precision or accuracy in offloaded pages")
	flagSet.Float64Var(&o.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget, "default-refault-policy-reclaim-scan-efficiency-target", o.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget,
		"indicates the default desired level of efficiency in scanning and identifying memory pages that can be offloaded.")
}

// ApplyTo copies every option value into the same-named field of c. The
// policy name is converted from string to v1alpha1.TMOPolicyName on the way.
// It always returns nil.
func (o *DefaultOptions) ApplyTo(c *tmodynamicconf.TMODefaultConfigurations) error {
	// Switches and scheduling.
	c.DefaultEnableTMO, c.DefaultEnableSwap = o.DefaultEnableTMO, o.DefaultEnableSwap
	c.DefaultTMOInterval = o.DefaultTMOInterval

	// Policy selection.
	policyName := v1alpha1.TMOPolicyName(o.DefaultTMOPolicyName)
	c.DefaultTMOPolicyName = policyName
	c.DefaultTMOMaxProbe = o.DefaultTMOMaxProbe

	// Per-policy tuning values.
	c.DefaultTMOPSIPolicyPSIAvg60Threshold = o.DefaultTMOPSIPolicyPSIAvg60Threshold
	c.DefaultTMORefaultPolicyReclaimAccuracyTarget = o.DefaultTMORefaultPolicyReclaimAccuracyTarget
	c.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget = o.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget

	return nil
}
4 changes: 4 additions & 0 deletions cmd/katalyst-agent/app/options/global/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type BaseOptions struct {
AdditionalCgroupPaths []string

ReclaimRelativeRootCgroupPath string
GeneralRelativeCgroupPaths []string

// configurations for kubelet
KubeletReadOnlyPort int
Expand Down Expand Up @@ -113,6 +114,8 @@ func (o *BaseOptions) AddFlags(fss *cliflag.NamedFlagSets) {

fs.StringVar(&o.ReclaimRelativeRootCgroupPath, "reclaim-relative-root-cgroup-path", o.ReclaimRelativeRootCgroupPath,
"top level cgroup path for reclaimed_cores qos level")
fs.StringSliceVar(&o.GeneralRelativeCgroupPaths, "general-relative-cgroup-paths", o.GeneralRelativeCgroupPaths,
"The cgroup paths of standalone services which not managed by kubernetes")

fs.IntVar(&o.KubeletReadOnlyPort, "kubelet-read-only-port", o.KubeletReadOnlyPort,
"The read-only port for the kubelet to serve")
Expand Down Expand Up @@ -152,6 +155,7 @@ func (o *BaseOptions) ApplyTo(c *global.BaseConfiguration) error {
c.LockWaitingEnabled = o.LockWaitingEnabled

c.ReclaimRelativeRootCgroupPath = o.ReclaimRelativeRootCgroupPath
c.GeneralRelativeCgroupPaths = o.GeneralRelativeCgroupPaths

c.NetMultipleNS = o.MachineNetMultipleNS
c.NetNSDirAbsPath = o.MachineNetNSDirAbsPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ const (
ControlKnobKeyCPUSetMems MemoryControlKnobName = "cpuset_mems"
ControlKnobReclaimedMemorySize MemoryControlKnobName = "reclaimed_memory_size"
ControlKnobKeyBalanceNumaMemory MemoryControlKnobName = "balance_numa_memory"
ControlKnobKeySwapMax MemoryControlKnobName = "swap_max"
ControlKnowKeyMemoryOffloading MemoryControlKnobName = "memory_offloading"
)
11 changes: 7 additions & 4 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ import (
const (
MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic)

memoryPluginStateFileName = "memory_plugin_state"
memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers"
memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache"
memoryPluginAsyncWorkTopicMovePage = "qrm_memory_plugin_move_page"
memoryPluginStateFileName = "memory_plugin_state"
memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers"
memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache"
memoryPluginAsyncWorkTopicMovePage = "qrm_memory_plugin_move_page"
memoryPluginAsyncWorkTopicMemoryOffloading = "qrm_memory_plugin_mem_offload"

dropCacheTimeoutSeconds = 30
)
Expand Down Expand Up @@ -238,6 +239,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryProvisions))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyBalanceNumaMemory,
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleNumaMemoryBalance))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnowKeyMemoryOffloading,
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryOffloading))

return true, &agent.PluginWrapper{GenericPlugin: pluginWrapper}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/asyncworker"
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
cgroupcommon "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
"github.com/kubewharf/katalyst-core/pkg/util/general"
Expand Down Expand Up @@ -682,6 +684,70 @@ func (p *DynamicPolicy) doNumaMemoryBalance(ctx context.Context, advice types.Nu

_ = p.emitter.StoreInt64(util.MetricNameMemoryNumaBalanceResult, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "success", Val: strconv.FormatBool(migrateSuccess)})
return nil
}

// handleAdvisorMemoryOffloading handles a memory offloading request from
// memory-advisor.
//
// It performs the following steps:
//  1. parses the requested offloading size (a base-10 int64) from the
//     memory_offloading knob of the calculation result;
//  2. resolves the absolute memory cgroup path of the target: the container
//     identified by entryName/subEntryName when calculationInfo.CgroupPath is
//     empty, otherwise calculationInfo.CgroupPath itself;
//  3. sets swap max on that path when the swap_max knob equals
//     consts.ControlKnobON, and disables it for any other value (including a
//     missing knob); a failure here is logged and does not abort the request;
//  4. queues an asynchronous work item that runs
//     cgroupmgr.MemoryOffloadingWithAbsolutePath, overriding any pending work
//     with the same name.
//
// It returns an error when the size cannot be parsed, the cgroup path cannot
// be resolved, or the work item cannot be queued. Underlying errors are
// wrapped with %w so callers can inspect them with errors.Is / errors.As.
func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration,
	_ interface{},
	_ *dynamicconfig.DynamicAgentConfiguration,
	emitter metrics.MetricEmitter,
	metaServer *metaserver.MetaServer,
	entryName, subEntryName string,
	calculationInfo *advisorsvc.CalculationInfo, podResourceEntries state.PodResourceEntries) error {

	memoryOffloadingSizeInBytes := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnowKeyMemoryOffloading)]
	memoryOffloadingSizeInBytesInt64, err := strconv.ParseInt(memoryOffloadingSizeInBytes, 10, 64)
	if err != nil {
		return fmt.Errorf("parse %s: %s failed with error: %w", memoryadvisor.ControlKnowKeyMemoryOffloading, memoryOffloadingSizeInBytes, err)
	}

	var absCGPath, memoryOffloadingWorkName string
	if calculationInfo.CgroupPath == "" {
		// No explicit cgroup: the target is a container of a pod.
		memoryOffloadingWorkName = util.GetContainerAsyncWorkName(entryName, subEntryName, memoryPluginAsyncWorkTopicMemoryOffloading)
		// A distinct error variable is used so the function-level err is not shadowed.
		containerID, idErr := metaServer.GetContainerID(entryName, subEntryName)
		if idErr != nil {
			return fmt.Errorf("GetContainerID failed with error: %w", idErr)
		}
		absCGPath, err = common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, entryName, containerID)
		if err != nil {
			return fmt.Errorf("GetContainerAbsCgroupPath failed with error: %w", err)
		}
	} else {
		memoryOffloadingWorkName = util.GetCgroupAsyncWorkName(calculationInfo.CgroupPath, memoryPluginAsyncWorkTopicMemoryOffloading)
		absCGPath = common.GetAbsCgroupPath(common.CgroupSubsysMemory, calculationInfo.CgroupPath)
	}

	// Set swap max before triggering memory offloading. This is best effort:
	// a failure is logged with the affected cgroup path and offloading is
	// still attempted.
	swapMax := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnobKeySwapMax)]
	if swapMax == consts.ControlKnobON {
		if swapErr := cgroupmgr.SetSwapMaxWithAbsolutePathRecursive(absCGPath); swapErr != nil {
			general.Errorf("Failed to set swap max for cgroup %s, err: %v", absCGPath, swapErr)
		}
	} else if swapErr := cgroupmgr.DisableSwapMaxWithAbsolutePathRecursive(absCGPath); swapErr != nil {
		general.Errorf("Failed to disable swap for cgroup %s, err: %v", absCGPath, swapErr)
	}

	// Start an asynchronous work to execute memory offloading.
	err = p.asyncWorkers.AddWork(memoryOffloadingWorkName,
		&asyncworker.Work{
			Fn:          cgroupmgr.MemoryOffloadingWithAbsolutePath,
			Params:      []interface{}{absCGPath, memoryOffloadingSizeInBytesInt64},
			DeliveredAt: time.Now(),
		}, asyncworker.DuplicateWorkPolicyOverride)
	if err != nil {
		return fmt.Errorf("add work: %s pod: %s container: %s cgroup: %s failed with error: %w", memoryOffloadingWorkName, entryName, subEntryName, absCGPath, err)
	}

	// Metric emission is best effort; its error is intentionally ignored.
	_ = emitter.StoreInt64(util.MetricNameMemoryHandlerAdvisorMemoryOffload, memoryOffloadingSizeInBytesInt64,
		metrics.MetricTypeNameRaw, metrics.ConvertMapToTags(map[string]string{
			"entryName":    entryName,
			"subEntryName": subEntryName,
			"cgroupPath":   calculationInfo.CgroupPath,
		})...)
	return nil
}
18 changes: 17 additions & 1 deletion pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/config/agent/global"
qrmconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
coreconsts "github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent"
metaserveragent "github.com/kubewharf/katalyst-core/pkg/metaserver/agent"
Expand Down Expand Up @@ -1706,6 +1707,7 @@ func TestHandleAdvisorResp(t *testing.T) {
pod1UID := string(uuid.NewUUID())
pod2UID := string(uuid.NewUUID())
pod3UID := string(uuid.NewUUID())
pod4UID := string(uuid.NewUUID())
testName := "test"

testCases := []struct {
Expand All @@ -1716,7 +1718,7 @@ func TestHandleAdvisorResp(t *testing.T) {
lwResp *advisorsvc.ListAndWatchResponse
}{
{
description: "one shared_cores container, one reclaimed_cores container, one dedicated_cores container",
description: "one shared_cores container, two reclaimed_cores container, one dedicated_cores container",
podResourceEntries: state.PodResourceEntries{
v1.ResourceMemory: state.PodEntries{
pod1UID: state.ContainerEntries{
Expand Down Expand Up @@ -1819,6 +1821,18 @@ func TestHandleAdvisorResp(t *testing.T) {
},
},
},
pod4UID: {
ContainerEntries: map[string]*advisorsvc.CalculationInfo{
testName: {
CalculationResult: &advisorsvc.CalculationResult{
Values: map[string]string{
string(memoryadvisor.ControlKnobKeySwapMax): coreconsts.ControlKnobON,
string(memoryadvisor.ControlKnowKeyMemoryOffloading): "40960",
},
},
},
},
},
},
},
expectedPodResourceEntries: state.PodResourceEntries{
Expand Down Expand Up @@ -2120,6 +2134,8 @@ func TestHandleAdvisorResp(t *testing.T) {
memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorCPUSetMems))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache,
memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorDropCache))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnowKeyMemoryOffloading,
memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorMemoryOffloading))

machineState, err := state.GenerateMachineStateFromPodEntries(machineInfo, tc.podResourceEntries, resourcesReservedMemory)
as.Nil(err)
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/qrm-plugins/util/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
MetricNameMemoryHandleAdvisorMemoryLimit = "memory_handle_advisor_memory_limit"
MetricNameMemoryHandleAdvisorDropCache = "memory_handle_advisor_drop_cache"
MetricNameMemoryHandleAdvisorCPUSetMems = "memory_handle_advisor_cpuset_mems"
MetricNameMemoryHandlerAdvisorMemoryOffload = "memory_handler_advisor_memory_offloading"
MetricNameMemoryOOMPriorityDeleteFailed = "memory_oom_priority_delete_failed"
MetricNameMemoryOOMPriorityUpdateFailed = "memory_oom_priority_update_failed"
MetricNameMemoryNumaBalance = "memory_handle_numa_balance"
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/qrm-plugins/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ func GetContainerAsyncWorkName(podUID, containerName, topic string) string {
return strings.Join([]string{podUID, containerName, topic}, asyncworker.WorkNameSeperator)
}

func GetCgroupAsyncWorkName(cgroup, topic string) string {
return strings.Join([]string{cgroup, topic}, asyncworker.WorkNameSeperator)
}

func GetKubeletReservedQuantity(resourceName string, klConfig *kubeletconfigv1beta1.KubeletConfiguration) (resource.Quantity, bool, error) {
if klConfig == nil {
return resource.MustParse("0"), false, fmt.Errorf("nil klConfig")
Expand Down
8 changes: 7 additions & 1 deletion pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,18 @@ func NewQoSAwarePlugin(pluginName string, conf *config.Configuration, extraConf
}
}

// add dynamic config watcher
// add AdminQos dynamic config watcher
err = metaServer.ConfigurationManager.AddConfigWatcher(crd.AdminQoSConfigurationGVR)
if err != nil {
return nil, err
}

// add TransparentMemoryOffloading dynamic config watcher
err = metaServer.ConfigurationManager.AddConfigWatcher(crd.TransparentMemoryOffloadingConfigurationGVR)
if err != nil {
return nil, err
}

qap := &QoSAwarePlugin{
name: pluginName,
period: conf.SysAdvisorPluginsConfiguration.QoSAwarePluginConfiguration.SyncPeriod,
Expand Down
Loading

0 comments on commit 8475c45

Please sign in to comment.