From 74b648cc17ff0612a81e1a148703b4fc23d16aba Mon Sep 17 00:00:00 2001 From: "shaowei.wayne" Date: Thu, 9 May 2024 22:16:15 +0800 Subject: [PATCH] refine metric-fetcher to support specified collecting intervals --- .../app/options/metaserver/metaserver.go | 36 ++--- .../app/options/metaserver/metric.go | 102 ++++++++++++++ pkg/config/agent/metaserver/agent.go | 19 ++- pkg/metaserver/agent/metric/metric_impl.go | 127 ++++++++++++++---- pkg/metaserver/agent/metric/metric_test.go | 91 +++++++++---- .../metric/provisioner/rodan/provisioner.go | 14 +- pkg/metaserver/agent/metric/types/metric.go | 19 +-- 7 files changed, 316 insertions(+), 92 deletions(-) create mode 100644 cmd/katalyst-agent/app/options/metaserver/metric.go diff --git a/cmd/katalyst-agent/app/options/metaserver/metaserver.go b/cmd/katalyst-agent/app/options/metaserver/metaserver.go index c02499180..5ccc14545 100644 --- a/cmd/katalyst-agent/app/options/metaserver/metaserver.go +++ b/cmd/katalyst-agent/app/options/metaserver/metaserver.go @@ -43,8 +43,6 @@ const ( defaultServiceProfileCacheTTL = 1 * time.Minute ) -const defaultMetricInsurancePeriod = 0 * time.Second - const ( defaultKubeletPodCacheSyncPeriod = 30 * time.Second defaultKubeletPodCacheSyncMaxRate = 5 @@ -56,12 +54,11 @@ const defaultCustomNodeResourceCacheTTL = 15 * time.Second const defaultCustomNodeConfigCacheTTL = 15 * time.Second -const defaultRodanServerPort = 9102 - // MetaServerOptions holds all the configurations for metaserver. // we will not try to separate this structure into several individual // structures since it will not be used directly by other components; instead, // we will only separate them with blanks in a single structure. 
+// todo separate this option-structure into individual structures type MetaServerOptions struct { // generic configurations for metaserver CheckpointManagerDir string @@ -78,11 +75,6 @@ type MetaServerOptions struct { ServiceProfileSkipCorruptionError bool ServiceProfileCacheTTL time.Duration - // configurations for metric-fetcher - MetricInsurancePeriod time.Duration - MetricProvisions []string - RodanServerPort int - // configurations for pod-cache KubeletPodCacheSyncPeriod time.Duration KubeletPodCacheSyncMaxRate int @@ -94,6 +86,9 @@ type MetaServerOptions struct { // configurations for cnc CustomNodeConfigCacheTTL time.Duration + + // configurations for metric-fetcher + *MetricFetcherOptions } func NewMetaServerOptions() *MetaServerOptions { @@ -110,10 +105,6 @@ func NewMetaServerOptions() *MetaServerOptions { ServiceProfileSkipCorruptionError: defaultServiceProfileSkipCorruptionError, ServiceProfileCacheTTL: defaultServiceProfileCacheTTL, - MetricInsurancePeriod: defaultMetricInsurancePeriod, - MetricProvisions: []string{metaserver.MetricProvisionerMalachite, metaserver.MetricProvisionerKubelet}, - RodanServerPort: defaultRodanServerPort, - KubeletPodCacheSyncPeriod: defaultKubeletPodCacheSyncPeriod, KubeletPodCacheSyncMaxRate: defaultKubeletPodCacheSyncMaxRate, KubeletPodCacheSyncBurstBulk: defaultKubeletPodCacheSyncBurstBulk, @@ -122,6 +113,8 @@ func NewMetaServerOptions() *MetaServerOptions { CNRCacheTTL: defaultCustomNodeResourceCacheTTL, CustomNodeConfigCacheTTL: defaultCustomNodeConfigCacheTTL, + + MetricFetcherOptions: NewMetricFetcherOptions(), } } @@ -150,13 +143,6 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs.DurationVar(&o.ServiceProfileCacheTTL, "service-profile-cache-ttl", o.ServiceProfileCacheTTL, "The ttl of service profile manager cache remote spd") - fs.DurationVar(&o.MetricInsurancePeriod, "metric-insurance-period", o.MetricInsurancePeriod, - "The meta server return metric data and MetricDataExpired if the update 
time of metric data is earlier than this period.") - fs.StringSliceVar(&o.MetricProvisions, "metric-provisioners", o.MetricProvisions, - "The provisioners that should be enabled by default") - fs.IntVar(&o.RodanServerPort, "rodan-server-port", o.RodanServerPort, - "The rodan metric provisioner server port") - fs.DurationVar(&o.KubeletPodCacheSyncPeriod, "kubelet-pod-cache-sync-period", o.KubeletPodCacheSyncPeriod, "The period of meta server to sync pod from kubelet 10255 port") fs.IntVar(&o.KubeletPodCacheSyncMaxRate, "kubelet-pod-cache-sync-max-rate", o.KubeletPodCacheSyncMaxRate, @@ -171,6 +157,8 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs.DurationVar(&o.CustomNodeConfigCacheTTL, "custom-node-config-cache-ttl", o.CustomNodeConfigCacheTTL, "The ttl of custom node config fetcher cache remote cnc") + + o.MetricFetcherOptions.AddFlags(fss) } // ApplyTo fills up config with options @@ -187,10 +175,6 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error c.ServiceProfileSkipCorruptionError = o.ServiceProfileSkipCorruptionError c.ServiceProfileCacheTTL = o.ServiceProfileCacheTTL - c.MetricInsurancePeriod = o.MetricInsurancePeriod - c.MetricProvisions = o.MetricProvisions - c.RodanServerPort = o.RodanServerPort - c.KubeletPodCacheSyncPeriod = o.KubeletPodCacheSyncPeriod c.KubeletPodCacheSyncMaxRate = rate.Limit(o.KubeletPodCacheSyncMaxRate) c.KubeletPodCacheSyncBurstBulk = o.KubeletPodCacheSyncBurstBulk @@ -200,5 +184,9 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error c.CustomNodeConfigCacheTTL = o.CustomNodeConfigCacheTTL + if err := o.MetricFetcherOptions.ApplyTo(c.MetricConfiguration); err != nil { + return err + } + return nil } diff --git a/cmd/katalyst-agent/app/options/metaserver/metric.go b/cmd/katalyst-agent/app/options/metaserver/metric.go new file mode 100644 index 000000000..a62120908 --- /dev/null +++ b/cmd/katalyst-agent/app/options/metaserver/metric.go @@ -0,0 
+1,102 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metaserver + +import ( + "time" + + cliflag "k8s.io/component-base/cli/flag" + + "github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver" +) + +const defaultMetricInsurancePeriod = 0 * time.Second + +const defaultRodanServerPort = 9102 + +type MetricFetcherOptions struct { + MetricInsurancePeriod time.Duration + MetricProvisions []string + + DefaultInterval time.Duration + ProvisionerIntervalSecs map[string]int + + *MalachiteOptions + *CgroupOptions + *KubeletOptions + *RodanOptions +} + +type ( + MalachiteOptions struct{} + CgroupOptions struct{} + KubeletOptions struct{} + RodanOptions struct { + ServerPort int + } +) + +func NewMetricFetcherOptions() *MetricFetcherOptions { + return &MetricFetcherOptions{ + MetricInsurancePeriod: defaultMetricInsurancePeriod, + MetricProvisions: []string{metaserver.MetricProvisionerMalachite, metaserver.MetricProvisionerKubelet}, + + DefaultInterval: time.Second * 5, + ProvisionerIntervalSecs: make(map[string]int), + + MalachiteOptions: &MalachiteOptions{}, + CgroupOptions: &CgroupOptions{}, + KubeletOptions: &KubeletOptions{}, + RodanOptions: &RodanOptions{ + ServerPort: defaultRodanServerPort, + }, + } +} + +// AddFlags adds flags to the specified FlagSet. 
+func (o *MetricFetcherOptions) AddFlags(fss *cliflag.NamedFlagSets) { + fs := fss.FlagSet("metric-server") + + fs.DurationVar(&o.MetricInsurancePeriod, "metric-insurance-period", o.MetricInsurancePeriod, + "The meta server return metric data and MetricDataExpired if the update time of metric data is earlier than this period.") + fs.StringSliceVar(&o.MetricProvisions, "metric-provisioners", o.MetricProvisions, + "The provisioners that should be enabled by default") + + fs.DurationVar(&o.DefaultInterval, "metric-interval", o.DefaultInterval, + "The default metric provisioner collecting interval") + fs.StringToIntVar(&o.ProvisionerIntervalSecs, "metric-provisioner-intervals", o.ProvisionerIntervalSecs, + "The metric provisioner collecting intervals for each individual provisioner") + + fs.IntVar(&o.RodanOptions.ServerPort, "rodan-server-port", o.RodanOptions.ServerPort, + "The rodan metric provisioner server port") +} + +// ApplyTo fills up config with options +func (o *MetricFetcherOptions) ApplyTo(c *metaserver.MetricConfiguration) error { + c.MetricInsurancePeriod = o.MetricInsurancePeriod + c.MetricProvisions = o.MetricProvisions + + c.DefaultInterval = o.DefaultInterval + c.ProvisionerIntervals = make(map[string]time.Duration) + for name, secs := range o.ProvisionerIntervalSecs { + c.ProvisionerIntervals[name] = time.Second * time.Duration(secs) + } + + c.RodanServerPort = o.RodanOptions.ServerPort + + return nil +} diff --git a/pkg/config/agent/metaserver/agent.go b/pkg/config/agent/metaserver/agent.go index 73968b2db..c1b270145 100644 --- a/pkg/config/agent/metaserver/agent.go +++ b/pkg/config/agent/metaserver/agent.go @@ -32,9 +32,22 @@ const ( type MetricConfiguration struct { MetricInsurancePeriod time.Duration MetricProvisions []string + + DefaultInterval time.Duration + ProvisionerIntervals map[string]time.Duration + + *MalachiteMetricConfiguration + *CgroupMetricConfiguration + *KubeletMetricConfiguration *RodanMetricConfiguration } +type 
MalachiteMetricConfiguration struct{} + +type CgroupMetricConfiguration struct{} + +type KubeletMetricConfiguration struct{} + type RodanMetricConfiguration struct { RodanServerPort int } @@ -71,7 +84,11 @@ type AgentConfiguration struct { func NewAgentConfiguration() *AgentConfiguration { return &AgentConfiguration{ MetricConfiguration: &MetricConfiguration{ - RodanMetricConfiguration: &RodanMetricConfiguration{}, + ProvisionerIntervals: make(map[string]time.Duration), + MalachiteMetricConfiguration: &MalachiteMetricConfiguration{}, + CgroupMetricConfiguration: &CgroupMetricConfiguration{}, + KubeletMetricConfiguration: &KubeletMetricConfiguration{}, + RodanMetricConfiguration: &RodanMetricConfiguration{}, }, PodConfiguration: &PodConfiguration{}, NodeConfiguration: &NodeConfiguration{}, diff --git a/pkg/metaserver/agent/metric/metric_impl.go b/pkg/metaserver/agent/metric/metric_impl.go index a5ff6b83f..dec247c90 100644 --- a/pkg/metaserver/agent/metric/metric_impl.go +++ b/pkg/metaserver/agent/metric/metric_impl.go @@ -39,19 +39,20 @@ import ( type MetricsNotifierManagerImpl struct { *syntax.RWMutex metricStore *utilmetric.MetricStore - registeredNotifier map[types.MetricsScope]map[string]types.NotifiedData + registeredNotifier map[types.MetricsScope]map[string]*types.NotifiedData } func NewMetricsNotifierManager(metricStore *utilmetric.MetricStore, emitter metrics.MetricEmitter) types.MetricsNotifierManager { return &MetricsNotifierManagerImpl{ metricStore: metricStore, RWMutex: syntax.NewRWMutex(emitter), - registeredNotifier: map[types.MetricsScope]map[string]types.NotifiedData{ - types.MetricsScopeNode: make(map[string]types.NotifiedData), - types.MetricsScopeNuma: make(map[string]types.NotifiedData), - types.MetricsScopeCPU: make(map[string]types.NotifiedData), - types.MetricsScopeDevice: make(map[string]types.NotifiedData), - types.MetricsScopeContainer: make(map[string]types.NotifiedData), + registeredNotifier: 
map[types.MetricsScope]map[string]*types.NotifiedData{ + types.MetricsScopeNode: make(map[string]*types.NotifiedData), + types.MetricsScopeNuma: make(map[string]*types.NotifiedData), + types.MetricsScopeCPU: make(map[string]*types.NotifiedData), + types.MetricsScopeDevice: make(map[string]*types.NotifiedData), + types.MetricsScopeContainer: make(map[string]*types.NotifiedData), + types.MetricsScopeContainerNUMA: make(map[string]*types.NotifiedData), }, } } @@ -70,7 +71,7 @@ func (m *MetricsNotifierManagerImpl) RegisterNotifier(scope types.MetricsScope, rand.Read(randBytes) key := string(randBytes) - m.registeredNotifier[scope][key] = types.NotifiedData{ + m.registeredNotifier[scope][key] = &types.NotifiedData{ Scope: scope, Req: req, Response: response, @@ -103,6 +104,13 @@ func (m *MetricsNotifierManagerImpl) notifySystem() { } else if v.Time == nil { v.Time = &now } + + if reg.LastNotify.Equal(*v.Time) { + continue + } else { + reg.LastNotify = *v.Time + } + reg.Response <- types.NotifiedResponse{ Req: reg.Req, MetricData: v, @@ -116,32 +124,53 @@ func (m *MetricsNotifierManagerImpl) notifySystem() { } else if v.Time == nil { v.Time = &now } + + if reg.LastNotify.Equal(*v.Time) { + continue + } else { + reg.LastNotify = *v.Time + } + reg.Response <- types.NotifiedResponse{ Req: reg.Req, MetricData: v, } } - for _, reg := range m.registeredNotifier[types.MetricsScopeNuma] { + for n, reg := range m.registeredNotifier[types.MetricsScopeNuma] { v, err := m.metricStore.GetNumaMetric(reg.Req.NumaID, reg.Req.MetricName) if err != nil { continue } else if v.Time == nil { v.Time = &now } + + if m.registeredNotifier[types.MetricsScopeNuma][n].LastNotify.Equal(*v.Time) { + continue + } else { + reg.LastNotify = *v.Time + } + reg.Response <- types.NotifiedResponse{ Req: reg.Req, MetricData: v, } } - for _, reg := range m.registeredNotifier[types.MetricsScopeCPU] { + for n, reg := range m.registeredNotifier[types.MetricsScopeCPU] { v, err := 
m.metricStore.GetCPUMetric(reg.Req.CoreID, reg.Req.MetricName) if err != nil { continue } else if v.Time == nil { v.Time = &now } + + if reg.LastNotify.Equal(*v.Time) { + continue + } else { + m.registeredNotifier[types.MetricsScopeCPU][n].LastNotify = *v.Time + } + reg.Response <- types.NotifiedResponse{ Req: reg.Req, MetricData: v, @@ -162,21 +191,37 @@ func (m *MetricsNotifierManagerImpl) notifyPods() { } else if v.Time == nil { v.Time = &now } + + if reg.LastNotify.Equal(*v.Time) { + continue + } else { + reg.LastNotify = *v.Time + } + reg.Response <- types.NotifiedResponse{ Req: reg.Req, MetricData: v, } + } - if reg.Req.NumaID == 0 { + for _, reg := range m.registeredNotifier[types.MetricsScopeContainerNUMA] { + if reg.Req.NumaNode == "" { continue } - v, err = m.metricStore.GetContainerNumaMetric(reg.Req.PodUID, reg.Req.ContainerName, fmt.Sprintf("%v", reg.Req.NumaID), reg.Req.MetricName) + v, err := m.metricStore.GetContainerNumaMetric(reg.Req.PodUID, reg.Req.ContainerName, fmt.Sprintf("%v", reg.Req.NumaNode), reg.Req.MetricName) if err != nil { continue } else if v.Time == nil { v.Time = &now } + + if reg.LastNotify.Equal(*v.Time) { + continue + } else { + reg.LastNotify = *v.Time + } + reg.Response <- types.NotifiedResponse{ Req: reg.Req, MetricData: v, @@ -220,9 +265,9 @@ type MetricsFetcherImpl struct { externalMetricManager types.ExternalMetricManager checkMetricDataExpire CheckMetricDataExpireFunc - // provisioners are ordered with priority, - // and we should always depend on the former (and fallback to latter if it missed) - provisioners []types.MetricsProvisioner + defaultInterval time.Duration + provisioners map[string]types.MetricsProvisioner + intervals map[string]time.Duration } func NewMetricsFetcher(baseConf *global.BaseConfiguration, metricConf *metaserver.MetricConfiguration, emitter metrics.MetricEmitter, podFetcher pod.PodFetcher) types.MetricsFetcher { @@ -230,11 +275,16 @@ func NewMetricsFetcher(baseConf *global.BaseConfiguration, 
metricConf *metaserve metricsNotifierManager := NewMetricsNotifierManager(metricStore, emitter) externalMetricManager := NewExternalMetricManager(metricStore, emitter) + intervals := make(map[string]time.Duration) + provisioners := make(map[string]types.MetricsProvisioner) registeredProvisioners := getProvisioners() - var enabledProvisioners []types.MetricsProvisioner for _, name := range metricConf.MetricProvisions { if f, ok := registeredProvisioners[name]; ok { - enabledProvisioners = append(enabledProvisioners, f(baseConf, metricConf, emitter, podFetcher, metricStore)) + intervals[name] = metricConf.DefaultInterval + if interval, exist := metricConf.ProvisionerIntervals[name]; exist && interval > 0 { + intervals[name] = interval + } + provisioners[name] = f(baseConf, metricConf, emitter, podFetcher, metricStore) } } @@ -242,8 +292,11 @@ func NewMetricsFetcher(baseConf *global.BaseConfiguration, metricConf *metaserve metricStore: metricStore, metricsNotifierManager: metricsNotifierManager, externalMetricManager: externalMetricManager, - provisioners: enabledProvisioners, checkMetricDataExpire: checkMetricDataExpireFunc(metricConf.MetricInsurancePeriod), + + defaultInterval: metricConf.DefaultInterval, + provisioners: provisioners, + intervals: intervals, } } @@ -312,12 +365,19 @@ func (f *MetricsFetcherImpl) RegisterExternalMetric(externalMetricFunc func(stor } func (f *MetricsFetcherImpl) Run(ctx context.Context) { + // make sure all provisioners have started at least once, + // and then allow each provisioner to collect metrics with + // its specified period. + // whenever any provisioner finishes its collecting process, + // notification will be triggered, and the consumer should + // handle de-duplication logic if necessary.
f.startOnce.Do(func() { - go wait.Until(func() { f.sample(ctx) }, time.Second*5, ctx.Done()) + f.init(ctx) + f.run(ctx) }) } -func (f *MetricsFetcherImpl) sample(ctx context.Context) { +func (f *MetricsFetcherImpl) init(ctx context.Context) { wg := sync.WaitGroup{} for name := range f.provisioners { p := f.provisioners[name] @@ -329,10 +389,7 @@ func (f *MetricsFetcherImpl) sample(ctx context.Context) { } wg.Wait() - // we should handle notifier and externalMetric only once - // rather than do it in each provisioner if f.externalMetricManager != nil { - // after sampling, we should call the registered function to get external metric f.externalMetricManager.Sample() } @@ -345,6 +402,30 @@ func (f *MetricsFetcherImpl) sample(ctx context.Context) { } } +func (f *MetricsFetcherImpl) run(ctx context.Context) { + // provisioner's implementation and its interval always exist, + // and it's ensured in init function + for name := range f.provisioners { + p := f.provisioners[name] + t := f.intervals[name] + go wait.Until(func() { + p.Run(ctx) + if f.metricsNotifierManager != nil { + f.metricsNotifierManager.Notify() + } + }, t, ctx.Done()) + } + + if f.externalMetricManager != nil { + go wait.Until(func() { + f.externalMetricManager.Sample() + if f.metricsNotifierManager != nil { + f.metricsNotifierManager.Notify() + } + }, f.defaultInterval, ctx.Done()) + } +} + func (f *MetricsFetcherImpl) HasSynced() bool { return f.hasSynced } diff --git a/pkg/metaserver/agent/metric/metric_test.go b/pkg/metaserver/agent/metric/metric_test.go index e48ae741d..1183e1449 100644 --- a/pkg/metaserver/agent/metric/metric_test.go +++ b/pkg/metaserver/agent/metric/metric_test.go @@ -44,9 +44,9 @@ func generateTestConfiguration(t *testing.T) *config.Configuration { func Test_notifySystem(t *testing.T) { t.Parallel() - now := time.Now() - + totalNotification := 0 conf := generateTestConfiguration(t) + conf.DefaultInterval = time.Millisecond * 300 f := NewMetricsFetcher(conf.BaseConfiguration, 
conf.MetricConfiguration, metrics.DummyMetrics{}, &pod.PodFetcherStub{}) rChan := make(chan metrictypes.NotifiedResponse, 20) @@ -70,14 +70,15 @@ func Test_notifySystem(t *testing.T) { PodUID: "test-pod", ContainerName: "test-container", }, rChan) - f.RegisterNotifier(metrictypes.MetricsScopeContainer, metrictypes.NotifiedRequest{ + f.RegisterNotifier(metrictypes.MetricsScopeContainerNUMA, metrictypes.NotifiedRequest{ MetricName: "test-container-numa-metric", PodUID: "test-pod", ContainerName: "test-container", NumaNode: "3", }, rChan) - m := f.(*MetricsFetcherImpl) + + now := time.Now() m.metricStore.SetNodeMetric("test-node-metric", metric.MetricData{Value: 34, Time: &now}) m.metricStore.SetNumaMetric(1, "test-numa-metric", metric.MetricData{Value: 56, Time: &now}) m.metricStore.SetCPUMetric(2, "test-cpu-metric", metric.MetricData{Value: 78, Time: &now}) @@ -85,29 +86,69 @@ func Test_notifySystem(t *testing.T) { m.metricStore.SetContainerMetric("test-pod", "test-container", "test-container-metric", metric.MetricData{Value: 91, Time: &now}) m.metricStore.SetContainerNumaMetric("test-pod", "test-container", "3", "test-container-numa-metric", metric.MetricData{Value: 75, Time: &now}) - go func() { - for { - select { - case response := <-rChan: - switch response.Req.MetricName { - case "test-node-metric": - assert.Equal(t, response.Value, 34) - case "test-numa-metric": - assert.Equal(t, response.Value, 56) - case "test-cpu-metric": - assert.Equal(t, response.Value, 78) - case "test-device-metric": - assert.Equal(t, response.Value, 91) - case "test-container-metric": - assert.Equal(t, response.Value, 91) - case "test-container-numa-metric": - assert.Equal(t, response.Value, 75) - } + // force trigger multiple notifications in a row, + // and expect only one response for a single data + m.metricsNotifierManager.Notify() + m.metricsNotifierManager.Notify() + + for { + timeout := false + select { + case response := <-rChan: + totalNotification++ + 
t.Log(response.Req.MetricName) + switch response.Req.MetricName { + case "test-node-metric": + assert.Equal(t, float64(34), response.Value) + case "test-numa-metric": + assert.Equal(t, float64(56), response.Value) + case "test-cpu-metric": + assert.Equal(t, float64(78), response.Value) + case "test-device-metric": + assert.Equal(t, float64(91), response.Value) + case "test-container-metric": + assert.Equal(t, float64(91), response.Value) + case "test-container-numa-metric": + assert.Equal(t, float64(75), response.Value) } + case <-time.After(time.Millisecond * 300): + timeout = true } - }() - - time.Sleep(time.Millisecond * 3) + if timeout { + break + } + } + assert.Equal(t, 6, totalNotification) + + cur := time.Now() + m.metricStore.SetNodeMetric("test-node-metric", metric.MetricData{Value: 12, Time: &cur}) + m.metricStore.SetContainerNumaMetric("test-pod", "test-container", "3", "test-container-numa-metric", metric.MetricData{Value: 22, Time: &cur}) + + // force trigger multiple notifications again, + // and expect to get awareness only for changed-metrics + m.metricsNotifierManager.Notify() + m.metricsNotifierManager.Notify() + + for { + timeout := false + select { + case response := <-rChan: + totalNotification++ + t.Log(response.Req.MetricName) + switch response.Req.MetricName { + case "test-node-metric": + assert.Equal(t, float64(12), response.Value) + case "test-container-numa-metric": + assert.Equal(t, float64(22), response.Value) + } + case <-time.After(time.Millisecond * 300): + timeout = true + } + if timeout { + break + } + } + assert.Equal(t, 8, totalNotification) } func TestStore_Aggregate(t *testing.T) { diff --git a/pkg/metaserver/agent/metric/provisioner/rodan/provisioner.go b/pkg/metaserver/agent/metric/provisioner/rodan/provisioner.go index 88e8c13ae..af8904acb 100644 --- a/pkg/metaserver/agent/metric/provisioner/rodan/provisioner.go +++ b/pkg/metaserver/agent/metric/provisioner/rodan/provisioner.go @@ -42,13 +42,10 @@ const ( ) type 
RodanMetricsProvisioner struct { - client *client.RodanClient - + client *client.RodanClient metricStore *utilmetric.MetricStore - - podFetcher pod.PodFetcher - - emitter metrics.MetricEmitter + podFetcher pod.PodFetcher + emitter metrics.MetricEmitter synced bool } @@ -78,15 +75,10 @@ func (i *RodanMetricsProvisioner) Run(ctx context.Context) { func (i *RodanMetricsProvisioner) sample(ctx context.Context) { i.updateNodeStats() - i.updateNUMAStats() - i.updateNodeCgroupStats() - i.updateNodeSysctlStats() - i.updateCoreStats() - i.updatePodStats(ctx) i.synced = true diff --git a/pkg/metaserver/agent/metric/types/metric.go b/pkg/metaserver/agent/metric/types/metric.go index 53e304004..aa5e4ec8a 100644 --- a/pkg/metaserver/agent/metric/types/metric.go +++ b/pkg/metaserver/agent/metric/types/metric.go @@ -18,6 +18,7 @@ package types import ( "context" + "time" v1 "k8s.io/api/core/v1" @@ -28,11 +29,12 @@ import ( type MetricsScope string const ( - MetricsScopeNode MetricsScope = "node" - MetricsScopeNuma MetricsScope = "numa" - MetricsScopeCPU MetricsScope = "cpu" - MetricsScopeDevice MetricsScope = "device" - MetricsScopeContainer MetricsScope = "container" + MetricsScopeNode MetricsScope = "node" + MetricsScopeNuma MetricsScope = "numa" + MetricsScopeCPU MetricsScope = "cpu" + MetricsScopeDevice MetricsScope = "device" + MetricsScopeContainer MetricsScope = "container" + MetricsScopeContainerNUMA MetricsScope = "container-numa" ) // NotifiedRequest defines the structure as requests for notifier @@ -50,9 +52,10 @@ type NotifiedRequest struct { // NotifiedData defines the structure as response data for notifier type NotifiedData struct { - Scope MetricsScope - Req NotifiedRequest - Response chan NotifiedResponse + Scope MetricsScope + Req NotifiedRequest + Response chan NotifiedResponse + LastNotify time.Time } type NotifiedResponse struct {