Skip to content

Commit

Permalink
feat: Add TransparentMemoryOffloading plugin to trigger memory offloa…
Browse files Browse the repository at this point in the history
…ding periodically
  • Loading branch information
LuyaoZhong committed Feb 29, 2024
1 parent b24c422 commit 44e8004
Show file tree
Hide file tree
Showing 22 changed files with 984 additions and 20 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ require (
)

replace (
github.com/kubewharf/katalyst-api => github.com/LuyaoZhong/katalyst-api v0.0.0-20240227073944-4412ffd4572c
k8s.io/api => k8s.io/api v0.24.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6
k8s.io/apimachinery => k8s.io/apimachinery v0.24.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ github.com/GoogleCloudPlatform/k8s-cloud-provider v1.16.1-0.20210702024009-ea616
github.com/HdrHistogram/hdrhistogram-go v1.0.0/go.mod h1:YzE1EgsuAz8q9lfGdlxBZo2Ma655+PfKp2mlzcAqIFw=
github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab/go.mod h1:3VYc5hodBMJ5+l/7J4xAyMeuM2PNuepvHlGs8yilUCA=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/LuyaoZhong/katalyst-api v0.0.0-20240227073944-4412ffd4572c h1:fWaQCtNjk7YXKdGDVx6W/9wVabiCHYGt5IezUKgPj/A=
github.com/LuyaoZhong/katalyst-api v0.0.0-20240227073944-4412ffd4572c/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/MakeNowJust/heredoc v0.0.0-20170808103936-bb23615498cd/go.mod h1:64YHyfSL2R96J44Nlwm39UHepQbyR5q10x7iYa1ks2E=
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
github.com/Microsoft/go-winio v0.4.15/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw=
Expand Down Expand Up @@ -548,8 +550,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.4.1-0.20240222122824-be538f641f58 h1:D9dCR5EIR0k0Qil2A5biZjrubagRkEr7fyov6fb2ApY=
github.com/kubewharf/katalyst-api v0.4.1-0.20240222122824-be538f641f58/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ const (
ControlKnobKeyCPUSetMems MemoryControlKnobName = "cpuset_mems"
ControlKnobReclaimedMemorySize MemoryControlKnobName = "reclaimed_memory_size"
ControlKnobKeyBalanceNumaMemory MemoryControlKnobName = "balance_numa_memory"
ControlKnobKeySwapMax MemoryControlKnobName = "swap_max"
ControlKnowKeyMemoryOffloading MemoryControlKnobName = "memory_offloading"
)
11 changes: 7 additions & 4 deletions pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ import (
const (
MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic)

memoryPluginStateFileName = "memory_plugin_state"
memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers"
memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache"
memoryPluginAsyncWorkTopicMigratePage = "qrm_memory_plugin_migrate_page"
memoryPluginStateFileName = "memory_plugin_state"
memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers"
memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache"
memoryPluginAsyncWorkTopicMigratePage = "qrm_memory_plugin_migrate_page"
memoryPluginAsyncWorkTopicMemoryOffloading = "qrm_memory_plugin_mem_offload"

dropCacheTimeoutSeconds = 30
)
Expand Down Expand Up @@ -232,6 +233,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryProvisions))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyBalanceNumaMemory,
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleNumaMemoryBalance))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnowKeyMemoryOffloading,
memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryOffloading))

return true, &agent.PluginWrapper{GenericPlugin: pluginWrapper}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/asyncworker"
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
cgroupcommon "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager"
"github.com/kubewharf/katalyst-core/pkg/util/general"
Expand Down Expand Up @@ -670,6 +671,70 @@ func (p *DynamicPolicy) doNumaMemoryBalance(ctx context.Context, advice types.Nu

_ = p.emitter.StoreInt64(util.MetricNameMemoryNumaBalanceResult, 1, metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "success", Val: strconv.FormatBool(migrateSuccess)})
return nil
}

