Skip to content

Commit

Permalink
feat: agent get SPD syncly
Browse files Browse the repository at this point in the history
  • Loading branch information
ddjjia committed Apr 27, 2024
1 parent bd179d0 commit a2ba65b
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 7 deletions.
56 changes: 49 additions & 7 deletions pkg/metaserver/spd/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package spd
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -32,6 +33,7 @@ import (

configapis "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1"
workloadapis "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1"
apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/client"
pkgconfig "github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/cnc"
Expand Down Expand Up @@ -147,7 +149,7 @@ func (s *spdFetcher) Run(ctx context.Context) {
<-ctx.Done()
}

func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name string) (*workloadapis.ServiceProfileDescriptor, error) {
func (s *spdFetcher) getSPDByNamespaceName(ctx context.Context, namespace, name string) (*workloadapis.ServiceProfileDescriptor, error) {
key := native.GenerateNamespaceNameKey(namespace, name)
baseTag := []metrics.MetricTag{
{Key: "spdNamespace", Val: namespace},
Expand All @@ -157,14 +159,54 @@ func (s *spdFetcher) getSPDByNamespaceName(_ context.Context, namespace, name st
// get current spd from cache
currentSPD := s.spdCache.GetSPD(key, true)
if currentSPD != nil {
return currentSPD, nil
// TODO: temporarily to update all existing spds with desired metrics, which can be removed later.
if metricExists(currentSPD, apiconsts.SPDAggMetricNameMemoryBandwidth) {
return currentSPD, nil
}

klog.Infof("[spd-manager] need to get spd %s from remote", key)
targetConfig, err := s.getSPDTargetConfig(ctx, namespace, name)
if err != nil {
klog.Warningf("[spd-manager] get spd targetConfig config failed: %v, use local cache instead", err)
targetConfig = &configapis.TargetConfig{
ConfigNamespace: namespace,
ConfigName: name,
}
_ = s.emitter.StoreInt64(metricsNameGetCNCTargetConfigFailed, 1, metrics.MetricTypeNameCount, baseTag...)
}
err = s.updateSPDCacheIfNeed(ctx, currentSPD, targetConfig, true)
if err != nil {
klog.Errorf("[spd-manager] failed update spd cache from remote: %v, use local cache instead", err)
_ = s.emitter.StoreInt64(metricsNameUpdateCacheFailed, 1, metrics.MetricTypeNameCount, baseTag...)
return currentSPD, nil
}

currentSPD = s.spdCache.GetSPD(key, true)
if currentSPD != nil {
return currentSPD, nil
}
}

_ = s.emitter.StoreInt64(metricsNameCacheNotFound, 1, metrics.MetricTypeNameCount, baseTag...)

return nil, errors.NewNotFound(workloadapis.Resource(workloadapis.ResourceNameServiceProfileDescriptors), name)
}

// metricExists checks if metric exists in SPD AggPodMetrics
func metricExists(spd *workloadapis.ServiceProfileDescriptor, metric string) bool {
for _, aggPodMetrics := range spd.Status.AggMetrics {
for _, containerMetrics := range aggPodMetrics.Items[0].Containers {
for resourceName := range containerMetrics.Usage {
if strings.Contains(resourceName.String(), metric) {
return true
}
}
}
}

return false
}

// getSPDTargetConfig get spd target config from cnc
func (s *spdFetcher) getSPDTargetConfig(ctx context.Context, namespace, name string) (*configapis.TargetConfig, error) {
currentCNC, err := s.cncFetcher.GetCNC(ctx)
Expand Down Expand Up @@ -211,7 +253,7 @@ func (s *spdFetcher) sync(ctx context.Context) {

// try to update spd cache from remote if cache spd hash is not equal to target config hash,
// the rate of getting remote spd will be limited by spd ServiceProfileCacheTTL
err = s.updateSPDCacheIfNeed(ctx, originSPD, targetConfig)
err = s.updateSPDCacheIfNeed(ctx, originSPD, targetConfig, false)
if err != nil {
klog.Errorf("[spd-manager] failed update spd cache from remote: %v, use local cache instead", err)
_ = s.emitter.StoreInt64(metricsNameUpdateCacheFailed, 1, metrics.MetricTypeNameCount, baseTag...)
Expand All @@ -222,17 +264,17 @@ func (s *spdFetcher) sync(ctx context.Context) {
// updateSPDCacheIfNeed checks if the previous spd has changed, and
// re-get from APIServer if the previous is out-of date.
func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *workloadapis.ServiceProfileDescriptor,
targetConfig *configapis.TargetConfig) error {
targetConfig *configapis.TargetConfig, needToGetLatest bool) error {
if originSPD == nil && targetConfig == nil {
return nil
}

now := time.Now()
if originSPD == nil || util.GetSPDHash(originSPD) != targetConfig.Hash {
if originSPD == nil || util.GetSPDHash(originSPD) != targetConfig.Hash || needToGetLatest {
key := native.GenerateNamespaceNameKey(targetConfig.ConfigNamespace, targetConfig.ConfigName)
// Skip the backoff delay if the configuration hash of the CNC target changes, ensuring the
// local SPD cache is always updated with the latest configuration.
if nextFetchRemoteTime := s.spdCache.GetNextFetchRemoteTime(key, now, targetConfig.Hash != ""); nextFetchRemoteTime.After(time.Now()) {
if nextFetchRemoteTime := s.spdCache.GetNextFetchRemoteTime(key, now, targetConfig.Hash != ""); nextFetchRemoteTime.After(time.Now()) && !needToGetLatest {
return nil
} else {
// first update the timestamp of the last attempt to fetch the remote spd to
Expand All @@ -247,7 +289,7 @@ func (s *spdFetcher) updateSPDCacheIfNeed(ctx context.Context, originSPD *worklo

klog.Infof("[spd-manager] spd %s targetConfig hash is changed from %s to %s", key, util.GetSPDHash(originSPD), targetConfig.Hash)
spd, err := s.client.InternalClient.WorkloadV1alpha1().ServiceProfileDescriptors(targetConfig.ConfigNamespace).
Get(ctx, targetConfig.ConfigName, metav1.GetOptions{ResourceVersion: "0"})
Get(ctx, targetConfig.ConfigName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("get spd %s from remote failed: %v", key, err)
} else if err != nil {
Expand Down
59 changes: 59 additions & 0 deletions pkg/metaserver/spd/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (

"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"

"github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1"
workloadapis "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1"
Expand Down Expand Up @@ -226,3 +228,60 @@ func Test_spdManager_GetSPD(t *testing.T) {
})
}
}

func Test_metricExists(t *testing.T) {
t.Parallel()

tests := []struct {
name string
spd *workloadapis.ServiceProfileDescriptor
expected bool
}{
{
name: "has desired metrics",
spd: &workloadapis.ServiceProfileDescriptor{
ObjectMeta: metav1.ObjectMeta{
Name: "spd-1",
Namespace: "default",
},
Status: workloadapis.ServiceProfileDescriptorStatus{
AggMetrics: []workloadapis.AggPodMetrics{
{
Aggregator: workloadapis.Avg,
Items: []v1beta1.PodMetrics{
{
Timestamp: metav1.NewTime(time.Date(1970, 0, 0, 0, 0, 0, 0, time.UTC)),
Window: metav1.Duration{Duration: time.Hour},
Containers: []v1beta1.ContainerMetrics{
{
Name: "container-1",
Usage: map[v1.ResourceName]resource.Quantity{
consts.SPDAggMetricNameMemoryBandwidth + "-I5": resource.MustParse("1.2345"),
},
},
},
},
},
},
},
},
},
expected: true,
},
{
name: "no desired metrics",
spd: &workloadapis.ServiceProfileDescriptor{
ObjectMeta: metav1.ObjectMeta{
Name: "spd-1",
Namespace: "default",
},
},
expected: false,
},
}

for _, tt := range tests {
got := metricExists(tt.spd, consts.SPDAggMetricNameMemoryBandwidth)
require.Equal(t, tt.expected, got)
}
}

0 comments on commit a2ba65b

Please sign in to comment.