Skip to content

Commit

Permalink
reduce the frequency of spd manager try to get unused spd from remote…
Browse files Browse the repository at this point in the history
… api-server
  • Loading branch information
luomingmeng authored and waynepeking348 committed Apr 7, 2024
1 parent 49b1c04 commit dacb6ad
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
19 changes: 15 additions & 4 deletions pkg/metaserver/spd/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,15 @@ func (s *Cache) GetNextFetchRemoteTime(key string) time.Time {
if info.penaltyForFetchingRemoteTime > 0 {
return info.lastFetchRemoteTime.Add(info.penaltyForFetchingRemoteTime)
}
return info.lastFetchRemoteTime.Add(wait.Jitter(s.cacheTTL, s.jitterFactor))

nextFetchRemoteTime := info.lastFetchRemoteTime.Add(wait.Jitter(s.cacheTTL, s.jitterFactor))
// If no one tries to get this spd for a long time, a penalty from lastGetTime to lastFetchRemoteTime will be added,
// which will linearly increase the period of accessing the remote, thereby reducing the frequency of accessing the api-server
if info.lastFetchRemoteTime.After(info.lastGetTime) {
nextFetchRemoteTime = nextFetchRemoteTime.Add(info.lastFetchRemoteTime.Sub(info.lastGetTime))
}

return nextFetchRemoteTime
}

return time.Time{}
Expand Down Expand Up @@ -173,13 +181,16 @@ func (s *Cache) DeleteSPD(key string) error {
}

// GetSPD gets target spd by namespace/name key
func (s *Cache) GetSPD(key string) *workloadapis.ServiceProfileDescriptor {
func (s *Cache) GetSPD(key string, updateLastGetTime bool) *workloadapis.ServiceProfileDescriptor {
s.Lock()
defer s.Unlock()

s.initSPDInfoWithoutLock(key)
// update last get spd time
s.spdInfo[key].lastGetTime = time.Now()

if updateLastGetTime {
// update last get spd time
s.spdInfo[key].lastGetTime = time.Now()
}

info, ok := s.spdInfo[key]
if ok && info != nil {
Expand Down
14 changes: 12 additions & 2 deletions pkg/metaserver/spd/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
metricsNameGetCNCTargetConfigFailed = "spd_manager_get_cnc_target_failed"
metricsNameUpdateCacheFailed = "spd_manager_update_cache_failed"
metricsNameCacheNotFound = "spd_manager_cache_not_found"
metricsNameUpdateCacheSuccess = "spd_manager_update_cache_success"
metricsNameDeleteCache = "spd_manager_delete_cache"
)

type GetPodSPDNameFunc func(pod *v1.Pod) (string, error)
Expand Down Expand Up @@ -138,7 +140,7 @@ func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name st
}

// get current spd from cache
currentSPD := s.spdCache.GetSPD(key)
currentSPD := s.spdCache.GetSPD(key, true)
if currentSPD != nil {
return currentSPD, nil
}
Expand Down Expand Up @@ -178,7 +180,7 @@ func (s *spdFetcher) sync(ctx context.Context) {
}

// first get spd origin spd from local cache
originSPD := s.spdCache.GetSPD(key)
originSPD := s.spdCache.GetSPD(key, false)

// get spd current target config from cnc to limit rate of get remote spd by comparing local spd
// hash with cnc target config hash, if cnc target config not found it will get remote spd directly
Expand Down Expand Up @@ -221,12 +223,18 @@ func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *worklo
s.spdCache.SetLastFetchRemoteTime(key, now)
}

baseTag := []metrics.MetricTag{
{Key: "spdNamespace", Val: targetConfig.ConfigNamespace},
{Key: "spdName", Val: targetConfig.ConfigName},
}

klog.Infof("[spd-manager] spd %s targetConfig hash is changed from %s to %s", key, util.GetSPDHash(originSPD), targetConfig.Hash)
spd, err := s.client.InternalClient.WorkloadV1alpha1().ServiceProfileDescriptors(targetConfig.ConfigNamespace).
Get(ctx, targetConfig.ConfigName, metav1.GetOptions{ResourceVersion: "0"})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("get spd %s from remote failed: %v", key, err)
} else if err != nil {
_ = s.emitter.StoreInt64(metricsNameDeleteCache, 1, metrics.MetricTypeNameCount, baseTag...)
err = s.spdCache.DeleteSPD(key)
if err != nil {
return fmt.Errorf("delete spd %s from cache failed: %v", key, err)
Expand All @@ -236,6 +244,8 @@ func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *worklo
return nil
}

_ = s.emitter.StoreInt64(metricsNameUpdateCacheSuccess, 1, metrics.MetricTypeNameCount, baseTag...)

err = s.spdCache.SetSPD(key, spd)
if err != nil {
return err
Expand Down

0 comments on commit dacb6ad

Please sign in to comment.