diff --git a/go.mod b/go.mod index 4d417a570e..36032308bd 100644 --- a/go.mod +++ b/go.mod @@ -147,6 +147,7 @@ require ( ) replace ( + github.com/kubewharf/katalyst-api => github.com/LuyaoZhong/katalyst-api v0.0.0-20240227073944-4412ffd4572c k8s.io/api => k8s.io/api v0.24.6 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6 k8s.io/apimachinery => k8s.io/apimachinery v0.24.6 diff --git a/go.sum b/go.sum index 251b21e046..208bbc10ac 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,8 @@ github.com/GoogleCloudPlatform/k8s-cloud-provider v1.16.1-0.20210702024009-ea616 github.com/HdrHistogram/hdrhistogram-go v1.0.0/go.mod h1:YzE1EgsuAz8q9lfGdlxBZo2Ma655+PfKp2mlzcAqIFw= github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab/go.mod h1:3VYc5hodBMJ5+l/7J4xAyMeuM2PNuepvHlGs8yilUCA= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= +github.com/LuyaoZhong/katalyst-api v0.0.0-20240227073944-4412ffd4572c h1:fWaQCtNjk7YXKdGDVx6W/9wVabiCHYGt5IezUKgPj/A= +github.com/LuyaoZhong/katalyst-api v0.0.0-20240227073944-4412ffd4572c/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k= github.com/MakeNowJust/heredoc v0.0.0-20170808103936-bb23615498cd/go.mod h1:64YHyfSL2R96J44Nlwm39UHepQbyR5q10x7iYa1ks2E= github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/Microsoft/go-winio v0.4.15/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw= @@ -548,8 +550,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubewharf/katalyst-api v0.4.1-0.20240222122824-be538f641f58 h1:D9dCR5EIR0k0Qil2A5biZjrubagRkEr7fyov6fb2ApY= -github.com/kubewharf/katalyst-api 
v0.4.1-0.20240222122824-be538f641f58/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k= github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4= github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c= github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8= diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/const.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/const.go index 8a782affd2..6f7b4aa842 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/const.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/const.go @@ -24,4 +24,6 @@ const ( ControlKnobKeyCPUSetMems MemoryControlKnobName = "cpuset_mems" ControlKnobReclaimedMemorySize MemoryControlKnobName = "reclaimed_memory_size" ControlKnobKeyBalanceNumaMemory MemoryControlKnobName = "balance_numa_memory" + ControlKnobKeySwapMax MemoryControlKnobName = "swap_max" + ControlKnowKeyMemoryOffloading MemoryControlKnobName = "memory_offloading" ) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index 737b0a97c1..7d3651d36a 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -57,10 +57,11 @@ import ( const ( MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic) - memoryPluginStateFileName = "memory_plugin_state" - memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers" - memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache" - memoryPluginAsyncWorkTopicMigratePage = "qrm_memory_plugin_migrate_page" + memoryPluginStateFileName = "memory_plugin_state" + memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers" + memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache" + memoryPluginAsyncWorkTopicMigratePage = 
"qrm_memory_plugin_migrate_page" + memoryPluginAsyncWorkTopicMemoryOffloading = "qrm_memory_plugin_mem_offload" dropCacheTimeoutSeconds = 30 ) @@ -232,6 +233,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryProvisions)) memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyBalanceNumaMemory, memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleNumaMemoryBalance)) + memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnowKeyMemoryOffloading, + memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryOffloading)) return true, &agent.PluginWrapper{GenericPlugin: pluginWrapper}, nil } diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go index 222507a4bb..1b2209e2ff 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go @@ -45,6 +45,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/asyncworker" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" cgroupcommon "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" "github.com/kubewharf/katalyst-core/pkg/util/general" @@ -670,6 +671,70 @@ func (p *DynamicPolicy) doNumaMemoryBalance(ctx context.Context, advice types.Nu _ = p.emitter.StoreInt64(util.MetricNameMemoryNumaBalanceResult, 1, metrics.MetricTypeNameRaw, metrics.MetricTag{Key: "success", Val: strconv.FormatBool(migrateSuccess)}) + return nil +} + +// handleAdvisorMemoryOffloading handles memory offloading from memory-advisor +func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration, 
+ _ interface{}, + _ *dynamicconfig.DynamicAgentConfiguration, + emitter metrics.MetricEmitter, + metaServer *metaserver.MetaServer, + entryName, subEntryName string, + calculationInfo *advisorsvc.CalculationInfo, podResourceEntries state.PodResourceEntries) error { + var absCGPath string + var memoryOffloadingWorkName string + memoryOffloadingSizeInBytes := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnowKeyMemoryOffloading)] + memoryOffloadingSizeInBytesInt64, err := strconv.ParseInt(memoryOffloadingSizeInBytes, 10, 64) + if err != nil { + return fmt.Errorf("parse %s: %s failed with error: %v", memoryadvisor.ControlKnowKeyMemoryOffloading, memoryOffloadingSizeInBytes, err) + } + + if calculationInfo.CgroupPath == "" { + memoryOffloadingWorkName = util.GetContainerAsyncWorkName(entryName, subEntryName, memoryPluginAsyncWorkTopicMemoryOffloading) + containerID, err := metaServer.GetContainerID(entryName, subEntryName) + if err != nil { + return fmt.Errorf("GetContainerID failed with error: %v", err) + } + absCGPath, err = common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, entryName, containerID) + if err != nil { + return fmt.Errorf("GetContainerAbsCgroupPath failed with error: %v", err) + } + } else { + memoryOffloadingWorkName = util.GetCgroupAsyncWorkName(calculationInfo.CgroupPath, memoryPluginAsyncWorkTopicMemoryOffloading) + absCGPath = common.GetAbsCgroupPath(common.CgroupSubsysMemory, calculationInfo.CgroupPath) + } + + // set swap max before trigger memory offloading + swapMax := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnobKeySwapMax)] + if swapMax == "true" { + err := cgroupmgr.SetSwapMaxWithAbsolutePathRecursive(absCGPath) + if err != nil { + general.Infof("Failed to set swap max, err: %v", err) + } + } else { + err := cgroupmgr.DisableSwapMaxWithAbsolutePathRecursive(absCGPath) + if err != nil { + general.Infof("Failed to disable swap, err: %v", err) + } + } + + // start a asynchronous work 
to execute memory offloading + err = p.asyncWorkers.AddWork(memoryOffloadingWorkName, + &asyncworker.Work{ + Fn: cgroupmgr.MemoryOffloadingWithAbsolutePath, + Params: []interface{}{absCGPath, memoryOffloadingSizeInBytesInt64}, + DeliveredAt: time.Now()}, asyncworker.DuplicateWorkPolicyOverride) + if err != nil { + return fmt.Errorf("add work: %s pod: %s container: %s cgroup: %s failed with error: %v", memoryOffloadingWorkName, entryName, subEntryName, absCGPath, err) + } + + _ = emitter.StoreInt64(util.MetricNameMemoryHandlerAdvisorMemoryOffload, memoryOffloadingSizeInBytesInt64, + metrics.MetricTypeNameRaw, metrics.ConvertMapToTags(map[string]string{ + "entryName": entryName, + "subEntryName": subEntryName, + "cgroupPath": calculationInfo.CgroupPath, + })...) return nil } diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index f563a8cc29..5566108b1d 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ -1705,6 +1705,7 @@ func TestHandleAdvisorResp(t *testing.T) { pod1UID := string(uuid.NewUUID()) pod2UID := string(uuid.NewUUID()) pod3UID := string(uuid.NewUUID()) + pod4UID := string(uuid.NewUUID()) testName := "test" testCases := []struct { @@ -1715,7 +1716,7 @@ lwResp *advisorsvc.ListAndWatchResponse }{ { - description: "one shared_cores container, one reclaimed_cores container, one dedicated_cores container", + description: "one shared_cores container, two reclaimed_cores container, one dedicated_cores container", podResourceEntries: state.PodResourceEntries{ v1.ResourceMemory: state.PodEntries{ pod1UID: state.ContainerEntries{ @@ -1818,6 +1819,18 @@ }, }, }, + pod4UID: { + ContainerEntries: map[string]*advisorsvc.CalculationInfo{ + testName: { + CalculationResult: &advisorsvc.CalculationResult{ + Values: 
map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): "true", + string(memoryadvisor.ControlKnowKeyMemoryOffloading): "40960", + }, + }, + }, + }, + }, }, }, expectedPodResourceEntries: state.PodResourceEntries{ @@ -2119,6 +2132,8 @@ func TestHandleAdvisorResp(t *testing.T) { memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorCPUSetMems)) memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache, memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorDropCache)) + memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnowKeyMemoryOffloading, + memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorMemoryOffloading)) machineState, err := state.GenerateMachineStateFromPodEntries(machineInfo, tc.podResourceEntries, resourcesReservedMemory) as.Nil(err) diff --git a/pkg/agent/qrm-plugins/util/consts.go b/pkg/agent/qrm-plugins/util/consts.go index c8722dcdf9..14cb910c6d 100644 --- a/pkg/agent/qrm-plugins/util/consts.go +++ b/pkg/agent/qrm-plugins/util/consts.go @@ -42,6 +42,7 @@ const ( MetricNameMemoryHandleAdvisorMemoryLimit = "memory_handle_advisor_memory_limit" MetricNameMemoryHandleAdvisorDropCache = "memory_handle_advisor_drop_cache" MetricNameMemoryHandleAdvisorCPUSetMems = "memory_handle_advisor_cpuset_mems" + MetricNameMemoryHandlerAdvisorMemoryOffload = "memory_handler_advisor_memory_offloading" MetricNameMemoryOOMPriorityDeleteFailed = "memory_oom_priority_delete_failed" MetricNameMemoryOOMPriorityUpdateFailed = "memory_oom_priority_update_failed" MetricNameMemoryNumaBalance = "memory_handle_numa_balance" diff --git a/pkg/agent/qrm-plugins/util/util.go b/pkg/agent/qrm-plugins/util/util.go index f1ecf53289..5a46bf4578 100644 --- a/pkg/agent/qrm-plugins/util/util.go +++ b/pkg/agent/qrm-plugins/util/util.go @@ -315,6 +315,10 @@ func GetContainerAsyncWorkName(podUID, containerName, topic string) string { return strings.Join([]string{podUID, containerName, topic}, 
asyncworker.WorkNameSeperator) } +func GetCgroupAsyncWorkName(cgroup, topic string) string { + return strings.Join([]string{cgroup, topic}, asyncworker.WorkNameSeperator) +} + func GetKubeletReservedQuantity(resourceName string, klConfig *kubeletconfigv1beta1.KubeletConfiguration) (resource.Quantity, bool, error) { if klConfig == nil { return resource.MustParse("0"), false, fmt.Errorf("nil klConfig") diff --git a/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go b/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go index 3e47ae2a13..c8a3fd6a23 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go @@ -91,12 +91,18 @@ func NewQoSAwarePlugin(pluginName string, conf *config.Configuration, extraConf } } - // add dynamic config watcher + // add AdminQos dynamic config watcher err = metaServer.ConfigurationManager.AddConfigWatcher(crd.AdminQoSConfigurationGVR) if err != nil { return nil, err } + // add TransparentMemoryOffloading dynamic config watcher + err = metaServer.ConfigurationManager.AddConfigWatcher(crd.TransparentMemoryOffloadingConfigurationGVR) + if err != nil { + return nil, err + } + qap := &QoSAwarePlugin{ name: pluginName, period: conf.SysAdvisorPluginsConfiguration.QoSAwarePluginConfiguration.SyncPeriod, diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go index 1be9003891..56601eb05d 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go @@ -49,7 +49,7 @@ func init() { memadvisorplugin.RegisterInitializer(memadvisorplugin.MemoryGuard, memadvisorplugin.NewMemoryGuard) memadvisorplugin.RegisterInitializer(memadvisorplugin.MemsetBinder, memadvisorplugin.NewMemsetBinder) memadvisorplugin.RegisterInitializer(memadvisorplugin.NumaMemoryBalancer, memadvisorplugin.NewMemoryBalancer) - + 
memadvisorplugin.RegisterInitializer(memadvisorplugin.TransparentMemoryOffloading, memadvisorplugin.NewTransparentMemoryOffloading) memadvisorplugin.RegisterInitializer(provisioner.MemoryProvisioner, provisioner.NewMemoryProvisioner) } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor_test.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor_test.go index 69c109515d..314a72c35a 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor_test.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor_test.go @@ -26,6 +26,8 @@ import ( "testing" "time" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo" + info "github.com/google/cadvisor/info/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -732,6 +734,97 @@ func TestUpdate(t *testing.T) { }, }, }, + { + name: "memory offloading", + pools: map[string]*types.PoolInfo{ + state.PoolNameReserve: { + PoolName: state.PoolNameReserve, + TopologyAwareAssignments: map[int]machine.CPUSet{ + 0: machine.MustParse("0"), + 1: machine.MustParse("24"), + }, + OriginalTopologyAwareAssignments: map[int]machine.CPUSet{ + 0: machine.MustParse("0"), + 1: machine.MustParse("24"), + }, + }, + }, + reclaimedEnable: true, + needRecvAdvices: true, + containers: []*types.ContainerInfo{ + makeContainerInfo("uid1", "default", "pod1", "c1", consts.PodAnnotationQoSLevelReclaimedCores, nil, + map[int]machine.CPUSet{ + 0: machine.MustParse("1"), + 1: machine.MustParse("25"), + }, 200<<30), + }, + pods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "uid1", + }, + }, + }, + wantHeadroom: *resource.NewQuantity(996<<30, resource.DecimalSI), + nodeMetrics: defaultNodeMetrics, + numaMetrics: defaultNumaMetrics, + containerMetrics: []containerMetric{ + { + metricName: coreconsts.MetricMemPsiAvg60Container, + metricValue: metricutil.MetricData{Value: 0.01}, + podUID: "uid1", + containerName: "c1", + 
}, + { + metricName: coreconsts.MetricMemUsageContainer, + metricValue: metricutil.MetricData{Value: 10 << 30}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemInactiveAnonContainer, + metricValue: metricutil.MetricData{Value: 1 << 30}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemInactiveFileContainer, + metricValue: metricutil.MetricData{Value: 1 << 30}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemPgscanContainer, + metricValue: metricutil.MetricData{Value: 15000}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemPgstealContainer, + metricValue: metricutil.MetricData{Value: 10000}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemWorkingsetRefaultContainer, + metricValue: metricutil.MetricData{Value: 1000}, + podUID: "uid1", + containerName: "c1", + }, + }, + plugins: []types.MemoryAdvisorPluginName{memadvisorplugin.TransparentMemoryOffloading}, + wantAdviceResult: types.InternalMemoryCalculationResult{ + ContainerEntries: []types.ContainerMemoryAdvices{{ + PodUID: "uid1", + ContainerName: "c1", + Values: map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): "false", + string(memoryadvisor.ControlKnowKeyMemoryOffloading): "4831838"}, + }}, + }, + }, { name: "bind memset", pools: map[string]*types.PoolInfo{ @@ -1907,6 +2000,9 @@ func TestUpdate(t *testing.T) { advisor, metaCache := newTestMemoryAdvisor(t, tt.pods, ckDir, sfDir, fetcher, tt.plugins) advisor.conf.GetDynamicConfiguration().EnableReclaim = tt.reclaimedEnable + transparentMemoryOffloadingConfiguration := tmo.NewTransparentMemoryOffloadingConfiguration() + transparentMemoryOffloadingConfiguration.QoSLevelConfigs[consts.QoSLevelReclaimedCores] = tmo.NewTMOConfigDetail() + advisor.conf.GetDynamicConfiguration().TransparentMemoryOffloadingConfiguration = transparentMemoryOffloadingConfiguration _, 
advisorRecvChInterface := advisor.GetChannels() recvCh := advisorRecvChInterface.(chan types.InternalMemoryCalculationResult) diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go new file mode 100644 index 0000000000..230ebcce14 --- /dev/null +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go @@ -0,0 +1,453 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package plugin + +import ( + "math" + "path/filepath" + "strconv" + "sync" + "time" + + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo" + "github.com/kubewharf/katalyst-core/pkg/consts" + + "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1" + katalystapiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + + "github.com/pkg/errors" + + "github.com/kubewharf/katalyst-core/pkg/util/general" + + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + + "github.com/kubewharf/katalyst-core/pkg/util/native" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" + "github.com/kubewharf/katalyst-core/pkg/config" + katalystcoreconsts "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +const ( + TransparentMemoryOffloading = "transparent-memory-offloading" +) + +const ( + CorrectionTerm = 0.1 + InactiveMaxProbe = 0.05 + OffloadingSizeScaleCoeff = 1.05 +) + +const ( + TMOBlockHandlerName string = "tmo-block-handler" +) + +var tmoPolicyFuncs sync.Map +var tmoBlockHandlers sync.Map + +type TmoStats struct { + memUsage float64 + memInactive float64 + memPsiAvg60 float64 + pgscan float64 + pgsteal float64 + refault float64 + lastOffloadingTargetSize float64 +} + +type TmoPolicyFn func( + lastStats TmoStats, + currStats TmoStats, + conf tmo.TMOConfigDetail) (error, float64) + +type TMOBlockHandler interface { + CheckTMOBlocked(*types.ContainerInfo) bool +} + +type dummyTMOBlockHandler struct { + name string +} + +func NewDummyTMOBlockHandler() *dummyTMOBlockHandler { + return &dummyTMOBlockHandler{name: TMOBlockHandlerName} +} + +func (h *dummyTMOBlockHandler) CheckTMOBlocked(*types.ContainerInfo) bool { + return false +} + +func psiPolicyFunc(lastStats 
TmoStats, currStats TmoStats, conf tmo.TMOConfigDetail) (error, float64) { + if conf.PSIPolicyConf == nil { + return errors.New("psi policy requires psi policy configuration"), 0 + } + return nil, math.Max(0, 1-(currStats.memPsiAvg60)/(conf.PSIPolicyConf.PsiAvg60Threshold)) * conf.PSIPolicyConf.MaxProbe * currStats.memUsage +} + +func refaultPolicyFunc(lastStats TmoStats, currStats TmoStats, conf tmo.TMOConfigDetail) (error, float64) { + if conf.RefaultPolicyConf == nil { + return errors.New("refault policy requires refault policy configurations"), 0 + } + pgscanDelta := currStats.pgscan - lastStats.pgscan + pgstealDelta := currStats.pgsteal - lastStats.pgsteal + refaultDelta := currStats.refault - lastStats.refault + reclaimAccuracyRatio := 1 - refaultDelta/(pgstealDelta+CorrectionTerm) + reclaimScanEfficiencyRatio := (pgstealDelta + CorrectionTerm) / (pgscanDelta + CorrectionTerm) + + if reclaimAccuracyRatio < conf.RefaultPolicyConf.ReclaimAccuracyTarget || reclaimScanEfficiencyRatio < conf.RefaultPolicyConf.ReclaimScanEfficiencyTarget { + return nil, math.Max(0, currStats.lastOffloadingTargetSize*reclaimAccuracyRatio) + } else { + return nil, general.Clamp(currStats.lastOffloadingTargetSize*OffloadingSizeScaleCoeff, currStats.memUsage*conf.RefaultPolicyConf.MaxProbe, currStats.memInactive*InactiveMaxProbe) + } +} + +func init() { + RegisterTMOPolicyFunc(v1alpha1.TMOPolicyNamePSI, psiPolicyFunc) + RegisterTMOPolicyFunc(v1alpha1.TMOPolicyNameRefault, refaultPolicyFunc) + RegisterTMOBlockHandlers(TMOBlockHandlerName, NewDummyTMOBlockHandler()) +} + +func RegisterTMOPolicyFunc(policyName v1alpha1.TMOPolicyName, tmoPolicyFn TmoPolicyFn) { + tmoPolicyFuncs.Store(policyName, tmoPolicyFn) +} + +func RegisterTMOBlockHandlers(handlerName string, handler TMOBlockHandler) { + tmoBlockHandlers.Store(handlerName, handler) +} + +type transparentMemoryOffloading struct { + conf *config.Configuration + mutex sync.RWMutex + metaReader metacache.MetaReader + metaServer 
*metaserver.MetaServer + emitter metrics.MetricEmitter + containerTmoEngines map[katalystcoreconsts.PodContainerName]TmoEngine + cgpathTmoEngines map[string]TmoEngine +} + +type TmoEngine interface { + GetContainerInfo() *types.ContainerInfo + GetCgpath() string + LoadConf(tmo.TMOConfigDetail) + GetConf() *tmo.TMOConfigDetail + CalculateOffloadingTargetSize() + GetOffloadingTargetSize() float64 +} + +type tmoEngineInstance struct { + workingRounds int64 + containerInfo *types.ContainerInfo // only valid when this tmo engine is working on container + cgpath string + metaServer *metaserver.MetaServer + emitter metrics.MetricEmitter + conf tmo.TMOConfigDetail + lastTime time.Time + lastStats TmoStats + offloadingTargetSize float64 +} + +func NewTmoEngineInstance(obj interface{}, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) *tmoEngineInstance { + tmoEngine := &tmoEngineInstance{ + workingRounds: 0, + metaServer: metaServer, + emitter: emitter, + conf: tmo.NewTMOConfigDetail(), + } + if path, ok := obj.(string); ok { + tmoEngine.cgpath = path + } + if ci, ok := obj.(*types.ContainerInfo); ok { + tmoEngine.containerInfo = ci + } + return tmoEngine +} + +func (tmoEngine *tmoEngineInstance) GetConf() *tmo.TMOConfigDetail { + return &tmoEngine.conf +} + +func (tmoEngine *tmoEngineInstance) getStats() (TmoStats, error) { + tmoStats := &TmoStats{} + var err error = nil + getCgroupMetrics := func(metaserver *metaserver.MetaServer, absPath string) { + relativePath, err := filepath.Rel(common.CgroupFSMountPoint, absPath) + if err != nil { + return + } + psiAvg60, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemPsiAvg60Cgroup) + if err != nil { + return + } + pgsteal, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemPgstealCgroup) + if err != nil { + return + } + pgscan, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemPgscanCgroup) + if err != nil { + return + } + refault, err := 
metaserver.GetCgroupMetric(relativePath, consts.MetricMemWorkingsetRefaultCgroup) + if err != nil { + return + } + memUsage, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemUsageCgroup) + if err != nil { + return + } + memInactiveAnon, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemInactiveAnonCgroup) + if err != nil { + return + } + memInactiveFile, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemInactiveFileCgroup) + if err != nil { + return + } + tmoStats.memUsage = memUsage.Value + tmoStats.memInactive = memInactiveFile.Value + memInactiveAnon.Value + tmoStats.memPsiAvg60 = psiAvg60.Value + tmoStats.pgsteal = pgsteal.Value + tmoStats.pgscan = pgscan.Value + tmoStats.refault = refault.Value + tmoStats.lastOffloadingTargetSize = tmoEngine.offloadingTargetSize + } + getContainerMetrics := func(metaserver *metaserver.MetaServer, podUID string, containerName string) { + psiAvg60, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemPsiAvg60Container) + if err != nil { + return + } + pgsteal, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemPgstealContainer) + if err != nil { + return + } + pgscan, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemPgscanContainer) + if err != nil { + return + } + refault, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemWorkingsetRefaultContainer) + if err != nil { + return + } + memUsage, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemUsageContainer) + if err != nil { + return + } + memInactiveAnon, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemInactiveAnonContainer) + if err != nil { + return + } + memInactiveFile, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemInactiveFileContainer) + if err != nil { + return + } + tmoStats.memUsage = memUsage.Value + tmoStats.memInactive = memInactiveFile.Value 
+ memInactiveAnon.Value + tmoStats.memPsiAvg60 = psiAvg60.Value + tmoStats.pgsteal = pgsteal.Value + tmoStats.pgscan = pgscan.Value + tmoStats.refault = refault.Value + tmoStats.lastOffloadingTargetSize = tmoEngine.offloadingTargetSize + } + + if tmoEngine.containerInfo == nil { + // TODO: get cgroup stats from cgroup path, which requires metric fetcher to store agent metrics + getCgroupMetrics(tmoEngine.metaServer, tmoEngine.cgpath) + } else { + getContainerMetrics(tmoEngine.metaServer, tmoEngine.containerInfo.PodUID, tmoEngine.containerInfo.ContainerName) + } + return *tmoStats, err +} + +func (tmoEngine *tmoEngineInstance) GetOffloadingTargetSize() float64 { + return tmoEngine.offloadingTargetSize +} + +func (tmoEngine *tmoEngineInstance) GetContainerInfo() *types.ContainerInfo { + return tmoEngine.containerInfo +} + +func (tmoEngine *tmoEngineInstance) GetCgpath() string { + return tmoEngine.cgpath +} + +func (tmoEngine *tmoEngineInstance) LoadConf(detail tmo.TMOConfigDetail) { + // default conf for test + tmoEngine.conf = detail +} + +func (tmoEngine *tmoEngineInstance) CalculateOffloadingTargetSize() { + tmoEngine.offloadingTargetSize = 0 + + // TODO: return if TMO is disabled + /* + if !tmoEngine.conf.EnableTMO { + return + } + */ + currTime := time.Now() + if currTime.Sub(tmoEngine.lastTime) < tmoEngine.conf.Interval { + tmoEngine.offloadingTargetSize = 0 + return + } + + currStats, err := tmoEngine.getStats() + if err != nil { + general.InfoS("Failed to get metrics %v", err) + return + } + // TODO: get result from qrm to make sure last offloading action finished + if fn, ok := tmoPolicyFuncs.Load(tmoEngine.conf.PolicyName); ok { + if policyFunc, ok := fn.(TmoPolicyFn); ok { + + err, targetSize := policyFunc(tmoEngine.lastStats, currStats, tmoEngine.conf) + if err != nil { + general.InfoS("Failed to calculate offloading memory size") + return + } + tmoEngine.offloadingTargetSize = targetSize + tmoEngine.lastStats = currStats + tmoEngine.lastTime = currTime 
+ } + } + +} + +func NewTransparentMemoryOffloading(conf *config.Configuration, extraConfig interface{}, metaReader metacache.MetaReader, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) MemoryAdvisorPlugin { + return &transparentMemoryOffloading{ + conf: conf, + metaReader: metaReader, + metaServer: metaServer, + emitter: emitter, + containerTmoEngines: make(map[consts.PodContainerName]TmoEngine), + cgpathTmoEngines: make(map[string]TmoEngine), + } +} + +func (tmo *transparentMemoryOffloading) Reconcile(status *types.MemoryPressureStatus) error { + // if !common.CheckCgroup2UnifiedMode() { + // return nil + // } + podContainerNamesMap := make(map[katalystcoreconsts.PodContainerName]bool) + // update tmo config for containers + tmo.metaReader.RangeContainer(func(podUID string, containerName string, containerInfo *types.ContainerInfo) bool { + podContainerName := native.GeneratePodContainerName(containerInfo.PodName, containerInfo.ContainerName) + podContainerNamesMap[podContainerName] = true + _, exist := tmo.containerTmoEngines[podContainerName] + if !exist { + tmo.containerTmoEngines[podContainerName] = NewTmoEngineInstance(containerInfo, tmo.metaServer, tmo.emitter) + } + // load QoSLevelConfig + if containerInfo.QoSLevel == string(katalystapiconsts.QoSLevelReclaimedCores) || containerInfo.QoSLevel == string(katalystapiconsts.QoSLevelSharedCores) || + containerInfo.QoSLevel == string(katalystapiconsts.QoSLevelDedicatedCores) || containerInfo.QoSLevel == string(katalystapiconsts.QoSLevelSystemCores) { + if tmoConfigDetail, exist := tmo.conf.GetDynamicConfiguration().QoSLevelConfigs[katalystapiconsts.QoSLevel(containerInfo.QoSLevel)]; exist { + tmo.containerTmoEngines[podContainerName].LoadConf(tmoConfigDetail) + } + } + // TODO: load spd conf if exists + + // check if the container is not allowed to execute TMO + if handler, ok := tmoBlockHandlers.Load(TMOBlockHandlerName); ok { + if tmoBlockHandler, ok := handler.(TMOBlockHandler); ok { + if 
tmoBlockHandler.CheckTMOBlocked(containerInfo) { + tmo.containerTmoEngines[podContainerName].GetConf().EnableTMO = false + } + } + } + return true + }) + + // update tmo config for specified cgroup paths + for cgpath, tmoConfigDetail := range tmo.conf.GetDynamicConfiguration().TransparentMemoryOffloadingConfiguration.CgroupConfigs { + if _, exist := tmo.cgpathTmoEngines[cgpath]; !exist { + tmo.cgpathTmoEngines[cgpath] = NewTmoEngineInstance(cgpath, tmo.metaServer, tmo.emitter) + } + tmo.cgpathTmoEngines[cgpath].LoadConf(tmoConfigDetail) + } + + // delete tmo engines for not existed containers + for podContainerName := range tmo.containerTmoEngines { + _, exist := podContainerNamesMap[podContainerName] + if !exist { + delete(tmo.containerTmoEngines, podContainerName) + } + } + + // delete tmo engines for not existed cgroups + for cgpath := range tmo.cgpathTmoEngines { + if _, exist := tmo.conf.GetDynamicConfiguration().CgroupConfigs[cgpath]; !exist { + delete(tmo.cgpathTmoEngines, cgpath) + } + } + + // calculate memory offloading size for each container + for _, tmoEngine := range tmo.containerTmoEngines { + tmoEngine.CalculateOffloadingTargetSize() + } + // calculate memory offloading size for each cgroups + for _, tmoEngine := range tmo.cgpathTmoEngines { + tmoEngine.CalculateOffloadingTargetSize() + } + return nil +} + +func (tmo *transparentMemoryOffloading) GetAdvices() types.InternalMemoryCalculationResult { + result := types.InternalMemoryCalculationResult{ + ContainerEntries: make([]types.ContainerMemoryAdvices, 0), + ExtraEntries: make([]types.ExtraMemoryAdvices, 0), + } + tmo.mutex.RLock() + defer tmo.mutex.RUnlock() + for _, tmoEngine := range tmo.containerTmoEngines { + if tmoEngine.GetOffloadingTargetSize() <= 0 { + continue + } + enableSwap := "false" + if tmoEngine.GetConf().EnableSwap { + enableSwap = "true" + } + entry := types.ContainerMemoryAdvices{ + PodUID: tmoEngine.GetContainerInfo().PodUID, + ContainerName: 
tmoEngine.GetContainerInfo().ContainerName, + Values: map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): enableSwap, + string(memoryadvisor.ControlKnowKeyMemoryOffloading): strconv.FormatInt(int64(tmoEngine.GetOffloadingTargetSize()), 10)}, + } + result.ContainerEntries = append(result.ContainerEntries, entry) + } + + for _, tmoEngine := range tmo.cgpathTmoEngines { + if tmoEngine.GetOffloadingTargetSize() <= 0 { + continue + } + enableSwap := "false" + if tmoEngine.GetConf().EnableSwap { + enableSwap = "true" + } + // TODO: support swap max set to a concrete value + entry := types.ExtraMemoryAdvices{ + CgroupPath: tmoEngine.GetCgpath(), + Values: map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): enableSwap, + string(memoryadvisor.ControlKnowKeyMemoryOffloading): strconv.FormatInt(int64(tmoEngine.GetOffloadingTargetSize()), 10)}, + } + result.ExtraEntries = append(result.ExtraEntries, entry) + } + + return result +} diff --git a/pkg/config/agent/dynamic/crd/dynamic_crd.go b/pkg/config/agent/dynamic/crd/dynamic_crd.go index a3ab3e93a5..ab93940f7b 100644 --- a/pkg/config/agent/dynamic/crd/dynamic_crd.go +++ b/pkg/config/agent/dynamic/crd/dynamic_crd.go @@ -32,8 +32,9 @@ const ( // of CRD. KCC components are responsible to identify those CRs and // trigger notification. 
type DynamicConfigCRD struct { - AdminQoSConfiguration *v1alpha1.AdminQoSConfiguration - AuthConfiguration *v1alpha1.AuthConfiguration + AdminQoSConfiguration *v1alpha1.AdminQoSConfiguration + AuthConfiguration *v1alpha1.AuthConfiguration + TransparentMemoryOffloadingConfiguration *v1alpha1.TransparentMemoryOffloadingConfiguration } var ( @@ -41,4 +42,6 @@ var ( AdminQoSConfigurationGVR = metav1.GroupVersionResource(v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.ResourceNameAdminQoSConfigurations)) // AuthConfigurationGVR is the group version resource for AuthConfiguration AuthConfigurationGVR = metav1.GroupVersionResource(v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.ResourceNameAuthConfigurations)) + // TransparentMemoryOffloadingConfigurationGVR is the group version resource for TransparentMemoryOffloadingConfiguration + TransparentMemoryOffloadingConfigurationGVR = metav1.GroupVersionResource(v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.ResourceNameTMOConfigurations)) ) diff --git a/pkg/config/agent/dynamic/dynamic_base.go b/pkg/config/agent/dynamic/dynamic_base.go index 8bef32fcc7..0bf88c640a 100644 --- a/pkg/config/agent/dynamic/dynamic_base.go +++ b/pkg/config/agent/dynamic/dynamic_base.go @@ -19,6 +19,8 @@ package dynamic import ( "sync" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/adminqos" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/auth" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/crd" @@ -52,16 +54,19 @@ func (c *DynamicAgentConfiguration) SetDynamicConfiguration(conf *Configuration) type Configuration struct { *adminqos.AdminQoSConfiguration *auth.AuthConfiguration + *tmo.TransparentMemoryOffloadingConfiguration } func NewConfiguration() *Configuration { return &Configuration{ - AdminQoSConfiguration: adminqos.NewAdminQoSConfiguration(), - AuthConfiguration: auth.NewAuthConfiguration(), + AdminQoSConfiguration: 
adminqos.NewAdminQoSConfiguration(), + AuthConfiguration: auth.NewAuthConfiguration(), + TransparentMemoryOffloadingConfiguration: tmo.NewTransparentMemoryOffloadingConfiguration(), } } func (c *Configuration) ApplyConfiguration(conf *crd.DynamicConfigCRD) { c.AdminQoSConfiguration.ApplyConfiguration(conf) c.AuthConfiguration.ApplyConfiguration(conf) + c.TransparentMemoryOffloadingConfiguration.ApplyConfiguration(conf) } diff --git a/pkg/config/agent/dynamic/tmo/tmo_base.go b/pkg/config/agent/dynamic/tmo/tmo_base.go new file mode 100644 index 0000000000..11ef53ccaa --- /dev/null +++ b/pkg/config/agent/dynamic/tmo/tmo_base.go @@ -0,0 +1,147 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package tmo + +import ( + "time" + + "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/crd" +) + +const ( + DefaultEnableTMO bool = false + DefaultEnableSwap bool = false + DefaultTMOInterval time.Duration = 30 * time.Second + DefaultTMOPolicyName v1alpha1.TMOPolicyName = v1alpha1.TMOPolicyNamePSI + DefaultTMOMaxProbe float64 = 0.0005 + DefaultTMOPSIPolicyPsiAvg60Threshold float64 = 0.1 + DefaultTMORefaultPolicyReclaimAccuracyTarget float64 = 0.9 + DefaultTMORefaultPolicyReclaimScanEfficiencyTarget float64 = 0.6 +) + +type TransparentMemoryOffloadingConfiguration struct { + QoSLevelConfigs map[consts.QoSLevel]TMOConfigDetail + CgroupConfigs map[string]TMOConfigDetail +} + +func NewTransparentMemoryOffloadingConfiguration() *TransparentMemoryOffloadingConfiguration { + return &TransparentMemoryOffloadingConfiguration{ + QoSLevelConfigs: map[consts.QoSLevel]TMOConfigDetail{}, + CgroupConfigs: map[string]TMOConfigDetail{}, + } +} + +type TMOConfigDetail struct { + EnableTMO bool + EnableSwap bool + Interval time.Duration + PolicyName v1alpha1.TMOPolicyName + *PSIPolicyConf + *RefaultPolicyConf +} + +func NewTMOConfigDetail() TMOConfigDetail { + return TMOConfigDetail{ + EnableTMO: DefaultEnableTMO, + EnableSwap: DefaultEnableSwap, + Interval: DefaultTMOInterval, + PolicyName: DefaultTMOPolicyName, + PSIPolicyConf: NewPSIPolicyConf(), + RefaultPolicyConf: NewRefaultPolicyConf(), + } +} + +type PSIPolicyConf struct { + MaxProbe float64 + PsiAvg60Threshold float64 +} + +func NewPSIPolicyConf() *PSIPolicyConf { + return &PSIPolicyConf{ + MaxProbe: DefaultTMOMaxProbe, + PsiAvg60Threshold: DefaultTMOPSIPolicyPsiAvg60Threshold, + } +} + +type RefaultPolicyConf struct { + MaxProbe float64 + ReclaimAccuracyTarget float64 + ReclaimScanEfficiencyTarget float64 +} + +func NewRefaultPolicyConf() *RefaultPolicyConf { + return &RefaultPolicyConf{ + 
MaxProbe: DefaultTMOMaxProbe, + ReclaimAccuracyTarget: DefaultTMORefaultPolicyReclaimAccuracyTarget, + ReclaimScanEfficiencyTarget: DefaultTMORefaultPolicyReclaimScanEfficiencyTarget, + } +} + +func (c *TransparentMemoryOffloadingConfiguration) ApplyConfiguration(conf *crd.DynamicConfigCRD) { + applyConfigDetail := func(tmoConfigDetail *TMOConfigDetail, tmoConfigDetailDynamic v1alpha1.TMOConfigDetail) { + if tmoConfigDetailDynamic.EnableTMO != nil { + tmoConfigDetail.EnableTMO = *tmoConfigDetailDynamic.EnableTMO + } + if tmoConfigDetailDynamic.EnableSwap != nil { + tmoConfigDetail.EnableSwap = *tmoConfigDetailDynamic.EnableSwap + } + if tmoConfigDetailDynamic.Interval != nil { + tmoConfigDetail.Interval = tmoConfigDetailDynamic.Interval.Duration + } + if tmoConfigDetailDynamic.PolicyName != nil { + tmoConfigDetail.PolicyName = *tmoConfigDetailDynamic.PolicyName + } + if psiPolicyConfDynamic := tmoConfigDetailDynamic.PSIPolicyConf; psiPolicyConfDynamic != nil { + if psiPolicyConfDynamic.MaxProbe != nil { + tmoConfigDetail.PSIPolicyConf.MaxProbe = *psiPolicyConfDynamic.MaxProbe + } + if psiPolicyConfDynamic.PsiAvg60Threshold != nil { + tmoConfigDetail.PSIPolicyConf.PsiAvg60Threshold = *psiPolicyConfDynamic.PsiAvg60Threshold + } + } + if refaultPolicyConfDynamic := tmoConfigDetailDynamic.RefaultPolicConf; refaultPolicyConfDynamic != nil { + if refaultPolicyConfDynamic.MaxProbe != nil { + tmoConfigDetail.RefaultPolicyConf.MaxProbe = *refaultPolicyConfDynamic.MaxProbe + } + if refaultPolicyConfDynamic.ReclaimAccuracyTarget != nil { + tmoConfigDetail.RefaultPolicyConf.ReclaimAccuracyTarget = *refaultPolicyConfDynamic.ReclaimAccuracyTarget + } + if refaultPolicyConfDynamic.ReclaimScanEfficiencyTarget != nil { + tmoConfigDetail.RefaultPolicyConf.ReclaimScanEfficiencyTarget = *refaultPolicyConfDynamic.ReclaimScanEfficiencyTarget + } + } + } + if tmoConf := conf.TransparentMemoryOffloadingConfiguration; tmoConf != nil { + if tmoConf.Spec.Config.QoSLevelConfig != nil { 
+ for _, qosLevelConfig := range tmoConf.Spec.Config.QoSLevelConfig { + tmoConfigDetail := NewTMOConfigDetail() + applyConfigDetail(&tmoConfigDetail, qosLevelConfig.ConfigDetail) + c.QoSLevelConfigs[qosLevelConfig.QoSLevel] = tmoConfigDetail + + } + } + if tmoConf.Spec.Config.CgroupConfig != nil { + for _, cgroupConfig := range tmoConf.Spec.Config.CgroupConfig { + tmoConfigDetail := NewTMOConfigDetail() + applyConfigDetail(&tmoConfigDetail, cgroupConfig.ConfigDetail) + c.CgroupConfigs[cgroupConfig.CgroupPath] = tmoConfigDetail + } + } + } +} diff --git a/pkg/consts/metric.go b/pkg/consts/metric.go index 8887a76c54..86123d13f7 100644 --- a/pkg/consts/metric.go +++ b/pkg/consts/metric.go @@ -173,7 +173,13 @@ const ( MetricMemPgmajfaultRateContainer = MetricMemPgmajfaultContainer + Rate MetricMemOomRateContainer = MetricMemOomContainer + Rate - MetricMemUpdateTimeContainer = "mem.updatetime.container" + MetricMemUpdateTimeContainer = "mem.updatetime.container" + MetricMemPgstealContainer = "mem.pgsteal.container" + MetricMemPgscanContainer = "mem.pgscan.container" + MetricMemWorkingsetRefaultContainer = "mem.workingsetrefault.container" + MetricMemPsiAvg60Container = "mem.psiavg60.container" + MetricMemInactiveAnonContainer = "mem.inactiveanon.container" + MetricMemInactiveFileContainer = "mem.inactivefile.container" ) // container blkio metrics @@ -285,8 +291,14 @@ const ( MetricMemAllocstallCgroup = "mem.allocstall.cgroup" MetricMemKswapdstealCgroup = "mem.kswapdstall.cgroup" - MetricMemOomCgroup = "mem.oom.cgroup" - MetricMemScaleFactorCgroup = "mem.scalefactor.cgroup" + MetricMemOomCgroup = "mem.oom.cgroup" + MetricMemScaleFactorCgroup = "mem.scalefactor.cgroup" + MetricMemPgstealCgroup = "mem.pgsteal.cgroup" + MetricMemPgscanCgroup = "mem.pgscan.cgroup" + MetricMemWorkingsetRefaultCgroup = "mem.workingsetrefault.cgroup" + MetricMemPsiAvg60Cgroup = "mem.psiavg60.cgroup" + MetricMemInactiveAnonCgroup = "mem.inactiveanon.cgroup" + MetricMemInactiveFileCgroup = 
"mem.inactivefile.cgroup" ) // Cgroup blkio metrics diff --git a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go index 0103dabd3d..37539ce7d0 100644 --- a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go +++ b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go @@ -158,7 +158,8 @@ func (m *MalachiteMetricsProvisioner) updateSystemStats() { } func (m *MalachiteMetricsProvisioner) updateCgroupData() { - cgroupPaths := []string{m.baseConf.ReclaimRelativeRootCgroupPath, common.CgroupFsRootPathBurstable, common.CgroupFsRootPathBestEffort} + // TODO: add agents cgroup data + cgroupPaths := []string{m.baseConf.ReclaimRelativeRootCgroupPath, common.CgroupFsRootPathBurstable, common.CgroupFsRootPathBestEffort, common.ServiceHdfsPath} for _, path := range cgroupPaths { stats, err := m.malachiteClient.GetCgroupStats(path) if err != nil { @@ -463,6 +464,12 @@ func (m *MalachiteMetricsProvisioner) processCgroupMemoryData(cgroupPath string, m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemOomCgroup, utilmetric.MetricData{Value: float64(mem.BpfMemStat.OomCnt), Time: &updateTime}) //m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemScaleFactorCgroup, utilmetric.MetricData{Value: general.UInt64PointerToFloat64(mem.WatermarkScaleFactor), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemPgstealCgroup, utilmetric.MetricData{Value: float64(mem.MemStats.Pgsteal), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemPgscanCgroup, utilmetric.MetricData{Value: float64(mem.MemStats.Pgscan), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemWorkingsetRefaultCgroup, utilmetric.MetricData{Value: float64(mem.MemStats.WorkingsetRefault), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemPsiAvg60Cgroup, utilmetric.MetricData{Value: 
float64(mem.BpfMemStat.MemReclaimSettingSum), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemInactiveAnonCgroup, utilmetric.MetricData{Value: float64(mem.MemStats.InactiveAnon), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemInactiveFileCgroup, utilmetric.MetricData{Value: float64(mem.MemStats.InactiveFile), Time: &updateTime}) } } @@ -754,6 +761,18 @@ func (m *MalachiteMetricsProvisioner) processContainerMemoryData(podUID, contain // utilmetric.MetricData{Value: general.UInt64PointerToFloat64(mem.WatermarkScaleFactor), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemUpdateTimeContainer, utilmetric.MetricData{Value: float64(mem.UpdateTime), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemPgstealContainer, + utilmetric.MetricData{Value: float64(mem.MemStats.Pgsteal), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemPgscanContainer, + utilmetric.MetricData{Value: float64(mem.MemStats.Pgscan), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemWorkingsetRefaultContainer, + utilmetric.MetricData{Value: float64(mem.MemStats.WorkingsetRefault), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemPsiAvg60Container, + utilmetric.MetricData{Value: mem.MemPressure.Some.Avg60, Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemInactiveAnonContainer, + utilmetric.MetricData{Value: float64(mem.MemStats.InactiveAnon), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemInactiveFileContainer, + utilmetric.MetricData{Value: float64(mem.MemStats.InactiveFile), Time: &updateTime}) } } diff --git a/pkg/metaserver/agent/metric/provisioner/malachite/types/common.go 
b/pkg/metaserver/agent/metric/provisioner/malachite/types/common.go index 3900daadee..b74c81774d 100644 --- a/pkg/metaserver/agent/metric/provisioner/malachite/types/common.go +++ b/pkg/metaserver/agent/metric/provisioner/malachite/types/common.go @@ -101,6 +101,7 @@ type BpfNetData struct { type BpfMemData struct { OomCnt uint64 `json:"mem_oom_cnt"` MemReclaimCnt uint64 `json:"mem_reclaim_cnt"` + MemReclaimSettingSum uint64 `json:"mem_reclaim_setting_sum"` MemReclaimTime uint64 `json:"mem_reclaim_time"` MemCompactCnt uint64 `json:"mem_compact_cnt"` MemCompactFailCnt uint64 `json:"mem_compact_fail_cnt"` diff --git a/pkg/util/cgroup/common/types.go b/pkg/util/cgroup/common/types.go index 20d7320ae8..48e1409ac6 100644 --- a/pkg/util/cgroup/common/types.go +++ b/pkg/util/cgroup/common/types.go @@ -43,6 +43,18 @@ const ( SystemdRootPath = "/kubepods.slice" SystemdRootPathBestEffort = "/kubepods.slice/kubepods-besteffort.slice" SystemdRootPathBurstable = "/kubepods.slice/kubepods-burstable.slice" + + ServiceContainerdPath = "/system.slice/containerd.service" + ServiceDockerPath = "/system.slice/docker.service" + ServiceVallabyAgentPath = "/system.slice/vallaby.agent.service" + ServiceKubeletPath = "/system.slice/kubelet.service" + ServiceAuditPath = "/system.slice/auditd.service" + ServiceSystemdJournaldPath = "/system.slice/systemd-journald.service" + ServiceNydusSnapshotterPath = "/system.slice/nydus-snapshotter.service" + ServiceLxcfsPOath = "/system.slice/lxcfs.service" + ServiceHttp2pAgentPath = "/tiger/http2p.agent.service" + ServiceHdfsPath = "/hdfs" + ServiceDatanodeServicePath = "/user.slice/user-1000.slice/user@1000.service/datanode.service" ) // CgroupType defines the cgroup type that kubernetes version uses, @@ -63,8 +75,9 @@ type MemoryData struct { SoftLimitInBytes int64 // MinInBytes for memory.min // cgroup memory that can never be reclaimed by kswapd. 
- MinInBytes int64 - WmarkRatio int32 + MinInBytes int64 + WmarkRatio int32 + SwapMaxInBytes int64 } // CPUData set cgroup cpu data diff --git a/pkg/util/cgroup/manager/cgroup.go b/pkg/util/cgroup/manager/cgroup.go index fd401f4982..d1744457fa 100644 --- a/pkg/util/cgroup/manager/cgroup.go +++ b/pkg/util/cgroup/manager/cgroup.go @@ -19,6 +19,7 @@ package manager import ( "context" "fmt" + "io/fs" "os/exec" "path/filepath" "time" @@ -300,3 +301,114 @@ func DropCacheWithTimeoutWithRelativePath(timeoutSecs int, absCgroupPath string, return nil } + +func SetSwapMaxWithAbsolutePathToParentCgroupRecursive(absCgroupPath string) error { + memStats, err := GetMemoryWithAbsolutePath(absCgroupPath) + if err != nil { + general.Infof("[SetSwapMaxWithAbsolutePathToParentCgroupRecursive] Failed to get mem stats from path: %s, err: %v", absCgroupPath, err) + return err + } + swapMaxData := &common.MemoryData{SwapMaxInBytes: int64(memStats.Limit)} + err = GetManager().ApplyMemory(absCgroupPath, swapMaxData) + if err != nil { + return err + } + + parentDir := filepath.Dir(absCgroupPath) + if parentDir != absCgroupPath { + err = SetSwapMaxWithAbsolutePathToParentCgroupRecursive(parentDir) + if err != nil { + return err + } + } + return nil +} +func SetSwapMaxWithAbsolutePathRecursive(absCgroupPath string) error { + if !common.CheckCgroup2UnifiedMode() { + general.Infof("[SetSwapMaxWithAbsolutePathRecursive] is not supported on cgroupv1") + return nil + } + + // set swap max to parent cgroups recursively + _ = SetSwapMaxWithAbsolutePathToParentCgroupRecursive(filepath.Dir(absCgroupPath)) + + // set swap max to sub cgroups recursively + err := filepath.Walk(absCgroupPath, func(path string, info fs.FileInfo, err error) error { + if err != nil { + general.Infof("prevent panic by handling failure accessing a path: %s, err: %v", path, err) + return err + } + if info.IsDir() { + memStats, err := GetMemoryWithAbsolutePath(path) + if err != nil { + return filepath.SkipDir + } + swapMaxData := 
&common.MemoryData{SwapMaxInBytes: int64(memStats.Limit - memStats.Usage)} + err = GetManager().ApplyMemory(path, swapMaxData) + if err != nil { + return filepath.SkipDir + } + } + return nil + }) + if err != nil { + general.Infof("error walking the path: %s, err: %v", absCgroupPath, err) + return err + } + return nil +} + +func DisableSwapMaxWithAbsolutePathRecursive(absCgroupPath string) error { + if !common.CheckCgroup2UnifiedMode() { + general.Infof("[DisableSwapMaxWithAbsolutePathRecursive] is not supported on cgroupv1") + return nil + } + // disable swap to sub cgroups recursively + err := filepath.Walk(absCgroupPath, func(path string, info fs.FileInfo, err error) error { + if err != nil { + general.Infof("prevent panic by handling failure accessing a path: %s, err: %v", path, err) + return err + } + if info.IsDir() { + swapMaxData := &common.MemoryData{SwapMaxInBytes: 0} + err = GetManager().ApplyMemory(path, swapMaxData) + if err != nil { + return filepath.SkipDir + } + } + return nil + }) + if err != nil { + general.Infof("error walking the path: %s, err: %v ", absCgroupPath, err) + return err + } + return nil +} +func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, nbytes int64) error { + startTime := time.Now() + + var cmd string + if common.CheckCgroup2UnifiedMode() { + if nbytes <= 0 { + general.Infof("[MemoryOffloadingWithAbsolutePath] skip memory reclaim on %s since nbytes is not valid", absCgroupPath) + return nil + } + //cgv2 + cmd = fmt.Sprintf("echo %d > %s", nbytes, filepath.Join(absCgroupPath, "memory.reclaim")) + } else { + //cgv1 + general.Infof("[MemoryOffloadingWithAbsolutePath] is not supported on cgroupv1") + return nil + } + + _, err := exec.Command("bash", "-c", cmd).Output() + + _ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{ + "absCGPath": absCgroupPath, + "succeeded": fmt.Sprintf("%v", err == nil), + })...) 
+ delta := time.Since(startTime).Seconds() + general.Infof("[MemoryOffloadingWithAbsolutePath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath) + + return err +} diff --git a/pkg/util/cgroup/manager/v1/fs_linux.go b/pkg/util/cgroup/manager/v1/fs_linux.go index c7ced58793..6ea48c0754 100644 --- a/pkg/util/cgroup/manager/v1/fs_linux.go +++ b/pkg/util/cgroup/manager/v1/fs_linux.go @@ -77,7 +77,6 @@ func (m *manager) ApplyMemory(absCgroupPath string, data *common.MemoryData) err klog.Infof("[CgroupV1] apply memory wmark successfully, cgroupPath: %s, data: %v, old data: %v\n", absCgroupPath, data.WmarkRatio, oldData) } } - return nil } diff --git a/pkg/util/cgroup/manager/v2/fs_linux.go b/pkg/util/cgroup/manager/v2/fs_linux.go index 1ff1529456..e507a39485 100644 --- a/pkg/util/cgroup/manager/v2/fs_linux.go +++ b/pkg/util/cgroup/manager/v2/fs_linux.go @@ -81,6 +81,13 @@ func (m *manager) ApplyMemory(absCgroupPath string, data *common.MemoryData) err } } + if data.SwapMaxInBytes >= 0 { + if err, applied, oldData := common.WriteFileIfChange(absCgroupPath, "memory.swap.max", numToStr(data.SwapMaxInBytes)); err != nil { + return err + } else if applied { + klog.Infof("[CgroupV2] apply memory swap max successfully, cgroupPath: %s, data: %v, old data: %v\n", absCgroupPath, data.SwapMaxInBytes, oldData) + } + } return nil }