// handleAdvisorMemoryOffloading handles memory offloading from memory-advisor
func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration,
_ interface{},
_ *dynamicconfig.DynamicAgentConfiguration,
emitter metrics.MetricEmitter,
metaServer *metaserver.MetaServer,
entryName, subEntryName string,
calculationInfo *advisorsvc.CalculationInfo, podResourceEntries state.PodResourceEntries) error {

var absCGPath string
var memoryOffloadingWorkName string
memoryOffloadingSizeInBytes := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnowKeyMemoryOffloading)]
memoryOffloadingSizeInBytesInt64, err := strconv.ParseInt(memoryOffloadingSizeInBytes, 10, 64)
if err != nil {
return fmt.Errorf("parse %s: %s failed with error: %v", memoryadvisor.ControlKnowKeyMemoryOffloading, memoryOffloadingSizeInBytes, err)
}

if calculationInfo.CgroupPath == "" {
memoryOffloadingWorkName = util.GetContainerAsyncWorkName(entryName, subEntryName, memoryPluginAsyncWorkTopicMemoryOffloading)
containerID, err := metaServer.GetContainerID(entryName, subEntryName)
if err != nil {
return fmt.Errorf("GetContainerID failed with error: %v", err)
}
absCGPath, err = common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, entryName, containerID)
if err != nil {
return fmt.Errorf("GetContainerAbsCgroupPath failed with error: %v", err)
}
} else {
memoryOffloadingWorkName = util.GetCgroupAsyncWorkName(calculationInfo.CgroupPath, memoryPluginAsyncWorkTopicMemoryOffloading)
absCGPath = common.GetAbsCgroupPath(common.CgroupSubsysMemory, calculationInfo.CgroupPath)
}

// set swap max before trigger memory offloading
swapMax := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnobKeySwapMax)]
if swapMax == "true" {
err := cgroupmgr.SetSwapMaxWithAbsolutePathRecursive(absCGPath)
if err != nil {
general.Infof("Failed to set swap max, err: %v", err)
}
} else {
err := cgroupmgr.DisableSwapMaxWithAbsolutePathRecursive(absCGPath)
if err != nil {
general.Infof("Failed to disable swap, err: %v", err)
}
}

// start a asynchronous work to execute memory offloading
err = p.asyncWorkers.AddWork(memoryOffloadingWorkName,
&asyncworker.Work{
Fn: cgroupmgr.MemoryOffloadingWithAbsolutePath,
Params: []interface{}{absCGPath, memoryOffloadingSizeInBytesInt64},
DeliveredAt: time.Now()}, asyncworker.DuplicateWorkPolicyOverride)
if err != nil {
return fmt.Errorf("add work: %s pod: %s container: %s cgroup: %s failed with error: %v", memoryOffloadingWorkName, entryName, subEntryName, absCGPath, err)
}

_ = emitter.StoreInt64(util.MetricNameMemoryHandleAdvisorMemoryLimit, memoryOffloadingSizeInBytesInt64,
metrics.MetricTypeNameRaw, metrics.ConvertMapToTags(map[string]string{
"entryName": entryName,
"subEntryName": subEntryName,
"cgroupPath": calculationInfo.CgroupPath,
})...)
return nil
}
17 changes: 16 additions & 1 deletion pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1705,6 +1705,7 @@ func TestHandleAdvisorResp(t *testing.T) {
pod1UID := string(uuid.NewUUID())
pod2UID := string(uuid.NewUUID())
pod3UID := string(uuid.NewUUID())
pod4UID := string(uuid.NewUUID())
testName := "test"

testCases := []struct {
Expand All @@ -1715,7 +1716,7 @@ func TestHandleAdvisorResp(t *testing.T) {
lwResp *advisorsvc.ListAndWatchResponse
}{
{
description: "one shared_cores container, one reclaimed_cores container, one dedicated_cores container",
description: "one shared_cores container, two reclaimed_cores container, one dedicated_cores container",
podResourceEntries: state.PodResourceEntries{
v1.ResourceMemory: state.PodEntries{
pod1UID: state.ContainerEntries{
Expand Down Expand Up @@ -1818,6 +1819,18 @@ func TestHandleAdvisorResp(t *testing.T) {
},
},
},
pod4UID: {
ContainerEntries: map[string]*advisorsvc.CalculationInfo{
testName: {
CalculationResult: &advisorsvc.CalculationResult{
Values: map[string]string{
string(memoryadvisor.ControlKnobKeySwapMax): "true",
string(memoryadvisor.ControlKnowKeyMemoryOffloading): "40960",
},
},
},
},
},
},
},
expectedPodResourceEntries: state.PodResourceEntries{
Expand Down Expand Up @@ -2119,6 +2132,8 @@ func TestHandleAdvisorResp(t *testing.T) {
memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorCPUSetMems))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache,
memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorDropCache))
memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnowKeyMemoryOffloading,
memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorMemoryOffloading))

