diff --git a/pkg/metaserver/spd/fetcher.go b/pkg/metaserver/spd/fetcher.go
index 67d7d3aef0..e20e94a7dd 100644
--- a/pkg/metaserver/spd/fetcher.go
+++ b/pkg/metaserver/spd/fetcher.go
@@ -19,6 +19,7 @@ package spd
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"time"
 
@@ -32,6 +33,7 @@ import (
 
 	configapis "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1"
 	workloadapis "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1"
+	apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
 	"github.com/kubewharf/katalyst-core/pkg/client"
 	pkgconfig "github.com/kubewharf/katalyst-core/pkg/config"
 	"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/cnc"
@@ -147,7 +149,7 @@ func (s *spdFetcher) Run(ctx context.Context) {
 	<-ctx.Done()
 }
 
-func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name string) (*workloadapis.ServiceProfileDescriptor, error) {
+func (s *spdFetcher) getSPDByNamespaceName(ctx context.Context, namespace, name string) (*workloadapis.ServiceProfileDescriptor, error) {
 	key := native.GenerateNamespaceNameKey(namespace, name)
 	baseTag := []metrics.MetricTag{
 		{Key: "spdNamespace", Val: namespace},
@@ -157,7 +159,32 @@ func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name st
 	// get current spd from cache
 	currentSPD := s.spdCache.GetSPD(key, true)
 	if currentSPD != nil {
+		// TODO: temporarily refresh all existing SPDs so that they carry the desired metrics; this block can be removed later.
+		if metricExists(currentSPD, apiconsts.SPDAggMetricNameMemoryBandwidth) {
+			return currentSPD, nil
+		}
+
+		klog.Infof("[spd-manager] need to get spd %s from remote", key)
+		targetConfig, err := s.getSPDTargetConfig(ctx, namespace, name)
+		if err != nil {
+			klog.Warningf("[spd-manager] get spd targetConfig failed: %v, use local cache instead", err)
+			targetConfig = &configapis.TargetConfig{
+				ConfigNamespace: namespace,
+				ConfigName:      name,
+			}
+			_ = s.emitter.StoreInt64(metricsNameGetCNCTargetConfigFailed, 1, metrics.MetricTypeNameCount, baseTag...)
+		}
+		err = s.updateSPDCacheIfNeed(ctx, currentSPD, targetConfig, true)
+		if err != nil {
+			klog.Errorf("[spd-manager] failed to update spd cache from remote: %v, use local cache instead", err)
+			_ = s.emitter.StoreInt64(metricsNameUpdateCacheFailed, 1, metrics.MetricTypeNameCount, baseTag...)
+			return currentSPD, nil
+		}
+
+		currentSPD = s.spdCache.GetSPD(key, true)
+		if currentSPD != nil {
+			return currentSPD, nil
+		}
-		return currentSPD, nil
 	}
 
 	_ = s.emitter.StoreInt64(metricsNameCacheNotFound, 1, metrics.MetricTypeNameCount, baseTag...)
@@ -165,6 +192,24 @@ func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name st
 	return nil, errors.NewNotFound(workloadapis.Resource(workloadapis.ResourceNameServiceProfileDescriptors), name)
 }
 
+// metricExists checks whether any container usage entry in the SPD AggPodMetrics contains the given metric name
+func metricExists(spd *workloadapis.ServiceProfileDescriptor, metric string) bool {
+	for _, aggPodMetrics := range spd.Status.AggMetrics {
+		// iterate over all pod metrics items rather than indexing Items[0], which panics when Items is empty
+		for _, podMetrics := range aggPodMetrics.Items {
+			for _, containerMetrics := range podMetrics.Containers {
+				for resourceName := range containerMetrics.Usage {
+					if strings.Contains(resourceName.String(), metric) {
+						return true
+					}
+				}
+			}
+		}
+	}
+
+	return false
+}
+
 // getSPDTargetConfig get spd target config from cnc
 func (s *spdFetcher) getSPDTargetConfig(ctx context.Context, namespace, name string) (*configapis.TargetConfig, error) {
 	currentCNC, err := s.cncFetcher.GetCNC(ctx)
@@ -211,7 +256,7 @@ func (s *spdFetcher) sync(ctx context.Context) {
 
 		// try to update spd cache from remote if cache spd hash is not equal to target config hash,
 		// the rate of getting remote spd will be limited by spd ServiceProfileCacheTTL
-		err = s.updateSPDCacheIfNeed(ctx, originSPD, targetConfig)
+		err = s.updateSPDCacheIfNeed(ctx, originSPD, targetConfig, false)
 		if err != nil {
 			klog.Errorf("[spd-manager] failed update spd cache from remote: %v, use local cache instead", err)
 			_ = s.emitter.StoreInt64(metricsNameUpdateCacheFailed, 1, metrics.MetricTypeNameCount, baseTag...)
@@ -222,17 +267,17 @@ func (s *spdFetcher) sync(ctx context.Context) {
 // updateSPDCacheIfNeed checks if the previous spd has changed, and
 // re-get from APIServer if the previous is out-of date.
 func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *workloadapis.ServiceProfileDescriptor,
-	targetConfig *configapis.TargetConfig) error {
+	targetConfig *configapis.TargetConfig, needToGetLatest bool) error {
 	if originSPD == nil && targetConfig == nil {
 		return nil
 	}
 
 	now := time.Now()
-	if originSPD == nil || util.GetSPDHash(originSPD) != targetConfig.Hash {
+	if originSPD == nil || util.GetSPDHash(originSPD) != targetConfig.Hash || needToGetLatest {
 		key := native.GenerateNamespaceNameKey(targetConfig.ConfigNamespace, targetConfig.ConfigName)
 		// Skip the backoff delay if the configuration hash of the CNC target changes, ensuring the
 		// local SPD cache is always updated with the latest configuration.
-		if nextFetchRemoteTime := s.spdCache.GetNextFetchRemoteTime(key, now, targetConfig.Hash != ""); nextFetchRemoteTime.After(time.Now()) {
+		if nextFetchRemoteTime := s.spdCache.GetNextFetchRemoteTime(key, now, targetConfig.Hash != ""); nextFetchRemoteTime.After(time.Now()) && !needToGetLatest {
 			return nil
 		} else {
 			// first update the timestamp of the last attempt to fetch the remote spd to
@@ -247,7 +292,7 @@ func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *worklo
 
 		klog.Infof("[spd-manager] spd %s targetConfig hash is changed from %s to %s", key, util.GetSPDHash(originSPD), targetConfig.Hash)
 		spd, err := s.client.InternalClient.WorkloadV1alpha1().ServiceProfileDescriptors(targetConfig.ConfigNamespace).
-			Get(ctx, targetConfig.ConfigName, metav1.GetOptions{ResourceVersion: "0"})
+			Get(ctx, targetConfig.ConfigName, metav1.GetOptions{})
 		if err != nil && !errors.IsNotFound(err) {
 			return fmt.Errorf("get spd %s from remote failed: %v", key, err)
 		} else if err != nil {
diff --git a/pkg/metaserver/spd/fetcher_test.go b/pkg/metaserver/spd/fetcher_test.go
index 6aafeb722b..3cb233d7d1 100644
--- a/pkg/metaserver/spd/fetcher_test.go
+++ b/pkg/metaserver/spd/fetcher_test.go
@@ -25,8 +25,10 @@ import (
 	"github.com/stretchr/testify/require"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/metrics/pkg/apis/metrics/v1beta1"
 
 	"github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1"
 	workloadapis "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1"
@@ -226,3 +228,64 @@ func Test_spdManager_GetSPD(t *testing.T) {
 		})
 	}
 }
+
+func Test_metricExists(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name     string
+		spd      *workloadapis.ServiceProfileDescriptor
+		expected bool
+	}{
+		{
+			name: "has desired metrics",
+			spd: &workloadapis.ServiceProfileDescriptor{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "spd-1",
+					Namespace: "default",
+				},
+				Status: workloadapis.ServiceProfileDescriptorStatus{
+					AggMetrics: []workloadapis.AggPodMetrics{
+						{
+							Aggregator: workloadapis.Avg,
+							Items: []v1beta1.PodMetrics{
+								{
+									Timestamp:  metav1.NewTime(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)),
+									Window:     metav1.Duration{Duration: time.Hour},
+									Containers: []v1beta1.ContainerMetrics{
+										{
+											Name: "container-1",
+											Usage: map[v1.ResourceName]resource.Quantity{
+												consts.SPDAggMetricNameMemoryBandwidth + "-I5": resource.MustParse("1.2345"),
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			expected: true,
+		},
+		{
+			name: "no desired metrics",
+			spd: &workloadapis.ServiceProfileDescriptor{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "spd-1",
+					Namespace: "default",
+				},
+			},
+			expected: false,
+		},
+	}
+
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+			got := metricExists(tt.spd, consts.SPDAggMetricNameMemoryBandwidth)
+			require.Equal(t, tt.expected, got)
+		})
+	}
+}