From dacb6adc34a5634a016a453401d03fbe8f4848b8 Mon Sep 17 00:00:00 2001 From: luomingmeng Date: Tue, 2 Apr 2024 14:48:50 +0800 Subject: [PATCH] reduce the frequency of spd manager try to get unused spd from remote api-server --- pkg/metaserver/spd/cache.go | 19 +++++++++++++++---- pkg/metaserver/spd/fetcher.go | 14 ++++++++++++-- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/pkg/metaserver/spd/cache.go b/pkg/metaserver/spd/cache.go index 4dab1b316..fddd7d905 100644 --- a/pkg/metaserver/spd/cache.go +++ b/pkg/metaserver/spd/cache.go @@ -103,7 +103,15 @@ func (s *Cache) GetNextFetchRemoteTime(key string) time.Time { if info.penaltyForFetchingRemoteTime > 0 { return info.lastFetchRemoteTime.Add(info.penaltyForFetchingRemoteTime) } - return info.lastFetchRemoteTime.Add(wait.Jitter(s.cacheTTL, s.jitterFactor)) + + nextFetchRemoteTime := info.lastFetchRemoteTime.Add(wait.Jitter(s.cacheTTL, s.jitterFactor)) + // If no one tries to get this spd for a long time, a penalty from lastGetTime to lastFetchRemoteTime will be added, + // which will linearly increase the period of accessing the remote, thereby reducing the frequency of accessing the api-server + if info.lastFetchRemoteTime.After(info.lastGetTime) { + nextFetchRemoteTime = nextFetchRemoteTime.Add(info.lastFetchRemoteTime.Sub(info.lastGetTime)) + } + + return nextFetchRemoteTime } return time.Time{} @@ -173,13 +181,16 @@ func (s *Cache) DeleteSPD(key string) error { } // GetSPD gets target spd by namespace/name key -func (s *Cache) GetSPD(key string) *workloadapis.ServiceProfileDescriptor { +func (s *Cache) GetSPD(key string, updateLastGetTime bool) *workloadapis.ServiceProfileDescriptor { s.Lock() defer s.Unlock() s.initSPDInfoWithoutLock(key) - // update last get spd time - s.spdInfo[key].lastGetTime = time.Now() + + if updateLastGetTime { + // update last get spd time + s.spdInfo[key].lastGetTime = time.Now() + } info, ok := s.spdInfo[key] if ok && info != nil { diff --git a/pkg/metaserver/spd/fetcher.go b/pkg/metaserver/spd/fetcher.go index 4e8b39352..9883d1771 100644 --- a/pkg/metaserver/spd/fetcher.go +++ b/pkg/metaserver/spd/fetcher.go @@ -52,6 +52,8 @@ const ( metricsNameGetCNCTargetConfigFailed = "spd_manager_get_cnc_target_failed" metricsNameUpdateCacheFailed = "spd_manager_update_cache_failed" metricsNameCacheNotFound = "spd_manager_cache_not_found" + metricsNameUpdateCacheSuccess = "spd_manager_update_cache_success" + metricsNameDeleteCache = "spd_manager_delete_cache" ) type GetPodSPDNameFunc func(pod *v1.Pod) (string, error) @@ -138,7 +140,7 @@ func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name st } // get current spd from cache - currentSPD := s.spdCache.GetSPD(key) + currentSPD := s.spdCache.GetSPD(key, true) if currentSPD != nil { return currentSPD, nil } @@ -178,7 +180,7 @@ func (s *spdFetcher) sync(ctx context.Context) { } // first get spd origin spd from local cache - originSPD := s.spdCache.GetSPD(key) + originSPD := s.spdCache.GetSPD(key, false) // get spd current target config from cnc to limit rate of get remote spd by comparing local spd // hash with cnc target config hash, if cnc target config not found it will get remote spd directly @@ -221,12 +223,18 @@ func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *worklo s.spdCache.SetLastFetchRemoteTime(key, now) } + baseTag := []metrics.MetricTag{ + {Key: "spdNamespace", Val: targetConfig.ConfigNamespace}, + {Key: "spdName", Val: targetConfig.ConfigName}, + } + klog.Infof("[spd-manager] spd %s targetConfig hash is changed from %s to %s", key, util.GetSPDHash(originSPD), targetConfig.Hash) spd, err := s.client.InternalClient.WorkloadV1alpha1().ServiceProfileDescriptors(targetConfig.ConfigNamespace). Get(ctx, targetConfig.ConfigName, metav1.GetOptions{ResourceVersion: "0"}) if err != nil && !errors.IsNotFound(err) { return fmt.Errorf("get spd %s from remote failed: %v", key, err) } else if err != nil { + _ = s.emitter.StoreInt64(metricsNameDeleteCache, 1, metrics.MetricTypeNameCount, baseTag...) err = s.spdCache.DeleteSPD(key) if err != nil { return fmt.Errorf("delete spd %s from cache failed: %v", key, err) @@ -236,6 +244,8 @@ func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *worklo return nil } + _ = s.emitter.StoreInt64(metricsNameUpdateCacheSuccess, 1, metrics.MetricTypeNameCount, baseTag...) + err = s.spdCache.SetSPD(key, spd) if err != nil { return err