machineState, err := state.GenerateMachineStateFromPodEntries(machineInfo, tc.podResourceEntries, resourcesReservedMemory)
as.Nil(err)
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/qrm-plugins/util/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
MetricNameMemoryHandleAdvisorMemoryLimit = "memory_handle_advisor_memory_limit"
MetricNameMemoryHandleAdvisorDropCache = "memory_handle_advisor_drop_cache"
MetricNameMemoryHandleAdvisorCPUSetMems = "memory_handle_advisor_cpuset_mems"
MetricNameMemoryHandlerAdvisorMemoryOffload = "memory_handler_advisor_memory_offloading"
MetricNameMemoryOOMPriorityDeleteFailed = "memory_oom_priority_delete_failed"
MetricNameMemoryOOMPriorityUpdateFailed = "memory_oom_priority_update_failed"
MetricNameMemoryNumaBalance = "memory_handle_numa_balance"
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/qrm-plugins/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ func GetContainerAsyncWorkName(podUID, containerName, topic string) string {
return strings.Join([]string{podUID, containerName, topic}, asyncworker.WorkNameSeperator)
}

func GetCgroupAsyncWorkName(cgroup, topic string) string {
return strings.Join([]string{cgroup, topic}, asyncworker.WorkNameSeperator)
}

func GetKubeletReservedQuantity(resourceName string, klConfig *kubeletconfigv1beta1.KubeletConfiguration) (resource.Quantity, bool, error) {
if klConfig == nil {
return resource.MustParse("0"), false, fmt.Errorf("nil klConfig")
Expand Down
8 changes: 7 additions & 1 deletion pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,18 @@ func NewQoSAwarePlugin(pluginName string, conf *config.Configuration, extraConf
}
}

// add dynamic config watcher
// add AdminQos dynamic config watcher
err = metaServer.ConfigurationManager.AddConfigWatcher(crd.AdminQoSConfigurationGVR)
if err != nil {
return nil, err
}

// add TransparentMemoryOffloading dynamic config watcher
err = metaServer.ConfigurationManager.AddConfigWatcher(crd.TransparentMemoryOffloadingConfigurationGVR)
if err != nil {
return nil, err
}

qap := &QoSAwarePlugin{
name: pluginName,
period: conf.SysAdvisorPluginsConfiguration.QoSAwarePluginConfiguration.SyncPeriod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func init() {
memadvisorplugin.RegisterInitializer(memadvisorplugin.MemoryGuard, memadvisorplugin.NewMemoryGuard)
memadvisorplugin.RegisterInitializer(memadvisorplugin.MemsetBinder, memadvisorplugin.NewMemsetBinder)
memadvisorplugin.RegisterInitializer(memadvisorplugin.NumaMemoryBalancer, memadvisorplugin.NewMemoryBalancer)

memadvisorplugin.RegisterInitializer(memadvisorplugin.TransparentMemoryOffloading, memadvisorplugin.NewTransparentMemoryOffloading)
memadvisorplugin.RegisterInitializer(provisioner.MemoryProvisioner, provisioner.NewMemoryProvisioner)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"testing"
"time"

"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo"

info "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -732,6 +734,97 @@ func TestUpdate(t *testing.T) {
},
},
},
{
name: "memory offloading",
pools: map[string]*types.PoolInfo{
state.PoolNameReserve: {
PoolName: state.PoolNameReserve,
TopologyAwareAssignments: map[int]machine.CPUSet{
0: machine.MustParse("0"),
1: machine.MustParse("24"),
},
OriginalTopologyAwareAssignments: map[int]machine.CPUSet{
0: machine.MustParse("0"),
1: machine.MustParse("24"),
},
},
},
reclaimedEnable: true,
needRecvAdvices: true,
containers: []*types.ContainerInfo{
makeContainerInfo("uid1", "default", "pod1", "c1", consts.PodAnnotationQoSLevelReclaimedCores, nil,
map[int]machine.CPUSet{
0: machine.MustParse("1"),
1: machine.MustParse("25"),
}, 200<<30),
},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
UID: "uid1",
},
},
},
wantHeadroom: *resource.NewQuantity(996<<30, resource.DecimalSI),
nodeMetrics: defaultNodeMetrics,
numaMetrics: defaultNumaMetrics,
containerMetrics: []containerMetric{
{
metricName: coreconsts.MetricMemPsiAvg60Container,
metricValue: metricutil.MetricData{Value: 0.01},
podUID: "uid1",
containerName: "c1",
},
{
metricName: coreconsts.MetricMemUsageContainer,
metricValue: metricutil.MetricData{Value: 10 << 30},
podUID: "uid1",
containerName: "c1",
},
{
metricName: coreconsts.MetricMemInactiveAnonContainer,
metricValue: metricutil.MetricData{Value: 1 << 30},
podUID: "uid1",
containerName: "c1",
},
{
metricName: coreconsts.MetricMemInactiveFileContainer,
metricValue: metricutil.MetricData{Value: 1 << 30},
podUID: "uid1",
containerName: "c1",
},
{
metricName: coreconsts.MetricMemPgscanContainer,
metricValue: metricutil.MetricData{Value: 15000},
podUID: "uid1",
containerName: "c1",
},
{
metricName: coreconsts.MetricMemPgstealContainer,
metricValue: metricutil.MetricData{Value: 10000},
podUID: "uid1",
containerName: "c1",
},
{
metricName: coreconsts.MetricMemWorkingsetRefaultContainer,
metricValue: metricutil.MetricData{Value: 1000},
podUID: "uid1",
containerName: "c1",
},
},
plugins: []types.MemoryAdvisorPluginName{memadvisorplugin.TransparentMemoryOffloading},
wantAdviceResult: types.InternalMemoryCalculationResult{
ContainerEntries: []types.ContainerMemoryAdvices{{
PodUID: "uid1",
ContainerName: "c1",
Values: map[string]string{
string(memoryadvisor.ControlKnobKeySwapMax): "false",
string(memoryadvisor.ControlKnowKeyMemoryOffloading): "4831838"},
}},
},
},
{
name: "bind memset",
pools: map[string]*types.PoolInfo{
Expand Down Expand Up @@ -1907,6 +2000,9 @@ func TestUpdate(t *testing.T) {

advisor, metaCache := newTestMemoryAdvisor(t, tt.pods, ckDir, sfDir, fetcher, tt.plugins)
advisor.conf.GetDynamicConfiguration().EnableReclaim = tt.reclaimedEnable
transparentMemoryOffloadingConfiguration := tmo.NewTransparentMemoryOffloadingConfiguration()
transparentMemoryOffloadingConfiguration.QoSLevelConfigs[consts.QoSLevelReclaimedCores] = tmo.NewTMOConfigDetail()
advisor.conf.GetDynamicConfiguration().TransparentMemoryOffloadingConfiguration = transparentMemoryOffloadingConfiguration
_, advisorRecvChInterface := advisor.GetChannels()

recvCh := advisorRecvChInterface.(chan types.InternalMemoryCalculationResult)
Expand Down
Loading

0 comments on commit 44e8004

Please sign in to comment.