diff --git a/cmd/katalyst-agent/app/options/dynamic/dynamic_base.go b/cmd/katalyst-agent/app/options/dynamic/dynamic_base.go index 8a4152c57..628510448 100644 --- a/cmd/katalyst-agent/app/options/dynamic/dynamic_base.go +++ b/cmd/katalyst-agent/app/options/dynamic/dynamic_base.go @@ -21,25 +21,30 @@ import ( cliflag "k8s.io/component-base/cli/flag" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/adminqos" + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/tmo" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" ) type DynamicOptions struct { *adminqos.AdminQoSOptions + *tmo.TransparentMemoryOffloadingOptions } func NewDynamicOptions() *DynamicOptions { return &DynamicOptions{ - AdminQoSOptions: adminqos.NewAdminQoSOptions(), + AdminQoSOptions: adminqos.NewAdminQoSOptions(), + TransparentMemoryOffloadingOptions: tmo.NewTransparentMemoryOffloadingOptions(), } } func (o *DynamicOptions) AddFlags(fss *cliflag.NamedFlagSets) { o.AdminQoSOptions.AddFlags(fss) + o.TransparentMemoryOffloadingOptions.AddFlags(fss) } func (o *DynamicOptions) ApplyTo(c *dynamic.Configuration) error { var errList []error errList = append(errList, o.AdminQoSOptions.ApplyTo(c.AdminQoSConfiguration)) + errList = append(errList, o.TransparentMemoryOffloadingOptions.ApplyTo(c.TransparentMemoryOffloadingConfiguration)) return errors.NewAggregate(errList) } diff --git a/cmd/katalyst-agent/app/options/dynamic/tmo/tmo_base.go b/cmd/katalyst-agent/app/options/dynamic/tmo/tmo_base.go new file mode 100644 index 000000000..c7165ca6c --- /dev/null +++ b/cmd/katalyst-agent/app/options/dynamic/tmo/tmo_base.go @@ -0,0 +1,45 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tmo + +import ( + "k8s.io/apimachinery/pkg/util/errors" + cliflag "k8s.io/component-base/cli/flag" + + "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/dynamic/tmo/tmodefault" + tmodynamicconf "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo" +) + +type TransparentMemoryOffloadingOptions struct { + *tmodefault.DefaultOptions +} + +func NewTransparentMemoryOffloadingOptions() *TransparentMemoryOffloadingOptions { + return &TransparentMemoryOffloadingOptions{ + DefaultOptions: tmodefault.NewDefaultOptions(), + } +} + +func (o *TransparentMemoryOffloadingOptions) AddFlags(fss *cliflag.NamedFlagSets) { + o.DefaultOptions.AddFlags(fss) +} + +func (o *TransparentMemoryOffloadingOptions) ApplyTo(c *tmodynamicconf.TransparentMemoryOffloadingConfiguration) error { + var errList []error + errList = append(errList, o.DefaultOptions.ApplyTo(c.DefaultConfigurations)) + return errors.NewAggregate(errList) +} diff --git a/cmd/katalyst-agent/app/options/dynamic/tmo/tmodefault/tmo_default_config.go b/cmd/katalyst-agent/app/options/dynamic/tmo/tmodefault/tmo_default_config.go new file mode 100644 index 000000000..0ccae5b52 --- /dev/null +++ b/cmd/katalyst-agent/app/options/dynamic/tmo/tmodefault/tmo_default_config.go @@ -0,0 +1,83 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tmodefault + +import ( + "time" + + cliflag "k8s.io/component-base/cli/flag" + + "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1" + tmodynamicconf "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo" +) + +type DefaultOptions struct { + DefaultEnableTMO bool + DefaultEnableSwap bool + DefaultTMOInterval time.Duration + DefaultTMOPolicyName string + DefaultTMOMaxProbe float64 + DefaultTMOPSIPolicyPSIAvg60Threshold float64 + DefaultTMORefaultPolicyReclaimAccuracyTarget float64 + DefaultTMORefaultPolicyReclaimScanEfficiencyTarget float64 +} + +func NewDefaultOptions() *DefaultOptions { + return &DefaultOptions{ + DefaultEnableTMO: tmodynamicconf.DefaultEnableTMO, + DefaultEnableSwap: tmodynamicconf.DefaultEnableSwap, + DefaultTMOInterval: tmodynamicconf.DefaultTMOInterval, + DefaultTMOPolicyName: string(tmodynamicconf.DefaultTMOPolicyName), + DefaultTMOMaxProbe: tmodynamicconf.DefaultTMOMaxProbe, + DefaultTMOPSIPolicyPSIAvg60Threshold: tmodynamicconf.DefaultTMOPSIPolicyPSIAvg60Threshold, + DefaultTMORefaultPolicyReclaimAccuracyTarget: tmodynamicconf.DefaultTMORefaultPolicyReclaimAccuracyTarget, + DefaultTMORefaultPolicyReclaimScanEfficiencyTarget: tmodynamicconf.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget, + } +} + +func (o *DefaultOptions) AddFlags(fss *cliflag.NamedFlagSets) { + fs := fss.FlagSet("tmo-default-config") + + fs.BoolVar(&o.DefaultEnableTMO, "default-enable-tmo", o.DefaultEnableTMO, + "whether to enable transparent memory offloading by default") + fs.BoolVar(&o.DefaultEnableSwap, "default-enable-swap", 
o.DefaultEnableSwap, + "whether to enable swap (offload rss) in TMO by default") + fs.DurationVar(&o.DefaultTMOInterval, "default-tmo-min-interval", o.DefaultTMOInterval, + "default minimum interval to trigger TMO on each container or cgroup") + fs.StringVar(&o.DefaultTMOPolicyName, "default-tmo-policy-name", o.DefaultTMOPolicyName, + "default policy used to calculate memory offloading size") + fs.Float64Var(&o.DefaultTMOMaxProbe, "default-max-probe", o.DefaultTMOMaxProbe, + "default maximum ratio of memory usage could be offloaded in one cycle") + fs.Float64Var(&o.DefaultTMOPSIPolicyPSIAvg60Threshold, "default-psi-policy-psi-avg60-threshold", o.DefaultTMOPSIPolicyPSIAvg60Threshold, + "indicates the default threshold of memory pressure. If observed pressure exceeds this threshold, memory offloading will be paused.") + fs.Float64Var(&o.DefaultTMORefaultPolicyReclaimAccuracyTarget, "default-refault-policy-reclaim-accuracy-target", o.DefaultTMORefaultPolicyReclaimAccuracyTarget, + "indicates the default desired level of precision or accuracy in offloaded pages") + fs.Float64Var(&o.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget, "default-refault-policy-reclaim-scan-efficiency-target", o.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget, + "indicates the default desired level of efficiency in scanning and identifying memory pages that can be offloaded.") +} + +func (o *DefaultOptions) ApplyTo(c *tmodynamicconf.TMODefaultConfigurations) error { + c.DefaultEnableTMO = o.DefaultEnableTMO + c.DefaultEnableSwap = o.DefaultEnableSwap + c.DefaultTMOInterval = o.DefaultTMOInterval + c.DefaultTMOPolicyName = v1alpha1.TMOPolicyName(o.DefaultTMOPolicyName) + c.DefaultTMOMaxProbe = o.DefaultTMOMaxProbe + c.DefaultTMOPSIPolicyPSIAvg60Threshold = o.DefaultTMOPSIPolicyPSIAvg60Threshold + c.DefaultTMORefaultPolicyReclaimAccuracyTarget = o.DefaultTMORefaultPolicyReclaimAccuracyTarget + c.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget = 
o.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget + return nil +} diff --git a/cmd/katalyst-agent/app/options/global/base.go b/cmd/katalyst-agent/app/options/global/base.go index 5bd924b2f..385dba21b 100644 --- a/cmd/katalyst-agent/app/options/global/base.go +++ b/cmd/katalyst-agent/app/options/global/base.go @@ -51,6 +51,7 @@ type BaseOptions struct { AdditionalCgroupPaths []string ReclaimRelativeRootCgroupPath string + GeneralRelativeCgroupPaths []string // configurations for kubelet KubeletReadOnlyPort int @@ -113,6 +114,8 @@ func (o *BaseOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs.StringVar(&o.ReclaimRelativeRootCgroupPath, "reclaim-relative-root-cgroup-path", o.ReclaimRelativeRootCgroupPath, "top level cgroup path for reclaimed_cores qos level") + fs.StringSliceVar(&o.GeneralRelativeCgroupPaths, "general-relative-cgroup-paths", o.GeneralRelativeCgroupPaths, + "The cgroup paths of standalone services which not managed by kubernetes") fs.IntVar(&o.KubeletReadOnlyPort, "kubelet-read-only-port", o.KubeletReadOnlyPort, "The read-only port for the kubelet to serve") @@ -152,6 +155,7 @@ func (o *BaseOptions) ApplyTo(c *global.BaseConfiguration) error { c.LockWaitingEnabled = o.LockWaitingEnabled c.ReclaimRelativeRootCgroupPath = o.ReclaimRelativeRootCgroupPath + c.GeneralRelativeCgroupPaths = o.GeneralRelativeCgroupPaths c.NetMultipleNS = o.MachineNetMultipleNS c.NetNSDirAbsPath = o.MachineNetNSDirAbsPath diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/const.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/const.go index 8a782affd..6f7b4aa84 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/const.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor/const.go @@ -24,4 +24,6 @@ const ( ControlKnobKeyCPUSetMems MemoryControlKnobName = "cpuset_mems" ControlKnobReclaimedMemorySize MemoryControlKnobName = "reclaimed_memory_size" ControlKnobKeyBalanceNumaMemory MemoryControlKnobName = 
"balance_numa_memory" + ControlKnobKeySwapMax MemoryControlKnobName = "swap_max" + ControlKnowKeyMemoryOffloading MemoryControlKnobName = "memory_offloading" ) diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go index 2d4680bf8..c954847f3 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy.go @@ -58,10 +58,11 @@ import ( const ( MemoryResourcePluginPolicyNameDynamic = string(apiconsts.ResourcePluginPolicyNameDynamic) - memoryPluginStateFileName = "memory_plugin_state" - memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers" - memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache" - memoryPluginAsyncWorkTopicMovePage = "qrm_memory_plugin_move_page" + memoryPluginStateFileName = "memory_plugin_state" + memoryPluginAsyncWorkersName = "qrm_memory_plugin_async_workers" + memoryPluginAsyncWorkTopicDropCache = "qrm_memory_plugin_drop_cache" + memoryPluginAsyncWorkTopicMovePage = "qrm_memory_plugin_move_page" + memoryPluginAsyncWorkTopicMemoryOffloading = "qrm_memory_plugin_mem_offload" dropCacheTimeoutSeconds = 30 ) @@ -238,6 +239,8 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryProvisions)) memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyBalanceNumaMemory, memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleNumaMemoryBalance)) + memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnowKeyMemoryOffloading, + memoryadvisor.ControlKnobHandlerWithChecker(policyImplement.handleAdvisorMemoryOffloading)) return true, &agent.PluginWrapper{GenericPlugin: pluginWrapper}, nil } diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go index 889247ee3..f60ac5a21 100644 --- 
a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_advisor_handler.go @@ -42,9 +42,11 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic" + "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/asyncworker" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" cgroupcommon "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" cgroupmgr "github.com/kubewharf/katalyst-core/pkg/util/cgroup/manager" "github.com/kubewharf/katalyst-core/pkg/util/general" @@ -682,6 +684,70 @@ func (p *DynamicPolicy) doNumaMemoryBalance(ctx context.Context, advice types.Nu _ = p.emitter.StoreInt64(util.MetricNameMemoryNumaBalanceResult, 1, metrics.MetricTypeNameRaw, metrics.MetricTag{Key: "success", Val: strconv.FormatBool(migrateSuccess)}) + return nil +} + +// handleAdvisorMemoryOffloading handles memory offloading from memory-advisor +func (p *DynamicPolicy) handleAdvisorMemoryOffloading(_ *config.Configuration, + _ interface{}, + _ *dynamicconfig.DynamicAgentConfiguration, + emitter metrics.MetricEmitter, + metaServer *metaserver.MetaServer, + entryName, subEntryName string, + calculationInfo *advisorsvc.CalculationInfo, podResourceEntries state.PodResourceEntries) error { + var absCGPath string + var memoryOffloadingWorkName string + memoryOffloadingSizeInBytes := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnowKeyMemoryOffloading)] + memoryOffloadingSizeInBytesInt64, err := strconv.ParseInt(memoryOffloadingSizeInBytes, 10, 64) + if err != nil { + return fmt.Errorf("parse %s: %s failed with error: %v", memoryadvisor.ControlKnowKeyMemoryOffloading, memoryOffloadingSizeInBytes, err) + } + + 
if calculationInfo.CgroupPath == "" { + memoryOffloadingWorkName = util.GetContainerAsyncWorkName(entryName, subEntryName, memoryPluginAsyncWorkTopicMemoryOffloading) + containerID, err := metaServer.GetContainerID(entryName, subEntryName) + if err != nil { + return fmt.Errorf("GetContainerID failed with error: %v", err) + } + absCGPath, err = common.GetContainerAbsCgroupPath(common.CgroupSubsysMemory, entryName, containerID) + if err != nil { + return fmt.Errorf("GetContainerAbsCgroupPath failed with error: %v", err) + } + } else { + memoryOffloadingWorkName = util.GetCgroupAsyncWorkName(calculationInfo.CgroupPath, memoryPluginAsyncWorkTopicMemoryOffloading) + absCGPath = common.GetAbsCgroupPath(common.CgroupSubsysMemory, calculationInfo.CgroupPath) + } + + // set swap max before trigger memory offloading + swapMax := calculationInfo.CalculationResult.Values[string(memoryadvisor.ControlKnobKeySwapMax)] + if swapMax == consts.ControlKnobON { + err := cgroupmgr.SetSwapMaxWithAbsolutePathRecursive(absCGPath) + if err != nil { + general.Infof("Failed to set swap max, err: %v", err) + } + } else { + err := cgroupmgr.DisableSwapMaxWithAbsolutePathRecursive(absCGPath) + if err != nil { + general.Infof("Failed to disable swap, err: %v", err) + } + } + + // start a asynchronous work to execute memory offloading + err = p.asyncWorkers.AddWork(memoryOffloadingWorkName, + &asyncworker.Work{ + Fn: cgroupmgr.MemoryOffloadingWithAbsolutePath, + Params: []interface{}{absCGPath, memoryOffloadingSizeInBytesInt64}, + DeliveredAt: time.Now()}, asyncworker.DuplicateWorkPolicyOverride) + if err != nil { + return fmt.Errorf("add work: %s pod: %s container: %s cgroup: %s failed with error: %v", memoryOffloadingWorkName, entryName, subEntryName, absCGPath, err) + } + + _ = emitter.StoreInt64(util.MetricNameMemoryHandlerAdvisorMemoryOffload, memoryOffloadingSizeInBytesInt64, + metrics.MetricTypeNameRaw, metrics.ConvertMapToTags(map[string]string{ + "entryName": entryName, + "subEntryName": 
subEntryName, + "cgroupPath": calculationInfo.CgroupPath, + })...) return nil } diff --git a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go index 913a8ebd9..15cf12476 100644 --- a/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/memory/dynamicpolicy/policy_test.go @@ -62,6 +62,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/config/agent/global" qrmconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" "github.com/kubewharf/katalyst-core/pkg/config/generic" + coreconsts "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" metaserveragent "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" @@ -1706,6 +1707,7 @@ func TestHandleAdvisorResp(t *testing.T) { pod1UID := string(uuid.NewUUID()) pod2UID := string(uuid.NewUUID()) pod3UID := string(uuid.NewUUID()) + pod4UID := string(uuid.NewUUID()) testName := "test" testCases := []struct { @@ -1716,7 +1718,7 @@ func TestHandleAdvisorResp(t *testing.T) { lwResp *advisorsvc.ListAndWatchResponse }{ { - description: "one shared_cores container, one reclaimed_cores container, one dedicated_cores container", + description: "one shared_cores container, two reclaimed_cores container, one dedicated_cores container", podResourceEntries: state.PodResourceEntries{ v1.ResourceMemory: state.PodEntries{ pod1UID: state.ContainerEntries{ @@ -1819,6 +1821,18 @@ func TestHandleAdvisorResp(t *testing.T) { }, }, }, + pod4UID: { + ContainerEntries: map[string]*advisorsvc.CalculationInfo{ + testName: { + CalculationResult: &advisorsvc.CalculationResult{ + Values: map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): coreconsts.ControlKnobON, + string(memoryadvisor.ControlKnowKeyMemoryOffloading): "40960", + }, + }, + }, + }, + }, }, }, expectedPodResourceEntries: state.PodResourceEntries{ @@ -2120,6 
+2134,8 @@ func TestHandleAdvisorResp(t *testing.T) { memoryadvisor.ControlKnobHandlerWithChecker(handleAdvisorCPUSetMems)) memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnobKeyDropCache, memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorDropCache)) + memoryadvisor.RegisterControlKnobHandler(memoryadvisor.ControlKnowKeyMemoryOffloading, + memoryadvisor.ControlKnobHandlerWithChecker(dynamicPolicy.handleAdvisorMemoryOffloading)) machineState, err := state.GenerateMachineStateFromPodEntries(machineInfo, tc.podResourceEntries, resourcesReservedMemory) as.Nil(err) diff --git a/pkg/agent/qrm-plugins/util/consts.go b/pkg/agent/qrm-plugins/util/consts.go index c8722dcdf..14cb910c6 100644 --- a/pkg/agent/qrm-plugins/util/consts.go +++ b/pkg/agent/qrm-plugins/util/consts.go @@ -42,6 +42,7 @@ const ( MetricNameMemoryHandleAdvisorMemoryLimit = "memory_handle_advisor_memory_limit" MetricNameMemoryHandleAdvisorDropCache = "memory_handle_advisor_drop_cache" MetricNameMemoryHandleAdvisorCPUSetMems = "memory_handle_advisor_cpuset_mems" + MetricNameMemoryHandlerAdvisorMemoryOffload = "memory_handler_advisor_memory_offloading" MetricNameMemoryOOMPriorityDeleteFailed = "memory_oom_priority_delete_failed" MetricNameMemoryOOMPriorityUpdateFailed = "memory_oom_priority_update_failed" MetricNameMemoryNumaBalance = "memory_handle_numa_balance" diff --git a/pkg/agent/qrm-plugins/util/util.go b/pkg/agent/qrm-plugins/util/util.go index f1ecf5328..5a46bf457 100644 --- a/pkg/agent/qrm-plugins/util/util.go +++ b/pkg/agent/qrm-plugins/util/util.go @@ -315,6 +315,10 @@ func GetContainerAsyncWorkName(podUID, containerName, topic string) string { return strings.Join([]string{podUID, containerName, topic}, asyncworker.WorkNameSeperator) } +func GetCgroupAsyncWorkName(cgroup, topic string) string { + return strings.Join([]string{cgroup, topic}, asyncworker.WorkNameSeperator) +} + func GetKubeletReservedQuantity(resourceName string, klConfig 
*kubeletconfigv1beta1.KubeletConfiguration) (resource.Quantity, bool, error) { if klConfig == nil { return resource.MustParse("0"), false, fmt.Errorf("nil klConfig") diff --git a/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go b/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go index 3e47ae2a1..c8a3fd6a2 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go @@ -91,12 +91,18 @@ func NewQoSAwarePlugin(pluginName string, conf *config.Configuration, extraConf } } - // add dynamic config watcher + // add AdminQos dynamic config watcher err = metaServer.ConfigurationManager.AddConfigWatcher(crd.AdminQoSConfigurationGVR) if err != nil { return nil, err } + // add TransparentMemoryOffloading dynamic config watcher + err = metaServer.ConfigurationManager.AddConfigWatcher(crd.TransparentMemoryOffloadingConfigurationGVR) + if err != nil { + return nil, err + } + qap := &QoSAwarePlugin{ name: pluginName, period: conf.SysAdvisorPluginsConfiguration.QoSAwarePluginConfiguration.SyncPeriod, diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/helper.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/helper.go index ebec9ad3d..bdb5196d6 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/helper.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/helper/helper.go @@ -20,8 +20,10 @@ import ( "context" "fmt" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/spd" "github.com/kubewharf/katalyst-core/pkg/util/general" @@ -82,3 +84,22 @@ func PodPerformanceScore(ctx context.Context, metaServer *metaserver.MetaServer, return metaServer.ServiceBusinessPerformanceScore(ctx, pod.ObjectMeta) } + +func PodIsDaemonSet(pod *v1.Pod) bool { + if pod != nil && pod.OwnerReferences != nil { + for _, ownerReference := range 
pod.OwnerReferences { + if ownerReference.Kind == "DaemonSet" { + return true + } + } + } + return false +} + +func IsValidQosLevel(qoslevel string) bool { + if qoslevel == string(consts.QoSLevelReclaimedCores) || qoslevel == string(consts.QoSLevelSharedCores) || + qoslevel == string(consts.QoSLevelDedicatedCores) || qoslevel == string(consts.QoSLevelSystemCores) { + return true + } + return false +} diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go index d3625b4cc..22f4dfd15 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go @@ -50,7 +50,7 @@ func init() { memadvisorplugin.RegisterInitializer(memadvisorplugin.MemoryGuard, memadvisorplugin.NewMemoryGuard) memadvisorplugin.RegisterInitializer(memadvisorplugin.MemsetBinder, memadvisorplugin.NewMemsetBinder) memadvisorplugin.RegisterInitializer(memadvisorplugin.NumaMemoryBalancer, memadvisorplugin.NewMemoryBalancer) - + memadvisorplugin.RegisterInitializer(memadvisorplugin.TransparentMemoryOffloading, memadvisorplugin.NewTransparentMemoryOffloading) memadvisorplugin.RegisterInitializer(provisioner.MemoryProvisioner, provisioner.NewMemoryProvisioner) } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor_test.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor_test.go index 8e019a9b7..0eed853c0 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor_test.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor_test.go @@ -46,6 +46,7 @@ import ( memadvisorplugin "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo" coreconsts 
"github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" @@ -793,6 +794,251 @@ func TestUpdate(t *testing.T) { ExtraEntries: []types.ExtraMemoryAdvices{}, }, }, + { + name: "memory offloading", + pools: map[string]*types.PoolInfo{ + state.PoolNameReserve: { + PoolName: state.PoolNameReserve, + TopologyAwareAssignments: map[int]machine.CPUSet{ + 0: machine.MustParse("0"), + 1: machine.MustParse("24"), + }, + OriginalTopologyAwareAssignments: map[int]machine.CPUSet{ + 0: machine.MustParse("0"), + 1: machine.MustParse("24"), + }, + }, + }, + reclaimedEnable: true, + needRecvAdvices: true, + containers: []*types.ContainerInfo{ + makeContainerInfo("uid1", "default", "pod1", "c1", consts.PodAnnotationQoSLevelReclaimedCores, nil, + map[int]machine.CPUSet{ + 0: machine.MustParse("1"), + 1: machine.MustParse("25"), + }, 200<<30), + }, + pods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "uid1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + }, + }, + }, + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "c1", + ContainerID: "containerd://c1", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + UID: "uid2", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c2", + }, + }, + }, + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: 
"c2", + ContainerID: "containerd://c2", + }, + }, + }, + }, + }, + wantHeadroom: *resource.NewQuantity(996<<30, resource.DecimalSI), + nodeMetrics: defaultNodeMetrics, + numaMetrics: defaultNumaMetrics, + containerMetrics: []containerMetric{ + { + metricName: coreconsts.MetricMemPsiAvg60Container, + metricValue: metricutil.MetricData{Value: 0.01}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemUsageContainer, + metricValue: metricutil.MetricData{Value: 10 << 30}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemInactiveAnonContainer, + metricValue: metricutil.MetricData{Value: 1 << 30}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemInactiveFileContainer, + metricValue: metricutil.MetricData{Value: 1 << 30}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemPgscanContainer, + metricValue: metricutil.MetricData{Value: 15000}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemPgstealContainer, + metricValue: metricutil.MetricData{Value: 10000}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemWorkingsetRefaultContainer, + metricValue: metricutil.MetricData{Value: 1000}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemWorkingsetActivateContainer, + metricValue: metricutil.MetricData{Value: 1000}, + podUID: "uid1", + containerName: "c1", + }, + { + metricName: coreconsts.MetricMemPsiAvg60Container, + metricValue: metricutil.MetricData{Value: 0.01}, + podUID: "uid2", + containerName: "c2", + }, + { + metricName: coreconsts.MetricMemUsageContainer, + metricValue: metricutil.MetricData{Value: 10 << 30}, + podUID: "uid2", + containerName: "c2", + }, + { + metricName: coreconsts.MetricMemInactiveAnonContainer, + metricValue: metricutil.MetricData{Value: 1 << 30}, + podUID: "uid2", + containerName: "c2", + }, + { + metricName: 
coreconsts.MetricMemInactiveFileContainer, + metricValue: metricutil.MetricData{Value: 1 << 30}, + podUID: "uid2", + containerName: "c2", + }, + { + metricName: coreconsts.MetricMemPgscanContainer, + metricValue: metricutil.MetricData{Value: 15000}, + podUID: "uid2", + containerName: "c2", + }, + { + metricName: coreconsts.MetricMemPgstealContainer, + metricValue: metricutil.MetricData{Value: 10000}, + podUID: "uid2", + containerName: "c2", + }, + { + metricName: coreconsts.MetricMemWorkingsetRefaultContainer, + metricValue: metricutil.MetricData{Value: 1000}, + podUID: "uid2", + containerName: "c2", + }, + { + metricName: coreconsts.MetricMemWorkingsetActivateContainer, + metricValue: metricutil.MetricData{Value: 1000}, + podUID: "uid2", + containerName: "c2", + }, + }, + cgroupMetrics: []cgroupMetric{ + { + metricName: coreconsts.MetricMemPsiAvg60Cgroup, + metricValue: metricutil.MetricData{Value: 0.01}, + cgroupPath: "/hdfs", + }, + { + metricName: coreconsts.MetricMemPgstealCgroup, + metricValue: metricutil.MetricData{Value: 0.01}, + cgroupPath: "/hdfs", + }, + { + metricName: coreconsts.MetricMemPgscanCgroup, + metricValue: metricutil.MetricData{Value: 0.01}, + cgroupPath: "/hdfs", + }, + { + metricName: coreconsts.MetricMemWorkingsetRefaultCgroup, + metricValue: metricutil.MetricData{Value: 0.01}, + cgroupPath: "/hdfs", + }, + { + metricName: coreconsts.MetricMemWorkingsetActivateCgroup, + metricValue: metricutil.MetricData{Value: 1 << 30}, + cgroupPath: "/hdfs", + }, + { + metricName: coreconsts.MetricMemUsageCgroup, + metricValue: metricutil.MetricData{Value: 4 << 30}, + cgroupPath: "/hdfs", + }, + { + metricName: coreconsts.MetricMemInactiveAnonCgroup, + metricValue: metricutil.MetricData{Value: 1 << 30}, + cgroupPath: "/hdfs", + }, + { + metricName: coreconsts.MetricMemInactiveFileCgroup, + metricValue: metricutil.MetricData{Value: 1 << 30}, + cgroupPath: "/hdfs", + }, + }, + plugins: 
[]types.MemoryAdvisorPluginName{memadvisorplugin.TransparentMemoryOffloading}, + wantAdviceResult: types.InternalMemoryCalculationResult{ + ExtraEntries: []types.ExtraMemoryAdvices{ + { + CgroupPath: "/hdfs", + Values: map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): coreconsts.ControlKnobON, + string(memoryadvisor.ControlKnowKeyMemoryOffloading): "38654705", + }, + }, + }, + ContainerEntries: []types.ContainerMemoryAdvices{{ + PodUID: "uid1", + ContainerName: "c1", + Values: map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): coreconsts.ControlKnobON, + string(memoryadvisor.ControlKnowKeyMemoryOffloading): "96636764"}, + }}, + }, + }, { name: "bind memset", pools: map[string]*types.PoolInfo{ @@ -1974,6 +2220,17 @@ func TestUpdate(t *testing.T) { advisor, metaCache := newTestMemoryAdvisor(t, tt.pods, ckDir, sfDir, fetcher, tt.plugins) advisor.conf.GetDynamicConfiguration().EnableReclaim = tt.reclaimedEnable + transparentMemoryOffloadingConfiguration := tmo.NewTransparentMemoryOffloadingConfiguration() + transparentMemoryOffloadingConfiguration.QoSLevelConfigs[consts.QoSLevelReclaimedCores] = tmo.NewTMOConfigDetail(transparentMemoryOffloadingConfiguration.DefaultConfigurations) + transparentMemoryOffloadingConfiguration.QoSLevelConfigs[consts.QoSLevelReclaimedCores].EnableTMO = true + transparentMemoryOffloadingConfiguration.QoSLevelConfigs[consts.QoSLevelReclaimedCores].EnableSwap = true + + // cgroup level + transparentMemoryOffloadingConfiguration.CgroupConfigs["/sys/fs/cgroup/hdfs"] = tmo.NewTMOConfigDetail(transparentMemoryOffloadingConfiguration.DefaultConfigurations) + transparentMemoryOffloadingConfiguration.CgroupConfigs["/sys/fs/cgroup/hdfs"].EnableTMO = true + transparentMemoryOffloadingConfiguration.CgroupConfigs["/sys/fs/cgroup/hdfs"].EnableSwap = true + + advisor.conf.GetDynamicConfiguration().TransparentMemoryOffloadingConfiguration = transparentMemoryOffloadingConfiguration _, advisorRecvChInterface := 
advisor.GetChannels() recvCh := advisorRecvChInterface.(chan types.InternalMemoryCalculationResult) diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go new file mode 100644 index 000000000..c8bc9fd05 --- /dev/null +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/plugin/memory_offloading.go @@ -0,0 +1,553 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package plugin + +import ( + "context" + "math" + "path/filepath" + "strconv" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1" + katalystapiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/memoryadvisor" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/helper" + "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" + "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo" + tmoconf "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo" + "github.com/kubewharf/katalyst-core/pkg/consts" + katalystcoreconsts "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metrics" + "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/native" +) + +const ( + TransparentMemoryOffloading = "transparent-memory-offloading" +) + +const ( + InactiveProbe = 0.1 + OffloadingSizeScaleCoeff = 1.05 +) + +const ( + DummyTMOBlockFnName string = "dummy-tmo-block-fn" +) + +// TMO policy funcs to calculate memory offloading size +var tmoPolicyFuncs sync.Map + +// TMO block funcs to filter the containers required to disable TMO +var tmoBlockFuncs sync.Map + +type TmoStats struct { + memUsage float64 + memInactive float64 + memPsiAvg60 float64 + pgscan float64 + pgsteal float64 + refault float64 + refaultActivate float64 + offloadingTargetSize float64 +} + +type TmoPolicyFn func( + lastStats TmoStats, + currStats TmoStats, + conf *tmoconf.TMOConfigDetail) (error, float64) + +func psiPolicyFunc(lastStats TmoStats, currStats TmoStats, conf *tmoconf.TMOConfigDetail) 
(error, float64) { + if conf.PSIPolicyConf == nil { + return errors.New("psi policy requires psi policy configuration"), 0 + } + return nil, math.Max(0, 1-(currStats.memPsiAvg60)/(conf.PSIPolicyConf.PsiAvg60Threshold)) * conf.PSIPolicyConf.MaxProbe * currStats.memUsage +} + +func refaultPolicyFunc(lastStats TmoStats, currStats TmoStats, conf *tmoconf.TMOConfigDetail) (error, float64) { + if conf.RefaultPolicyConf == nil { + return errors.New("refault policy requires refault policy configurations"), 0 + } + pgscanDelta := currStats.pgscan - lastStats.pgscan + pgstealDelta := currStats.pgsteal - lastStats.pgsteal + refaultDelta := currStats.refaultActivate - lastStats.refaultActivate + reclaimAccuracyRatio := 1.0 + reclaimScanEfficiencyRatio := 1.0 + if pgstealDelta > 0 && pgscanDelta > 0 { + reclaimAccuracyRatio = 1 - refaultDelta/pgstealDelta + reclaimScanEfficiencyRatio = pgstealDelta / pgscanDelta + } + + var result float64 + if reclaimAccuracyRatio < conf.RefaultPolicyConf.ReclaimAccuracyTarget || reclaimScanEfficiencyRatio < conf.RefaultPolicyConf.ReclaimScanEfficiencyTarget { + // Decrease offloading size if detecting the reclaim accuracy or scan efficiency is below the targets + result = math.Max(0, lastStats.offloadingTargetSize*reclaimAccuracyRatio) + } else { + // Try to increase offloading size but make sure not exceed the max probe of memory usage and 10% of inactive memory when the target size of last round is relatively small, + // which means reclaim accuracy and reclaim scan efficiency is low. 
+ result = math.Min(math.Max(lastStats.offloadingTargetSize*OffloadingSizeScaleCoeff, currStats.memInactive*InactiveProbe), currStats.memUsage*conf.RefaultPolicyConf.MaxProbe) + } + general.InfoS("refault info", "reclaimAccuracyRatio", reclaimAccuracyRatio, "ReclaimAccuracyTarget", conf.RefaultPolicyConf.ReclaimAccuracyTarget, + "reclaimScanEfficiencyRatio", reclaimScanEfficiencyRatio, "ReclaimScanEfficiencyTarget", conf.RefaultPolicyConf.ReclaimScanEfficiencyTarget, + "refaultDelta", refaultDelta, "pgstealDelta", pgstealDelta, "pgscanDelta", pgscanDelta, "lastOffloadingTargetSize", general.FormatMemoryQuantity(lastStats.offloadingTargetSize), + "result", general.FormatMemoryQuantity(result)) + return nil, result +} + +type TMOBlockFn func(ci *types.ContainerInfo, conf interface{}) bool + +func DummyTMOBlockFn(ci *types.ContainerInfo, conf interface{}) bool { return false } + +func init() { + RegisterTMOPolicyFunc(v1alpha1.TMOPolicyNamePSI, psiPolicyFunc) + RegisterTMOPolicyFunc(v1alpha1.TMOPolicyNameRefault, refaultPolicyFunc) + RegisterTMOBlockFunc(DummyTMOBlockFnName, DummyTMOBlockFn) +} + +func RegisterTMOPolicyFunc(policyName v1alpha1.TMOPolicyName, tmoPolicyFn TmoPolicyFn) { + tmoPolicyFuncs.Store(policyName, tmoPolicyFn) +} + +func RegisterTMOBlockFunc(blockFnName string, blockFn TMOBlockFn) { + tmoBlockFuncs.Store(blockFnName, blockFn) +} + +type transparentMemoryOffloading struct { + conf *config.Configuration + extraConf interface{} + mutex sync.RWMutex + metaReader metacache.MetaReader + metaServer *metaserver.MetaServer + emitter metrics.MetricEmitter + containerTmoEngines map[katalystcoreconsts.PodContainerName]TmoEngine + cgpathTmoEngines map[string]TmoEngine +} + +type TmoEngine interface { + GetContainerInfo() *types.ContainerInfo + GetCgpath() string + LoadConf(*tmo.TMOConfigDetail) + GetConf() *tmo.TMOConfigDetail + CalculateOffloadingTargetSize() + GetOffloadingTargetSize() float64 +} + +type tmoEngineInstance struct { + workingRounds int64 + 
containerInfo *types.ContainerInfo // only valid when this tmo engine is working on container + cgpath string + metaServer *metaserver.MetaServer + emitter metrics.MetricEmitter + conf *tmo.TMOConfigDetail + lastTime time.Time + lastStats TmoStats + offloadingTargetSize float64 +} + +func NewTmoEngineInstance(obj interface{}, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter, tmoConf *tmo.TransparentMemoryOffloadingConfiguration) *tmoEngineInstance { + tmoEngine := &tmoEngineInstance{ + workingRounds: 0, + metaServer: metaServer, + emitter: emitter, + conf: tmo.NewTMOConfigDetail(tmoConf.DefaultConfigurations), + } + if path, ok := obj.(string); ok { + tmoEngine.cgpath = path + } + if ci, ok := obj.(*types.ContainerInfo); ok { + tmoEngine.containerInfo = ci + } + return tmoEngine +} + +func (tmoEngine *tmoEngineInstance) GetConf() *tmo.TMOConfigDetail { + return tmoEngine.conf +} + +func (tmoEngine *tmoEngineInstance) getStats() (TmoStats, error) { + tmoStats := &TmoStats{} + var err error + getCgroupMetrics := func(metaserver *metaserver.MetaServer, absPath string) error { + relativePath, err := filepath.Rel(common.CgroupFSMountPoint, absPath) + if err != nil { + return err + } + // make sure the relative path with prefix '/' has already been added to GeneralRelativeCgroupPaths, + // otherwise the MalachiteMetricsProvisioner will not fetch and store the metrics for these cgroup paths. 
+ relativePath = "/" + relativePath + psiAvg60, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemPsiAvg60Cgroup) + if err != nil { + return err + } + pgsteal, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemPgstealCgroup) + if err != nil { + return err + } + pgscan, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemPgscanCgroup) + if err != nil { + return err + } + refault, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemWorkingsetRefaultCgroup) + if err != nil { + return err + } + refaultActivate, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemWorkingsetActivateCgroup) + if err != nil { + return err + } + memUsage, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemUsageCgroup) + if err != nil { + return err + } + memInactiveAnon, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemInactiveAnonCgroup) + if err != nil { + return err + } + memInactiveFile, err := metaserver.GetCgroupMetric(relativePath, consts.MetricMemInactiveFileCgroup) + if err != nil { + return err + } + tmoStats.memUsage = memUsage.Value + tmoStats.memInactive = memInactiveFile.Value + memInactiveAnon.Value + tmoStats.memPsiAvg60 = psiAvg60.Value + tmoStats.pgsteal = pgsteal.Value + tmoStats.pgscan = pgscan.Value + tmoStats.refault = refault.Value + tmoStats.refaultActivate = refaultActivate.Value + tmoStats.offloadingTargetSize = tmoEngine.offloadingTargetSize + general.Infof("Memory Usage of Cgroup %s, memUsage: %v", tmoEngine.cgpath, memUsage.Value) + return nil + } + getContainerMetrics := func(metaserver *metaserver.MetaServer, podUID string, containerName string) error { + psiAvg60, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemPsiAvg60Container) + if err != nil { + return err + } + pgsteal, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemPgstealContainer) + if err != nil { + return err + } + pgscan, err := 
metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemPgscanContainer) + if err != nil { + return err + } + refault, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemWorkingsetRefaultContainer) + if err != nil { + return err + } + refaultActivate, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemWorkingsetActivateContainer) + if err != nil { + return err + } + memUsage, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemUsageContainer) + if err != nil { + return err + } + memInactiveAnon, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemInactiveAnonContainer) + if err != nil { + return err + } + memInactiveFile, err := metaserver.GetContainerMetric(podUID, containerName, consts.MetricMemInactiveFileContainer) + if err != nil { + return err + } + tmoStats.memUsage = memUsage.Value + tmoStats.memInactive = memInactiveFile.Value + memInactiveAnon.Value + tmoStats.memPsiAvg60 = psiAvg60.Value + tmoStats.pgsteal = pgsteal.Value + tmoStats.pgscan = pgscan.Value + tmoStats.refault = refault.Value + tmoStats.refaultActivate = refaultActivate.Value + tmoStats.offloadingTargetSize = tmoEngine.offloadingTargetSize + general.Infof("Memory Usage of Pod %v, Container %v, memUsage: %v", podUID, containerName, memUsage.Value) + return nil + } + + if tmoEngine.containerInfo == nil { + err = getCgroupMetrics(tmoEngine.metaServer, tmoEngine.cgpath) + } else { + err = getContainerMetrics(tmoEngine.metaServer, tmoEngine.containerInfo.PodUID, tmoEngine.containerInfo.ContainerName) + } + return *tmoStats, err +} + +func (tmoEngine *tmoEngineInstance) GetOffloadingTargetSize() float64 { + return tmoEngine.offloadingTargetSize +} + +func (tmoEngine *tmoEngineInstance) GetContainerInfo() *types.ContainerInfo { + return tmoEngine.containerInfo +} + +func (tmoEngine *tmoEngineInstance) GetCgpath() string { + return tmoEngine.cgpath +} + +func (tmoEngine *tmoEngineInstance) 
LoadConf(detail *tmo.TMOConfigDetail) { + tmoEngine.conf.EnableTMO = detail.EnableTMO + tmoEngine.conf.EnableSwap = detail.EnableSwap + tmoEngine.conf.Interval = detail.Interval + tmoEngine.conf.PolicyName = detail.PolicyName + if psiPolicyConfDynamic := detail.PSIPolicyConf; psiPolicyConfDynamic != nil { + tmoEngine.conf.PSIPolicyConf.MaxProbe = psiPolicyConfDynamic.MaxProbe + tmoEngine.conf.PSIPolicyConf.PsiAvg60Threshold = psiPolicyConfDynamic.PsiAvg60Threshold + } + if refaultPolicyConfDynamic := detail.RefaultPolicyConf; refaultPolicyConfDynamic != nil { + tmoEngine.conf.RefaultPolicyConf.MaxProbe = refaultPolicyConfDynamic.MaxProbe + tmoEngine.conf.RefaultPolicyConf.ReclaimAccuracyTarget = refaultPolicyConfDynamic.ReclaimAccuracyTarget + tmoEngine.conf.RefaultPolicyConf.ReclaimScanEfficiencyTarget = refaultPolicyConfDynamic.ReclaimScanEfficiencyTarget + } +} + +func (tmoEngine *tmoEngineInstance) CalculateOffloadingTargetSize() { + tmoEngine.offloadingTargetSize = 0 + if !tmoEngine.conf.EnableTMO { + return + } + + currTime := time.Now() + if currTime.Sub(tmoEngine.lastTime) < tmoEngine.conf.Interval { + tmoEngine.offloadingTargetSize = 0 + return + } + + currStats, err := tmoEngine.getStats() + if err != nil { + general.Infof("Failed to get metrics %v", err) + return + } + // TODO: get result from qrm to make sure last offloading action finished + if fn, ok := tmoPolicyFuncs.Load(tmoEngine.conf.PolicyName); ok { + if policyFunc, ok := fn.(TmoPolicyFn); ok { + err, targetSize := policyFunc(tmoEngine.lastStats, currStats, tmoEngine.conf) + if err != nil { + general.ErrorS(err, "Failed to calculate offloading memory size") + return + } + tmoEngine.offloadingTargetSize = targetSize + currStats.offloadingTargetSize = targetSize + tmoEngine.lastStats = currStats + tmoEngine.lastTime = currTime + } + } + +} + +func NewTransparentMemoryOffloading(conf *config.Configuration, extraConfig interface{}, metaReader metacache.MetaReader, metaServer *metaserver.MetaServer, 
emitter metrics.MetricEmitter) MemoryAdvisorPlugin { + return &transparentMemoryOffloading{ + conf: conf, + extraConf: extraConfig, + metaReader: metaReader, + metaServer: metaServer, + emitter: emitter, + containerTmoEngines: make(map[consts.PodContainerName]TmoEngine), + cgpathTmoEngines: make(map[string]TmoEngine), + } +} + +func (tmo *transparentMemoryOffloading) Reconcile(status *types.MemoryPressureStatus) error { + // Take the write lock: this method mutates containerTmoEngines and + // cgpathTmoEngines, which GetAdvices reads under RLock. + tmo.mutex.Lock() + defer tmo.mutex.Unlock() + podContainerNamesMap := make(map[katalystcoreconsts.PodContainerName]bool) + podList, err := tmo.metaServer.GetPodList(context.Background(), native.PodIsActive) + if err != nil { + general.Infof("Failed to get pod list: %v", err) + } + + for _, pod := range podList { + if pod == nil { + general.Infof("Get nil pod from meta server.") + continue + } + qos, err := tmo.conf.QoSConfiguration.GetQoSLevel(pod, map[string]string{}) + if err != nil { + general.Infof("Failed to get qos level for pod uid: %s", pod.UID) + if helper.PodIsDaemonSet(pod) { + qos = katalystapiconsts.PodAnnotationQoSLevelSystemCores + general.Infof("DaemonSet pod %s is considered as system_cores qos level", pod.UID) + } + } + for _, containerStatus := range pod.Status.ContainerStatuses { + containerInfo := &types.ContainerInfo{ + PodUID: string(pod.UID), + PodName: pod.Name, + ContainerName: containerStatus.Name, + Labels: pod.Labels, + Annotations: pod.Annotations, + QoSLevel: qos, + } + podContainerName := native.GeneratePodContainerName(containerInfo.PodName, containerInfo.ContainerName) + podContainerNamesMap[podContainerName] = true + _, exist := tmo.containerTmoEngines[podContainerName] + if !exist { + tmo.containerTmoEngines[podContainerName] = NewTmoEngineInstance(containerInfo, tmo.metaServer, tmo.emitter, tmo.conf.GetDynamicConfiguration().TransparentMemoryOffloadingConfiguration) + } + // load QoSLevelConfig + if helper.IsValidQosLevel(containerInfo.QoSLevel) { + if tmoConfigDetail, exist :=
tmo.conf.GetDynamicConfiguration().QoSLevelConfigs[katalystapiconsts.QoSLevel(containerInfo.QoSLevel)]; exist { + tmo.containerTmoEngines[podContainerName].LoadConf(tmoConfigDetail) + general.Infof("Load QosLevel %s TMO config for podContainerName %s, enableTMO: %v, enableSwap: %v, interval: %v, policy: %v", + containerInfo.QoSLevel, podContainerName, + tmo.containerTmoEngines[podContainerName].GetConf().EnableTMO, + tmo.containerTmoEngines[podContainerName].GetConf().EnableSwap, + tmo.containerTmoEngines[podContainerName].GetConf().Interval, + tmo.containerTmoEngines[podContainerName].GetConf().PolicyName) + } + } + // load SPD conf if exists + tmoIndicator := &v1alpha1.TransparentMemoryOffloadingIndicators{} + isBaseline, err := tmo.metaServer.ServiceProfilingManager.ServiceExtendedIndicator(context.Background(), pod.ObjectMeta, tmoIndicator) + if err != nil { + general.Infof("Error occurred when load check baseline and load TransparentMemoryOffloadingIndicators, err : %v", err) + } else if !isBaseline { + tmoConfigDetail := tmo.containerTmoEngines[podContainerName].GetConf() + if tmoIndicator.ConfigDetail != nil { + tmoconf.ApplyTMOConfigDetail(tmoConfigDetail, *tmoIndicator.ConfigDetail) + general.Infof("Load Service Level TMO config for podContainerName %s, enableTMO: %v, enableSwap: %v, interval: %v, policy: %v", + podContainerName, + tmo.containerTmoEngines[podContainerName].GetConf().EnableTMO, + tmo.containerTmoEngines[podContainerName].GetConf().EnableSwap, + tmo.containerTmoEngines[podContainerName].GetConf().Interval, + tmo.containerTmoEngines[podContainerName].GetConf().PolicyName) + } + } + + // disable TMO if the Pod is numa exclusive and is not reclaimable + enableReclaim, _ := helper.PodEnableReclaim(context.Background(), tmo.metaServer, containerInfo.PodUID, true) + if containerInfo.IsNumaExclusive() && !enableReclaim { + tmo.containerTmoEngines[podContainerName].GetConf().EnableTMO = false + general.Infof("container with podContainerName: %s is 
required to disable TMO since it is not reclaimable", podContainerName) + } + + // disable TMO if the container is in TMO block list + funcs := make(map[string]TMOBlockFn) + tmoBlockFuncs.Range(func(key, value interface{}) bool { + funcs[key.(string)] = value.(TMOBlockFn) + return true + }) + for tmoBlockFnName, tmoBlockFn := range funcs { + if tmoBlockFn(containerInfo, tmo.extraConf) { + tmo.containerTmoEngines[podContainerName].GetConf().EnableTMO = false + general.Infof("container with podContainerName: %s is required to disable TMO by TMOBlockFn: %s", podContainerName, tmoBlockFnName) + } + } + + general.Infof("Final TMO configs for podContainerName: %v, enableTMO: %v, enableSwap: %v, interval: %v, policy: %v", podContainerName, + tmo.containerTmoEngines[podContainerName].GetConf().EnableTMO, + tmo.containerTmoEngines[podContainerName].GetConf().EnableSwap, + tmo.containerTmoEngines[podContainerName].GetConf().Interval, + tmo.containerTmoEngines[podContainerName].GetConf().PolicyName) + } + } + + // update tmo config for specified cgroup paths + for cgpath, tmoConfigDetail := range tmo.conf.GetDynamicConfiguration().TransparentMemoryOffloadingConfiguration.CgroupConfigs { + general.Infof("Load Cgroup TMO config for specific cgroup path %v", cgpath) + if _, exist := tmo.cgpathTmoEngines[cgpath]; !exist { + tmo.cgpathTmoEngines[cgpath] = NewTmoEngineInstance(cgpath, tmo.metaServer, tmo.emitter, tmo.conf.GetDynamicConfiguration().TransparentMemoryOffloadingConfiguration) + } + tmo.cgpathTmoEngines[cgpath].LoadConf(tmoConfigDetail) + general.Infof("TMO configs for cgroup: %v, enableTMO: %v, enableSwap: %v, interval: %v, policy: %v", cgpath, + tmoConfigDetail.EnableTMO, tmoConfigDetail.EnableSwap, tmoConfigDetail.Interval, tmoConfigDetail.PolicyName) + } + + // delete tmo engines for not existed containers + for podContainerName := range tmo.containerTmoEngines { + _, exist := podContainerNamesMap[podContainerName] + if !exist { + delete(tmo.containerTmoEngines, 
podContainerName) + } + } + + // delete tmo engines for not existed cgroups + for cgpath := range tmo.cgpathTmoEngines { + if _, exist := tmo.conf.GetDynamicConfiguration().CgroupConfigs[cgpath]; !exist { + delete(tmo.cgpathTmoEngines, cgpath) + } + } + + // calculate memory offloading size for each container + for podContainerName, tmoEngine := range tmo.containerTmoEngines { + tmoEngine.CalculateOffloadingTargetSize() + general.InfoS("Calculate target offloading size", "podContainer", podContainerName, + "result", general.FormatMemoryQuantity(tmoEngine.GetOffloadingTargetSize())) + } + + // calculate memory offloading size for each cgroups + for cgpath, tmoEngine := range tmo.cgpathTmoEngines { + tmoEngine.CalculateOffloadingTargetSize() + general.InfoS("Calculate target offloading size", "groupPath", cgpath, + "result", general.FormatMemoryQuantity(tmoEngine.GetOffloadingTargetSize())) + } + return nil +} + +func (tmo *transparentMemoryOffloading) GetAdvices() types.InternalMemoryCalculationResult { + result := types.InternalMemoryCalculationResult{ + ContainerEntries: make([]types.ContainerMemoryAdvices, 0), + ExtraEntries: make([]types.ExtraMemoryAdvices, 0), + } + tmo.mutex.RLock() + defer tmo.mutex.RUnlock() + for _, tmoEngine := range tmo.containerTmoEngines { + if tmoEngine.GetOffloadingTargetSize() <= 0 { + continue + } + enableSwap := consts.ControlKnobOFF + if tmoEngine.GetConf().EnableSwap { + enableSwap = consts.ControlKnobON + } + entry := types.ContainerMemoryAdvices{ + PodUID: tmoEngine.GetContainerInfo().PodUID, + ContainerName: tmoEngine.GetContainerInfo().ContainerName, + Values: map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): enableSwap, + string(memoryadvisor.ControlKnowKeyMemoryOffloading): strconv.FormatInt(int64(tmoEngine.GetOffloadingTargetSize()), 10)}, + } + result.ContainerEntries = append(result.ContainerEntries, entry) + } + + for cgpath, tmoEngine := range tmo.cgpathTmoEngines { + if 
tmoEngine.GetOffloadingTargetSize() <= 0 { + continue + } + enableSwap := consts.ControlKnobOFF + if tmoEngine.GetConf().EnableSwap { + enableSwap = consts.ControlKnobON + } + relativePath, err := filepath.Rel(common.CgroupFSMountPoint, cgpath) + if err != nil { + continue + } + relativePath = "/" + relativePath + entry := types.ExtraMemoryAdvices{ + CgroupPath: relativePath, + Values: map[string]string{ + string(memoryadvisor.ControlKnobKeySwapMax): enableSwap, + string(memoryadvisor.ControlKnowKeyMemoryOffloading): strconv.FormatInt(int64(tmoEngine.GetOffloadingTargetSize()), 10)}, + } + result.ExtraEntries = append(result.ExtraEntries, entry) + } + + return result +} diff --git a/pkg/config/agent/dynamic/crd/dynamic_crd.go b/pkg/config/agent/dynamic/crd/dynamic_crd.go index a3ab3e93a..ab93940f7 100644 --- a/pkg/config/agent/dynamic/crd/dynamic_crd.go +++ b/pkg/config/agent/dynamic/crd/dynamic_crd.go @@ -32,8 +32,9 @@ const ( // of CRD. KCC components are responsible to identify those CRs and // trigger notification. 
type DynamicConfigCRD struct { - AdminQoSConfiguration *v1alpha1.AdminQoSConfiguration - AuthConfiguration *v1alpha1.AuthConfiguration + AdminQoSConfiguration *v1alpha1.AdminQoSConfiguration + AuthConfiguration *v1alpha1.AuthConfiguration + TransparentMemoryOffloadingConfiguration *v1alpha1.TransparentMemoryOffloadingConfiguration } var ( @@ -41,4 +42,6 @@ var ( AdminQoSConfigurationGVR = metav1.GroupVersionResource(v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.ResourceNameAdminQoSConfigurations)) // AuthConfigurationGVR is the group version resource for AuthConfiguration AuthConfigurationGVR = metav1.GroupVersionResource(v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.ResourceNameAuthConfigurations)) + // TransparentMemoryOffloadingConfigurationGVR is the group version resource for TransparentMemoryOffloadingConfiguration + TransparentMemoryOffloadingConfigurationGVR = metav1.GroupVersionResource(v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.ResourceNameTMOConfigurations)) ) diff --git a/pkg/config/agent/dynamic/dynamic_base.go b/pkg/config/agent/dynamic/dynamic_base.go index 8bef32fcc..0bf88c640 100644 --- a/pkg/config/agent/dynamic/dynamic_base.go +++ b/pkg/config/agent/dynamic/dynamic_base.go @@ -19,6 +19,8 @@ package dynamic import ( "sync" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/tmo" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/adminqos" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/auth" "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/crd" @@ -52,16 +54,19 @@ func (c *DynamicAgentConfiguration) SetDynamicConfiguration(conf *Configuration) type Configuration struct { *adminqos.AdminQoSConfiguration *auth.AuthConfiguration + *tmo.TransparentMemoryOffloadingConfiguration } func NewConfiguration() *Configuration { return &Configuration{ - AdminQoSConfiguration: adminqos.NewAdminQoSConfiguration(), - AuthConfiguration: auth.NewAuthConfiguration(), + AdminQoSConfiguration: 
adminqos.NewAdminQoSConfiguration(), + AuthConfiguration: auth.NewAuthConfiguration(), + TransparentMemoryOffloadingConfiguration: tmo.NewTransparentMemoryOffloadingConfiguration(), } } func (c *Configuration) ApplyConfiguration(conf *crd.DynamicConfigCRD) { c.AdminQoSConfiguration.ApplyConfiguration(conf) c.AuthConfiguration.ApplyConfiguration(conf) + c.TransparentMemoryOffloadingConfiguration.ApplyConfiguration(conf) } diff --git a/pkg/config/agent/dynamic/tmo/tmo_base.go b/pkg/config/agent/dynamic/tmo/tmo_base.go new file mode 100644 index 000000000..2246bddc2 --- /dev/null +++ b/pkg/config/agent/dynamic/tmo/tmo_base.go @@ -0,0 +1,166 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package tmo + +import ( + "time" + + "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1" + "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/crd" +) + +const ( + DefaultEnableTMO bool = false + DefaultEnableSwap bool = false + DefaultTMOInterval time.Duration = 30 * time.Second + DefaultTMOPolicyName v1alpha1.TMOPolicyName = v1alpha1.TMOPolicyNamePSI + DefaultTMOMaxProbe float64 = 0.01 + DefaultTMOPSIPolicyPSIAvg60Threshold float64 = 0.1 + DefaultTMORefaultPolicyReclaimAccuracyTarget float64 = 0.99 + DefaultTMORefaultPolicyReclaimScanEfficiencyTarget float64 = 0.6 +) + +type TransparentMemoryOffloadingConfiguration struct { + DefaultConfigurations *TMODefaultConfigurations + QoSLevelConfigs map[consts.QoSLevel]*TMOConfigDetail + CgroupConfigs map[string]*TMOConfigDetail +} + +func NewTransparentMemoryOffloadingConfiguration() *TransparentMemoryOffloadingConfiguration { + return &TransparentMemoryOffloadingConfiguration{ + DefaultConfigurations: NewTMODefaultConfigurations(), + QoSLevelConfigs: map[consts.QoSLevel]*TMOConfigDetail{}, + CgroupConfigs: map[string]*TMOConfigDetail{}, + } +} + +type TMODefaultConfigurations struct { + DefaultEnableTMO bool + DefaultEnableSwap bool + DefaultTMOInterval time.Duration + DefaultTMOPolicyName v1alpha1.TMOPolicyName + DefaultTMOMaxProbe float64 + DefaultTMOPSIPolicyPSIAvg60Threshold float64 + DefaultTMORefaultPolicyReclaimAccuracyTarget float64 + DefaultTMORefaultPolicyReclaimScanEfficiencyTarget float64 +} + +func NewTMODefaultConfigurations() *TMODefaultConfigurations { + return &TMODefaultConfigurations{ + DefaultEnableTMO: DefaultEnableTMO, + DefaultEnableSwap: DefaultEnableSwap, + DefaultTMOInterval: DefaultTMOInterval, + DefaultTMOPolicyName: DefaultTMOPolicyName, + DefaultTMOMaxProbe: DefaultTMOMaxProbe, + DefaultTMOPSIPolicyPSIAvg60Threshold: DefaultTMOPSIPolicyPSIAvg60Threshold, + DefaultTMORefaultPolicyReclaimAccuracyTarget: 
DefaultTMORefaultPolicyReclaimAccuracyTarget, + DefaultTMORefaultPolicyReclaimScanEfficiencyTarget: DefaultTMORefaultPolicyReclaimScanEfficiencyTarget, + } +} + +type TMOConfigDetail struct { + EnableTMO bool + EnableSwap bool + Interval time.Duration + PolicyName v1alpha1.TMOPolicyName + *PSIPolicyConf + *RefaultPolicyConf +} + +func NewTMOConfigDetail(defaultConfigs *TMODefaultConfigurations) *TMOConfigDetail { + return &TMOConfigDetail{ + EnableTMO: defaultConfigs.DefaultEnableTMO, + EnableSwap: defaultConfigs.DefaultEnableSwap, + Interval: defaultConfigs.DefaultTMOInterval, + PolicyName: defaultConfigs.DefaultTMOPolicyName, + PSIPolicyConf: &PSIPolicyConf{ + MaxProbe: defaultConfigs.DefaultTMOMaxProbe, + PsiAvg60Threshold: defaultConfigs.DefaultTMOPSIPolicyPSIAvg60Threshold, + }, + RefaultPolicyConf: &RefaultPolicyConf{ + MaxProbe: defaultConfigs.DefaultTMOMaxProbe, + ReclaimAccuracyTarget: defaultConfigs.DefaultTMORefaultPolicyReclaimAccuracyTarget, + ReclaimScanEfficiencyTarget: defaultConfigs.DefaultTMORefaultPolicyReclaimScanEfficiencyTarget, + }, + } +} + +type PSIPolicyConf struct { + MaxProbe float64 + PsiAvg60Threshold float64 +} + +type RefaultPolicyConf struct { + MaxProbe float64 + ReclaimAccuracyTarget float64 + ReclaimScanEfficiencyTarget float64 +} + +func ApplyTMOConfigDetail(tmoConfigDetail *TMOConfigDetail, tmoConfigDetailDynamic v1alpha1.TMOConfigDetail) { + if tmoConfigDetailDynamic.EnableTMO != nil { + tmoConfigDetail.EnableTMO = *tmoConfigDetailDynamic.EnableTMO + } + if tmoConfigDetailDynamic.EnableSwap != nil { + tmoConfigDetail.EnableSwap = *tmoConfigDetailDynamic.EnableSwap + } + if tmoConfigDetailDynamic.Interval != nil { + tmoConfigDetail.Interval = tmoConfigDetailDynamic.Interval.Duration + } + if tmoConfigDetailDynamic.PolicyName != nil { + tmoConfigDetail.PolicyName = *tmoConfigDetailDynamic.PolicyName + } + if psiPolicyConfDynamic := tmoConfigDetailDynamic.PSIPolicyConf; psiPolicyConfDynamic != nil { + if 
psiPolicyConfDynamic.MaxProbe != nil { + tmoConfigDetail.PSIPolicyConf.MaxProbe = *psiPolicyConfDynamic.MaxProbe + } + if psiPolicyConfDynamic.PSIAvg60Threshold != nil { + tmoConfigDetail.PSIPolicyConf.PsiAvg60Threshold = *psiPolicyConfDynamic.PSIAvg60Threshold + } + } + if refaultPolicyConfDynamic := tmoConfigDetailDynamic.RefaultPolicConf; refaultPolicyConfDynamic != nil { + if refaultPolicyConfDynamic.MaxProbe != nil { + tmoConfigDetail.RefaultPolicyConf.MaxProbe = *refaultPolicyConfDynamic.MaxProbe + } + if refaultPolicyConfDynamic.ReclaimAccuracyTarget != nil { + tmoConfigDetail.RefaultPolicyConf.ReclaimAccuracyTarget = *refaultPolicyConfDynamic.ReclaimAccuracyTarget + } + if refaultPolicyConfDynamic.ReclaimScanEfficiencyTarget != nil { + tmoConfigDetail.RefaultPolicyConf.ReclaimScanEfficiencyTarget = *refaultPolicyConfDynamic.ReclaimScanEfficiencyTarget + } + } +} + +func (c *TransparentMemoryOffloadingConfiguration) ApplyConfiguration(conf *crd.DynamicConfigCRD) { + if tmoConf := conf.TransparentMemoryOffloadingConfiguration; tmoConf != nil { + if tmoConf.Spec.Config.QoSLevelConfig != nil { + for _, qosLevelConfig := range tmoConf.Spec.Config.QoSLevelConfig { + tmoConfigDetail := NewTMOConfigDetail(c.DefaultConfigurations) + ApplyTMOConfigDetail(tmoConfigDetail, qosLevelConfig.ConfigDetail) + c.QoSLevelConfigs[qosLevelConfig.QoSLevel] = tmoConfigDetail + + } + } + if tmoConf.Spec.Config.CgroupConfig != nil { + for _, cgroupConfig := range tmoConf.Spec.Config.CgroupConfig { + tmoConfigDetail := NewTMOConfigDetail(c.DefaultConfigurations) + ApplyTMOConfigDetail(tmoConfigDetail, cgroupConfig.ConfigDetail) + c.CgroupConfigs[cgroupConfig.CgroupPath] = tmoConfigDetail + } + } + } +} diff --git a/pkg/config/agent/global/base.go b/pkg/config/agent/global/base.go index 165ec00d0..f285ac210 100644 --- a/pkg/config/agent/global/base.go +++ b/pkg/config/agent/global/base.go @@ -36,6 +36,9 @@ type BaseConfiguration struct { // specify a customized path for
reclaimed-cores to enrich qos-management ways ReclaimRelativeRootCgroupPath string + // GeneralRelativeCgroupPaths are paths of standalone services which not managed by kubernetes + GeneralRelativeCgroupPaths []string + *MachineInfoConfiguration *KubeletConfiguration *RuntimeConfiguration diff --git a/pkg/consts/common.go b/pkg/consts/common.go index bcba38388..d631a828e 100644 --- a/pkg/consts/common.go +++ b/pkg/consts/common.go @@ -20,6 +20,11 @@ import ( "math" ) +const ( + ControlKnobON = "true" + ControlKnobOFF = "false" +) + const ( // OwnerReferenceIndex is the lookup name for the index function OwnerReferenceIndex = "owner-reference-index" diff --git a/pkg/consts/metric.go b/pkg/consts/metric.go index 42f5397e8..ab3fc2f02 100644 --- a/pkg/consts/metric.go +++ b/pkg/consts/metric.go @@ -216,7 +216,14 @@ const ( MetricMemPgmajfaultRateContainer = MetricMemPgmajfaultContainer + Rate MetricMemOomRateContainer = MetricMemOomContainer + Rate - MetricMemUpdateTimeContainer = "mem.updatetime.container" + MetricMemUpdateTimeContainer = "mem.updatetime.container" + MetricMemPgstealContainer = "mem.pgsteal.container" + MetricMemPgscanContainer = "mem.pgscan.container" + MetricMemWorkingsetRefaultContainer = "mem.workingsetrefault.container" + MetricMemWorkingsetActivateContainer = "mem.workingsetactivate.container" + MetricMemPsiAvg60Container = "mem.psiavg60.container" + MetricMemInactiveAnonContainer = "mem.inactiveanon.container" + MetricMemInactiveFileContainer = "mem.inactivefile.container" ) // container blkio metrics @@ -328,8 +335,15 @@ const ( MetricMemAllocstallCgroup = "mem.allocstall.cgroup" MetricMemKswapdstealCgroup = "mem.kswapdstall.cgroup" - MetricMemOomCgroup = "mem.oom.cgroup" - MetricMemScaleFactorCgroup = "mem.scalefactor.cgroup" + MetricMemOomCgroup = "mem.oom.cgroup" + MetricMemScaleFactorCgroup = "mem.scalefactor.cgroup" + MetricMemPgstealCgroup = "mem.pgsteal.cgroup" + MetricMemPgscanCgroup = "mem.pgscan.cgroup" + 
MetricMemWorkingsetRefaultCgroup = "mem.workingsetrefault.cgroup" + MetricMemWorkingsetActivateCgroup = "mem.workingsetactivate.cgroup" + MetricMemPsiAvg60Cgroup = "mem.psiavg60.cgroup" + MetricMemInactiveAnonCgroup = "mem.inactiveanon.cgroup" + MetricMemInactiveFileCgroup = "mem.inactivefile.cgroup" ) // Cgroup blkio metrics diff --git a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go index 4ea9e7f51..42e39e078 100644 --- a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go +++ b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go @@ -167,7 +167,11 @@ func (m *MalachiteMetricsProvisioner) updateSystemStats() error { func (m *MalachiteMetricsProvisioner) updateCgroupData() error { cgroupPaths := []string{m.baseConf.ReclaimRelativeRootCgroupPath, common.CgroupFsRootPathBurstable, common.CgroupFsRootPathBestEffort} errList := make([]error, 0) - for _, path := range cgroupPaths { + for _, path := range m.baseConf.GeneralRelativeCgroupPaths { + cgroupPaths = append(cgroupPaths, path) + } + dedupCgroupPaths := general.DedupStringSlice(cgroupPaths) + for _, path := range dedupCgroupPaths { stats, err := m.malachiteClient.GetCgroupStats(path) if err != nil { errList = append(errList, err) @@ -561,6 +565,13 @@ func (m *MalachiteMetricsProvisioner) processCgroupMemoryData(cgroupPath string, m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemOomCgroup, utilmetric.MetricData{Value: float64(mem.BpfMemStat.OomCnt), Time: &updateTime}) //m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemScaleFactorCgroup, utilmetric.MetricData{Value: general.UInt64PointerToFloat64(mem.WatermarkScaleFactor), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemPgstealCgroup, utilmetric.MetricData{Value: float64(mem.MemStats.Pgsteal), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemPgscanCgroup, 
utilmetric.MetricData{Value: float64(mem.MemStats.Pgscan), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemWorkingsetRefaultCgroup, utilmetric.MetricData{Value: float64(mem.MemStats.WorkingsetRefault), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemWorkingsetActivateCgroup, utilmetric.MetricData{Value: float64(mem.MemStats.WorkingsetActivate), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemPsiAvg60Cgroup, utilmetric.MetricData{Value: float64(mem.BpfMemStat.MemReclaimSettingSum), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemInactiveAnonCgroup, utilmetric.MetricData{Value: float64(mem.MemStats.InactiveAnon), Time: &updateTime}) + m.metricStore.SetCgroupMetric(cgroupPath, consts.MetricMemInactiveFileCgroup, utilmetric.MetricData{Value: float64(mem.MemStats.InactiveFile), Time: &updateTime}) } } @@ -880,6 +891,20 @@ func (m *MalachiteMetricsProvisioner) processContainerMemoryData(podUID, contain // utilmetric.MetricData{Value: general.UInt64PointerToFloat64(mem.WatermarkScaleFactor), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemUpdateTimeContainer, utilmetric.MetricData{Value: float64(mem.UpdateTime), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemPgstealContainer, + utilmetric.MetricData{Value: float64(mem.MemStats.Pgsteal), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemPgscanContainer, + utilmetric.MetricData{Value: float64(mem.MemStats.Pgscan), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemWorkingsetRefaultContainer, + utilmetric.MetricData{Value: float64(mem.MemStats.WorkingsetRefault), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemWorkingsetActivateContainer, + utilmetric.MetricData{Value: 
float64(mem.MemStats.WorkingsetActivate), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemPsiAvg60Container, + utilmetric.MetricData{Value: mem.MemPressure.Some.Avg60, Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemInactiveAnonContainer, + utilmetric.MetricData{Value: float64(mem.MemStats.InactiveAnon), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemInactiveFileContainer, + utilmetric.MetricData{Value: float64(mem.MemStats.InactiveFile), Time: &updateTime}) } } diff --git a/pkg/metaserver/agent/metric/provisioner/malachite/types/common.go b/pkg/metaserver/agent/metric/provisioner/malachite/types/common.go index 3900daade..b74c81774 100644 --- a/pkg/metaserver/agent/metric/provisioner/malachite/types/common.go +++ b/pkg/metaserver/agent/metric/provisioner/malachite/types/common.go @@ -101,6 +101,7 @@ type BpfNetData struct { type BpfMemData struct { OomCnt uint64 `json:"mem_oom_cnt"` MemReclaimCnt uint64 `json:"mem_reclaim_cnt"` + MemReclaimSettingSum uint64 `json:"mem_reclaim_setting_sum"` MemReclaimTime uint64 `json:"mem_reclaim_time"` MemCompactCnt uint64 `json:"mem_compact_cnt"` MemCompactFailCnt uint64 `json:"mem_compact_fail_cnt"` diff --git a/pkg/util/cgroup/common/types.go b/pkg/util/cgroup/common/types.go index 98e397f7d..40d6b3d29 100644 --- a/pkg/util/cgroup/common/types.go +++ b/pkg/util/cgroup/common/types.go @@ -68,6 +68,8 @@ type MemoryData struct { // cgroup memory that can never be reclaimed by kswapd. 
MinInBytes int64 WmarkRatio int32 + // SwapMaxInBytes < 0 means disable cgroup-level swap + SwapMaxInBytes int64 } // CPUData set cgroup cpu data diff --git a/pkg/util/cgroup/manager/cgroup.go b/pkg/util/cgroup/manager/cgroup.go index fd401f498..6b2ad3fc4 100644 --- a/pkg/util/cgroup/manager/cgroup.go +++ b/pkg/util/cgroup/manager/cgroup.go @@ -19,6 +19,8 @@ package manager import ( "context" "fmt" + "io/fs" + "math" "os/exec" "path/filepath" "time" @@ -300,3 +302,123 @@ func DropCacheWithTimeoutWithRelativePath(timeoutSecs int, absCgroupPath string, return nil } + +func SetSwapMaxWithAbsolutePathToParentCgroupRecursive(absCgroupPath string) error { + if !common.CheckCgroup2UnifiedMode() { + general.Infof("[SetSwapMaxWithAbsolutePathToParentCgroupRecursive] is not supported on cgroupv1") + return nil + } + general.Infof("[SetSwapMaxWithAbsolutePathToParentCgroupRecursive] on cgroup: %s", absCgroupPath) + swapMaxData := &common.MemoryData{SwapMaxInBytes: math.MaxInt64} + err := GetManager().ApplyMemory(absCgroupPath, swapMaxData) + if err != nil { + return err + } + + parentDir := filepath.Dir(absCgroupPath) + if parentDir != absCgroupPath && parentDir != common.GetCgroupRootPath(common.CgroupSubsysMemory) { + err = SetSwapMaxWithAbsolutePathToParentCgroupRecursive(parentDir) + if err != nil { + return err + } + } + return nil +} +func SetSwapMaxWithAbsolutePathRecursive(absCgroupPath string) error { + if !common.CheckCgroup2UnifiedMode() { + general.Infof("[SetSwapMaxWithAbsolutePathRecursive] is not supported on cgroupv1") + return nil + } + + general.Infof("[SetSwapMaxWithAbsolutePathRecursive] on cgroup: %s", absCgroupPath) + + // set swap max to parent cgroups recursively + if err := SetSwapMaxWithAbsolutePathToParentCgroupRecursive(filepath.Dir(absCgroupPath)); err != nil { + return err + } + + // set swap max to sub cgroups recursively + err := filepath.Walk(absCgroupPath, func(path string, info fs.FileInfo, err error) error { + if err != nil { + 
general.Infof("prevent panic by handling failure accessing a path: %s, err: %v", path, err) + return err + } + if info.IsDir() { + memStats, err := GetMemoryWithAbsolutePath(path) + if err != nil { + return filepath.SkipDir + } + var diff int64 = math.MaxInt64 + if memStats.Limit-memStats.Usage < uint64(diff) { + diff = int64(memStats.Limit - memStats.Usage) + } + swapMaxData := &common.MemoryData{SwapMaxInBytes: diff} + err = GetManager().ApplyMemory(path, swapMaxData) + if err != nil { + return filepath.SkipDir + } + } + return nil + }) + if err != nil { + general.Infof("error walking the path: %s, err: %v", absCgroupPath, err) + return err + } + return nil +} + +func DisableSwapMaxWithAbsolutePathRecursive(absCgroupPath string) error { + if !common.CheckCgroup2UnifiedMode() { + general.Infof("[DisableSwapMaxWithAbsolutePathRecursive] is not supported on cgroupv1") + return nil + } + general.Infof("[DisableSwapMaxWithAbsolutePathRecursive] on cgroup: %s", absCgroupPath) + // disable swap to sub cgroups recursively + err := filepath.Walk(absCgroupPath, func(path string, info fs.FileInfo, err error) error { + if err != nil { + general.Infof("prevent panic by handling failure accessing a path: %s, err: %v", path, err) + return err + } + if info.IsDir() { + swapMaxData := &common.MemoryData{SwapMaxInBytes: -1} + err = GetManager().ApplyMemory(path, swapMaxData) + if err != nil { + return filepath.SkipDir + } + } + return nil + }) + if err != nil { + general.Infof("error walking the path: %s, err: %v ", absCgroupPath, err) + return err + } + return nil +} +func MemoryOffloadingWithAbsolutePath(ctx context.Context, absCgroupPath string, nbytes int64) error { + startTime := time.Now() + + var cmd string + if common.CheckCgroup2UnifiedMode() { + if nbytes <= 0 { + general.Infof("[MemoryOffloadingWithAbsolutePath] skip memory reclaim on %s since nbytes is not valid", absCgroupPath) + return nil + } + //cgv2 + cmd = fmt.Sprintf("echo %d > %s", nbytes, 
filepath.Join(absCgroupPath, "memory.reclaim")) + } else { + //cgv1 + general.Infof("[MemoryOffloadingWithAbsolutePath] is not supported on cgroupv1") + return nil + } + + _, err := exec.Command("bash", "-c", cmd).Output() + + _ = asyncworker.EmitAsyncedMetrics(ctx, metrics.ConvertMapToTags(map[string]string{ + "absCGPath": absCgroupPath, + "succeeded": fmt.Sprintf("%v", err == nil), + })...) + delta := time.Since(startTime).Seconds() + general.Infof("[MemoryOffloadingWithAbsolutePath] it takes %v to do \"%s\" on cgroup: %s", delta, cmd, absCgroupPath) + + return err +} diff --git a/pkg/util/cgroup/manager/cgroup_test.go b/pkg/util/cgroup/manager/cgroup_test.go index 188691a8b..bcc796df2 100644 --- a/pkg/util/cgroup/manager/cgroup_test.go +++ b/pkg/util/cgroup/manager/cgroup_test.go @@ -21,8 +21,15 @@ package manager import ( "context" + "fmt" + "io/ioutil" + "math" + "os" + "path/filepath" "testing" + "bou.ke/monkey" + "github.com/opencontainers/runc/libcontainer/cgroups" "github.com/stretchr/testify/assert" "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" @@ -34,10 +41,12 @@ func TestManager(t *testing.T) { t.Parallel() _ = GetManager() + + testV1Manager(t) + testV2Manager(t) } -func TestV1Manager(t *testing.T) { - t.Parallel() +func testV1Manager(t *testing.T) { _ = v1.NewManager() @@ -45,12 +54,12 @@ func TestV1Manager(t *testing.T) { testNetCls(t, "v1") } -func TestV2Manager(t *testing.T) { - t.Parallel() +func testV2Manager(t *testing.T) { _ = v2.NewManager() testManager(t, "v2") + testSwapMax(t) } func testManager(t *testing.T, version string) { @@ -94,3 +103,69 @@ func testNetCls(t *testing.T, version string) { err = ApplyNetClsForContainer("fake-pod", "fake-container", &common.NetClsData{}) assert.Error(t, err) } + +func testSwapMax(t *testing.T) { + defer monkey.UnpatchAll() + monkey.Patch(common.CheckCgroup2UnifiedMode, func() bool { return true }) + monkey.Patch(GetManager, func() Manager { return v2.NewManager() }) + 
monkey.Patch(cgroups.ReadFile, func(dir, file string) (string, error) { + f := filepath.Join(dir, file) + tmp, err := ioutil.ReadFile(f) + if err != nil { + return "", err + } + return string(tmp), nil + }) + monkey.Patch(cgroups.WriteFile, func(dir, file, data string) error { + f := filepath.Join(dir, file) + return ioutil.WriteFile(f, []byte(data), 0700) + }) + + rootDir := os.TempDir() + dir := filepath.Join(rootDir, "tmp") + err := os.Mkdir(dir, 0700) + assert.NoError(t, err) + + tmpDir, err := ioutil.TempDir(dir, "fake-cgroup") + assert.NoError(t, err) + defer os.RemoveAll(dir) + + monkey.Patch(common.GetCgroupRootPath, func(s string) string { + t.Logf("rootDir=%v", rootDir) + return rootDir + }) + + sawpFile := filepath.Join(tmpDir, "memory.swap.max") + err = ioutil.WriteFile(sawpFile, []byte{}, 0700) + assert.NoError(t, err) + + sawpFile2 := filepath.Join(dir, "memory.swap.max") + err = ioutil.WriteFile(sawpFile2, []byte{}, 0700) + assert.NoError(t, err) + + maxFile := filepath.Join(tmpDir, "memory.max") + err = ioutil.WriteFile(maxFile, []byte("12800"), 0700) + assert.NoError(t, err) + + curFile := filepath.Join(tmpDir, "memory.current") + err = ioutil.WriteFile(curFile, []byte("12600"), 0700) + assert.NoError(t, err) + + err = SetSwapMaxWithAbsolutePathRecursive(tmpDir) + assert.NoError(t, err) + + s, err := ioutil.ReadFile(sawpFile) + assert.NoError(t, err) + assert.Equal(t, fmt.Sprintf("%v", 200), string(s)) + + s, err = ioutil.ReadFile(sawpFile2) + assert.NoError(t, err) + assert.Equal(t, fmt.Sprintf("%v", math.MaxInt64), string(s)) + + err = DisableSwapMaxWithAbsolutePathRecursive(tmpDir) + assert.NoError(t, err) + + s, err = ioutil.ReadFile(sawpFile) + assert.NoError(t, err) + assert.Equal(t, fmt.Sprintf("%v", 0), string(s)) +} diff --git a/pkg/util/cgroup/manager/v1/fs_linux.go b/pkg/util/cgroup/manager/v1/fs_linux.go index c47c6bb49..36cb63c40 100644 --- a/pkg/util/cgroup/manager/v1/fs_linux.go +++ b/pkg/util/cgroup/manager/v1/fs_linux.go @@ -77,7 
+77,6 @@ func (m *manager) ApplyMemory(absCgroupPath string, data *common.MemoryData) err klog.Infof("[CgroupV1] apply memory wmark successfully, cgroupPath: %s, data: %v, old data: %v\n", absCgroupPath, data.WmarkRatio, oldData) } } - return nil } diff --git a/pkg/util/cgroup/manager/v2/fs_linux.go b/pkg/util/cgroup/manager/v2/fs_linux.go index 1ebdf58bd..2d28f2376 100644 --- a/pkg/util/cgroup/manager/v2/fs_linux.go +++ b/pkg/util/cgroup/manager/v2/fs_linux.go @@ -81,6 +81,18 @@ func (m *manager) ApplyMemory(absCgroupPath string, data *common.MemoryData) err } } + if data.SwapMaxInBytes != 0 { + // Do Not change swap max setting if SwapMaxInBytes equals to 0 + var swapMax int64 = 0 + if data.SwapMaxInBytes > 0 { + swapMax = data.SwapMaxInBytes + } + if err, applied, oldData := common.WriteFileIfChange(absCgroupPath, "memory.swap.max", fmt.Sprintf("%d", swapMax)); err != nil { + return err + } else if applied { + klog.Infof("[CgroupV2] apply memory swap max successfully, cgroupPath: %s, data: %v, old data: %v\n", absCgroupPath, swapMax, oldData) + } + } return nil } diff --git a/pkg/util/general/common.go b/pkg/util/general/common.go index 0f2558332..a4e12bccf 100644 --- a/pkg/util/general/common.go +++ b/pkg/util/general/common.go @@ -362,3 +362,12 @@ func FormatMemoryQuantity(q float64) string { return fmt.Sprintf("%v[%v]", q, quantity.String()) } + +// DedupStringSlice return deduplicated string slice from original +func DedupStringSlice(input []string) []string { + result := sets.NewString() + for _, v := range input { + result.Insert(v) + } + return result.UnsortedList() +} diff --git a/pkg/util/metric/store.go b/pkg/util/metric/store.go index b23461c05..86ebae281 100644 --- a/pkg/util/metric/store.go +++ b/pkg/util/metric/store.go @@ -160,7 +160,7 @@ func (c *MetricStore) GetNodeMetric(metricName string) (MetricData, error) { if data, ok := c.nodeMetricMap[metricName]; ok { return data, nil } else { - return MetricData{}, errors.New("[MetricStore] load value 
failed") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] load value failed, metric=%v", metricName)) } } @@ -171,10 +171,10 @@ func (c *MetricStore) GetNumaMetric(numaID int, metricName string) (MetricData, if data, ok := c.numaMetricMap[numaID][metricName]; ok { return data, nil } else { - return MetricData{}, errors.New("[MetricStore] load value failed") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] load value failed, metric=%v, numaID=%v", metricName, numaID)) } } - return MetricData{}, errors.New("[MetricStore] empty map") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] empty map, metric=%v, numaID=%v", metricName, numaID)) } func (c *MetricStore) GetDeviceMetric(deviceName string, metricName string) (MetricData, error) { @@ -184,10 +184,10 @@ func (c *MetricStore) GetDeviceMetric(deviceName string, metricName string) (Met if data, ok := c.deviceMetricMap[deviceName][metricName]; ok { return data, nil } else { - return MetricData{}, errors.New("[MetricStore] load value failed") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] load value failed, metric=%v, deviceName=%v", metricName, deviceName)) } } - return MetricData{}, errors.New("[MetricStore] empty map") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] empty map, metric=%v, deviceName=%v", metricName, deviceName)) } func (c *MetricStore) GetNetworkMetric(networkName string, metricName string) (MetricData, error) { @@ -197,10 +197,10 @@ func (c *MetricStore) GetNetworkMetric(networkName string, metricName string) (M if data, ok := c.networkMetricMap[networkName][metricName]; ok { return data, nil } else { - return MetricData{}, errors.New("[MetricStore] load value failed") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] load value failed, metric=%v, networkName=%v", metricName, networkName)) } } - return MetricData{}, errors.New("[MetricStore] empty map") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] empty map, 
metric=%v, networkName=%v", metricName, networkName)) } func (c *MetricStore) GetCPUMetric(coreID int, metricName string) (MetricData, error) { @@ -210,10 +210,10 @@ func (c *MetricStore) GetCPUMetric(coreID int, metricName string) (MetricData, e if data, ok := c.cpuMetricMap[coreID][metricName]; ok { return data, nil } else { - return MetricData{}, errors.New("[MetricStore] load value failed") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] load value failed, metric=%v, coreID=%v", metricName, coreID)) } } - return MetricData{}, errors.New("[MetricStore] empty map") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] empty map, metric=%v, coreID=%v", metricName, coreID)) } func (c *MetricStore) GetContainerMetric(podUID, containerName, metricName string) (MetricData, error) { @@ -224,11 +224,11 @@ func (c *MetricStore) GetContainerMetric(podUID, containerName, metricName strin if data, ok := c.podContainerMetricMap[podUID][containerName][metricName]; ok { return data, nil } else { - return MetricData{}, errors.New("[MetricStore] load value failed") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] load value failed, metric=%v, podUID=%v, containerName=%v", metricName, podUID, containerName)) } } } - return MetricData{}, errors.New("[MetricStore] empty map") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] empty map, metric=%v, podUID=%v, containerName=%v", metricName, podUID, containerName)) } func (c *MetricStore) GetContainerNumaMetric(podUID, containerName, numaNode, metricName string) (MetricData, error) { @@ -240,12 +240,12 @@ func (c *MetricStore) GetContainerNumaMetric(podUID, containerName, numaNode, me if data, ok := c.podContainerNumaMetricMap[podUID][containerName][numaNode][metricName]; ok { return data, nil } else { - return MetricData{}, errors.New("[MetricStore] load value failed") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] load value failed, metric=%v, podUID=%v, containerName=%v, 
numaNode=%v", metricName, podUID, containerName, numaNode)) } } } } - return MetricData{}, errors.New("[MetricStore] empty map") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] empty map, metric=%v, podUID=%v, containerName=%v, numaNode=%v", metricName, podUID, containerName, numaNode)) } func (c *MetricStore) GetPodVolumeMetric(podUID, volumeName, metricName string) (MetricData, error) { @@ -257,11 +257,11 @@ func (c *MetricStore) GetPodVolumeMetric(podUID, volumeName, metricName string) if data, ok := c.podVolumeMetricMap[podUID][volumeName][metricName]; ok { return data, nil } else { - return MetricData{}, errors.New("[MetricStore] load value failed") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] load value failed, metric=%v, podUID=%v, volumeName=%v", metricName, podUID, volumeName)) } } } - return MetricData{}, errors.New("[MetricStore] empty map") + return MetricData{}, errors.New(fmt.Sprintf("[MetricStore] empty map, metric=%v, podUID=%v, volumeName=%v", metricName, podUID, volumeName)) } func (c *MetricStore) GCPodsMetric(livingPodUIDSet map[string]bool) {