diff --git a/cmd/katalyst-agent/app/options/metaserver/metaserver.go b/cmd/katalyst-agent/app/options/metaserver/metaserver.go index 5ccc14545..d53c92584 100644 --- a/cmd/katalyst-agent/app/options/metaserver/metaserver.go +++ b/cmd/katalyst-agent/app/options/metaserver/metaserver.go @@ -41,6 +41,7 @@ const ( const ( defaultServiceProfileSkipCorruptionError = true defaultServiceProfileCacheTTL = 1 * time.Minute + defaultGetFromRemote = false ) const ( @@ -74,6 +75,7 @@ type MetaServerOptions struct { // configurations for spd ServiceProfileSkipCorruptionError bool ServiceProfileCacheTTL time.Duration + GetFromRemote bool // configurations for pod-cache KubeletPodCacheSyncPeriod time.Duration @@ -104,6 +106,7 @@ func NewMetaServerOptions() *MetaServerOptions { ServiceProfileSkipCorruptionError: defaultServiceProfileSkipCorruptionError, ServiceProfileCacheTTL: defaultServiceProfileCacheTTL, + GetFromRemote: defaultGetFromRemote, KubeletPodCacheSyncPeriod: defaultKubeletPodCacheSyncPeriod, KubeletPodCacheSyncMaxRate: defaultKubeletPodCacheSyncMaxRate, @@ -142,6 +145,7 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) { "Whether to skip corruption error when loading spd checkpoint") fs.DurationVar(&o.ServiceProfileCacheTTL, "service-profile-cache-ttl", o.ServiceProfileCacheTTL, "The ttl of service profile manager cache remote spd") + fs.BoolVar(&o.GetFromRemote, "get-from-remote", o.GetFromRemote, "get spd from remote if not in cache") fs.DurationVar(&o.KubeletPodCacheSyncPeriod, "kubelet-pod-cache-sync-period", o.KubeletPodCacheSyncPeriod, "The period of meta server to sync pod from kubelet 10255 port") @@ -174,6 +178,7 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error c.ServiceProfileSkipCorruptionError = o.ServiceProfileSkipCorruptionError c.ServiceProfileCacheTTL = o.ServiceProfileCacheTTL + c.GetFromRemote = o.GetFromRemote c.KubeletPodCacheSyncPeriod = o.KubeletPodCacheSyncPeriod c.KubeletPodCacheSyncMaxRate = rate.Limit(o.KubeletPodCacheSyncMaxRate) diff --git a/pkg/config/agent/metaserver/spd.go b/pkg/config/agent/metaserver/spd.go index 45a4f99c3..1f89fcb16 100644 --- a/pkg/config/agent/metaserver/spd.go +++ b/pkg/config/agent/metaserver/spd.go @@ -21,6 +21,7 @@ import "time" type SPDConfiguration struct { ServiceProfileSkipCorruptionError bool ServiceProfileCacheTTL time.Duration + GetFromRemote bool } func NewSPDConfiguration() *SPDConfiguration { diff --git a/pkg/metaserver/spd/fetcher.go b/pkg/metaserver/spd/fetcher.go index 4dffe383f..f892f9344 100644 --- a/pkg/metaserver/spd/fetcher.go +++ b/pkg/metaserver/spd/fetcher.go @@ -78,8 +78,9 @@ func (d DummySPDFetcher) Run(_ context.Context) { } type spdFetcher struct { - started *atomic.Bool - mux sync.Mutex + started *atomic.Bool + getFromRemote bool + mux sync.Mutex client *client.GenericClientSet emitter metrics.MetricEmitter @@ -106,6 +107,7 @@ func NewSPDFetcher(clientSet *client.GenericClientSet, emitter metrics.MetricEmi emitter: emitter, checkpointManager: checkpointManager, cncFetcher: cncFetcher, + getFromRemote: conf.GetFromRemote, } m.getPodSPDNameFunc = util.GetPodSPDName @@ -148,7 +150,7 @@ func (s *spdFetcher) Run(ctx context.Context) { <-ctx.Done() } -func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name string) (*workloadapis.ServiceProfileDescriptor, error) { +func (s *spdFetcher) getSPDByNamespaceName(ctx context.Context, namespace, name string) (*workloadapis.ServiceProfileDescriptor, error) { key := native.GenerateNamespaceNameKey(namespace, name) baseTag := []metrics.MetricTag{ {Key: "spdNamespace", Val: namespace}, @@ -159,6 +161,28 @@ func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name st currentSPD := s.spdCache.GetSPD(key, true) if currentSPD != nil { return currentSPD, nil + } else if s.getFromRemote { + klog.Infof("[spd-manager] need to get spd %s from remote", key) + targetConfig, err := s.getSPDTargetConfig(ctx, namespace, name) + if err != nil { + klog.Warningf("[spd-manager] get spd targetConfig config failed: %v, use local cache instead", err) + targetConfig = &configapis.TargetConfig{ + ConfigNamespace: namespace, + ConfigName: name, + } + _ = s.emitter.StoreInt64(metricsNameGetCNCTargetConfigFailed, 1, metrics.MetricTypeNameCount, baseTag...) + } + err = s.updateSPDCacheIfNeed(ctx, currentSPD, targetConfig, s.getFromRemote) + if err != nil { + klog.Errorf("[spd-manager] failed update spd cache from remote: %v, use local cache instead", err) + _ = s.emitter.StoreInt64(metricsNameUpdateCacheFailed, 1, metrics.MetricTypeNameCount, baseTag...) + return currentSPD, nil + } + + currentSPD = s.spdCache.GetSPD(key, true) + if currentSPD != nil { + return currentSPD, nil + } } _ = s.emitter.StoreInt64(metricsNameCacheNotFound, 1, metrics.MetricTypeNameCount, baseTag...) @@ -212,7 +236,7 @@ func (s *spdFetcher) sync(ctx context.Context) { // try to update spd cache from remote if cache spd hash is not equal to target config hash, // the rate of getting remote spd will be limited by spd ServiceProfileCacheTTL - err = s.updateSPDCacheIfNeed(ctx, originSPD, targetConfig) + err = s.updateSPDCacheIfNeed(ctx, originSPD, targetConfig, false) if err != nil { klog.Errorf("[spd-manager] failed update spd cache from remote: %v, use local cache instead", err) _ = s.emitter.StoreInt64(metricsNameUpdateCacheFailed, 1, metrics.MetricTypeNameCount, baseTag...) @@ -223,18 +247,18 @@ func (s *spdFetcher) sync(ctx context.Context) { // updateSPDCacheIfNeed checks if the previous spd has changed, and // re-get from APIServer if the previous is out-of date. func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *workloadapis.ServiceProfileDescriptor, - targetConfig *configapis.TargetConfig, + targetConfig *configapis.TargetConfig, needToGetFromRemote bool, ) error { if originSPD == nil && targetConfig == nil { return nil } now := time.Now() - if originSPD == nil || util.GetSPDHash(originSPD) != targetConfig.Hash { + if originSPD == nil || util.GetSPDHash(originSPD) != targetConfig.Hash || needToGetFromRemote { key := native.GenerateNamespaceNameKey(targetConfig.ConfigNamespace, targetConfig.ConfigName) // Skip the backoff delay if the configuration hash of the CNC target changes, ensuring the // local SPD cache is always updated with the latest configuration. - if nextFetchRemoteTime := s.spdCache.GetNextFetchRemoteTime(key, now, targetConfig.Hash != ""); nextFetchRemoteTime.After(time.Now()) { + if nextFetchRemoteTime := s.spdCache.GetNextFetchRemoteTime(key, now, targetConfig.Hash != ""); nextFetchRemoteTime.After(time.Now()) && !needToGetFromRemote { return nil } else { // first update the timestamp of the last attempt to fetch the remote spd to diff --git a/pkg/metaserver/spd/fetcher_test.go b/pkg/metaserver/spd/fetcher_test.go index 6aafeb722..d9b38a2d0 100644 --- a/pkg/metaserver/spd/fetcher_test.go +++ b/pkg/metaserver/spd/fetcher_test.go @@ -46,6 +46,7 @@ func generateTestConfiguration(t *testing.T, nodeName string, checkpoint string) testConfiguration.NodeName = nodeName testConfiguration.CheckpointManagerDir = checkpoint + testConfiguration.GetFromRemote = true return testConfiguration }