Skip to content

Commit

Permalink
feat(kcmas): support grouped aggregate metric (#530)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzhhb committed Apr 7, 2024
1 parent b122f7b commit 49b1c04
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 129 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/google/cadvisor v0.44.2
github.com/google/uuid v1.3.0
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.4.1-0.20240318070445-339cc970486a
github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
github.com/opencontainers/selinux v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.4.1-0.20240318070445-339cc970486a h1:xdoHeYI+CrPj1d/kOAAiZaciM9ySBmQx/EXNZ407QVw=
github.com/kubewharf/katalyst-api v0.4.1-0.20240318070445-339cc970486a/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36 h1:AUsgMb1EaUbrmUWQU7zAXnZHAOPAH65bx/MELm3qaAQ=
github.com/kubewharf/katalyst-api v0.4.1-0.20240407031720-b62db8de7a36/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
156 changes: 141 additions & 15 deletions pkg/custom-metric/provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,14 @@ func TestWithLocalStore(t *testing.T) {

p1 := generateStorePodMeta("ns-1", "pod-1", "full_metric_with_conflict_time", 11)
p2 := generateStorePodMeta("ns-2", "pod-2", "full_metric_with_multiple_data", 11)
p3 := generateStorePodMeta("ns-3", "pod-3", "full_metric_with_multiple_label", 33)
n1 := generateStoreNodeMeta("node-1", "full_metric_with_node")

baseCtx, err := katalystbase.GenerateFakeGenericContext(nil, nil, nil, []runtime.Object{p1, p2, n1})
baseCtx, err := katalystbase.GenerateFakeGenericContext(nil, nil, nil, []runtime.Object{p1, p2, p3, n1})
assert.NoError(t, err)

genericConf := &metricconf.GenericMetricConfiguration{
OutOfDataPeriod: time.Second * 20,
OutOfDataPeriod: time.Second * 10,
}
storeConf := &metricconf.StoreConfiguration{
ServiceDiscoveryConf: &generic.ServiceDiscoveryConf{
Expand All @@ -138,6 +139,8 @@ func TestWithLocalStore(t *testing.T) {
}),
},
},
PurgePeriod: time.Second * 3,
GCPeriod: time.Second * 3,
IndexLabelKeys: []string{"name"},
}

Expand Down Expand Up @@ -174,7 +177,7 @@ func testWithRemoteStoreWithIndex(t *testing.T, index []int) {
ctx := context.Background()

genericConf := &metricconf.GenericMetricConfiguration{
OutOfDataPeriod: time.Second * 20,
OutOfDataPeriod: time.Second * 10,
}
storeConf := &metricconf.StoreConfiguration{
ServiceDiscoveryConf: &generic.ServiceDiscoveryConf{
Expand All @@ -186,13 +189,14 @@ func testWithRemoteStoreWithIndex(t *testing.T, index []int) {
},
},
StoreServerReplicaTotal: len(index),
GCPeriod: time.Second,
PurgePeriod: time.Second,
GCPeriod: time.Second * 3,
PurgePeriod: time.Second * 3,
IndexLabelKeys: []string{"name"},
}

lp1 := generateStorePodMeta("ns-1", "pod-1", "full_metric_with_conflict_time", 11)
lp2 := generateStorePodMeta("ns-2", "pod-2", "full_metric_with_multiple_data", 22)
lp3 := generateStorePodMeta("ns-3", "pod-3", "full_metric_with_multiple_label", 33)
ln1 := generateStoreNodeMeta("node-1", "full_metric_with_node")

var podList []runtime.Object
Expand All @@ -207,7 +211,7 @@ func testWithRemoteStoreWithIndex(t *testing.T, index []int) {
port, err := strconv.Atoi(strings.TrimSpace(urlList[2]))
assert.NoError(t, err)

baseCtx, err := katalystbase.GenerateFakeGenericContext(nil, nil, nil, []runtime.Object{lp1, lp2, ln1})
baseCtx, err := katalystbase.GenerateFakeGenericContext(nil, nil, nil, []runtime.Object{lp1, lp2, lp3, ln1})
assert.NoError(t, err)
baseCtx.Handler = mux

Expand Down Expand Up @@ -390,6 +394,52 @@ func testProvider(t *testing.T, p MetricProvider, s store.MetricStore, ctx conte
},
},
},
{
Name: "full_metric_with_multiple_label",
Labels: map[string]string{
"selector_container": "container1",
fmt.Sprintf("%v", data.CustomMetricLabelKeyNamespace): "ns-3",
fmt.Sprintf("%v", data.CustomMetricLabelKeyObject): "pods",
fmt.Sprintf("%v", data.CustomMetricLabelKeyObjectName): "pod-3",
},
Series: []*data.MetricData{
{
Data: 0,
Timestamp: now.UnixMilli(),
},
{
Data: 50,
Timestamp: now.UnixMilli() - genericConf.OutOfDataPeriod.Milliseconds() - time.Second.Milliseconds()*2,
},
{
Data: 100,
Timestamp: now.UnixMilli() - genericConf.OutOfDataPeriod.Milliseconds() - time.Second.Milliseconds()*4,
},
},
},
{
Name: "full_metric_with_multiple_label",
Labels: map[string]string{
"selector_container": "container2",
fmt.Sprintf("%v", data.CustomMetricLabelKeyNamespace): "ns-3",
fmt.Sprintf("%v", data.CustomMetricLabelKeyObject): "pods",
fmt.Sprintf("%v", data.CustomMetricLabelKeyObjectName): "pod-3",
},
Series: []*data.MetricData{
{
Data: 100,
Timestamp: now.UnixMilli(),
},
{
Data: 150,
Timestamp: now.UnixMilli() - genericConf.OutOfDataPeriod.Milliseconds() - time.Second.Milliseconds()*2,
},
{
Data: 200,
Timestamp: now.UnixMilli() - genericConf.OutOfDataPeriod.Milliseconds() - time.Second.Milliseconds()*4,
},
},
},
{
Name: "full_metric_with_node",
Labels: map[string]string{
Expand Down Expand Up @@ -419,7 +469,7 @@ func testProvider(t *testing.T, p MetricProvider, s store.MetricStore, ctx conte
t.Log("#### 1.1: ListAllMetrics")

metricInfo = p.ListAllMetrics()
assert.Equal(t, 3, len(metricInfo))
assert.Equal(t, 4, len(metricInfo))
assert.ElementsMatch(t, []provider.CustomMetricInfo{
{
GroupResource: podGR,
Expand All @@ -431,6 +481,11 @@ func testProvider(t *testing.T, p MetricProvider, s store.MetricStore, ctx conte
Namespaced: true,
Metric: "full_metric_with_multiple_data",
},
{
GroupResource: podGR,
Namespaced: true,
Metric: "full_metric_with_multiple_label",
},
{
GroupResource: nodeGR,
Namespaced: false,
Expand Down Expand Up @@ -487,12 +542,8 @@ func testProvider(t *testing.T, p MetricProvider, s store.MetricStore, ctx conte
Metric: custom_metrics.MetricIdentifier{
Name: "full_metric_with_conflict_time",
Selector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "name",
Operator: "=",
Values: []string{"full_metric_with_conflict_time"},
},
MatchLabels: map[string]string{
"name": "full_metric_with_conflict_time",
},
},
},
Expand Down Expand Up @@ -740,13 +791,83 @@ func testProvider(t *testing.T, p MetricProvider, s store.MetricStore, ctx conte
},
}, batchExternal.Items)

t.Log("#### 1.6.1 GetAggregatedMetric pod-3")
metric, err := p.GetMetricBySelector(ctx, "ns-3", labels.Everything(),
provider.CustomMetricInfo{GroupResource: podGR, Metric: "full_metric_with_multiple_label_agg_avg"},
labels.Everything())
assert.NoError(t, err)
assert.Equal(t, 1, len(metric.Items))
windowSeconds := int64(14)
assert.ElementsMatch(t, []custom_metrics.MetricValue{
{
DescribedObject: custom_metrics.ObjectReference{
Namespace: "ns-3",
Name: "pod-3",
Kind: "pods",
},
Metric: custom_metrics.MetricIdentifier{
Name: "full_metric_with_multiple_label_agg_avg",
Selector: &metav1.LabelSelector{},
},
Timestamp: metav1.NewTime(time.UnixMilli(now.UnixMilli())),
Value: resource.MustParse("100"),
WindowSeconds: &windowSeconds,
},
}, metric.Items)

t.Log("#### 1.6.2 GetGroupedAggregatedMetric pod-3")
metric, err = p.GetMetricBySelector(ctx, "ns-3", labels.Everything(),
provider.CustomMetricInfo{GroupResource: podGR, Metric: "full_metric_with_multiple_label_agg_avg"},
labels.SelectorFromSet(labels.Set{"groupBy": "container"}))
assert.NoError(t, err)
assert.Equal(t, 2, len(metric.Items))
windowSeconds = int64(14)
assert.ElementsMatch(t, []custom_metrics.MetricValue{
{
DescribedObject: custom_metrics.ObjectReference{
Namespace: "ns-3",
Name: "pod-3",
Kind: "pods",
},
Metric: custom_metrics.MetricIdentifier{
Name: "full_metric_with_multiple_label_agg_avg",
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"container": "container1",
},
},
},
Timestamp: metav1.NewTime(time.UnixMilli(now.UnixMilli())),
Value: resource.MustParse("50"),
WindowSeconds: &windowSeconds,
},
{
DescribedObject: custom_metrics.ObjectReference{
Namespace: "ns-3",
Name: "pod-3",
Kind: "pods",
},
Metric: custom_metrics.MetricIdentifier{
Name: "full_metric_with_multiple_label_agg_avg",
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"container": "container2",
},
},
},
Timestamp: metav1.NewTime(time.UnixMilli(now.UnixMilli())),
Value: resource.MustParse("150"),
WindowSeconds: &windowSeconds,
},
}, metric.Items)

// sleep a while to trigger gc
time.Sleep(time.Second * 25)
time.Sleep(time.Second * 20)

t.Log("#### 2.1: ListAllMetrics")

metricInfo = p.ListAllMetrics()
assert.Equal(t, 3, len(metricInfo))
assert.Equal(t, 4, len(metricInfo))
assert.ElementsMatch(t, []provider.CustomMetricInfo{
{
GroupResource: podGR,
Expand All @@ -758,6 +879,11 @@ func testProvider(t *testing.T, p MetricProvider, s store.MetricStore, ctx conte
Namespaced: true,
Metric: "full_metric_with_multiple_data",
},
{
GroupResource: podGR,
Namespaced: true,
Metric: "full_metric_with_multiple_label",
},
{
GroupResource: nodeGR,
Namespaced: false,
Expand Down
2 changes: 1 addition & 1 deletion pkg/custom-metric/provider/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func PackMetricValue(m types.Metric, item types.Item, metricSelector labels.Sele
Value: item.GetQuantity(),
}
// if user specifies the metric selector, return itself.
if !metricSelector.Empty() {
if len(m.GetLabels()) == 0 && !metricSelector.Empty() {
result.Metric.Selector = convertMetricLabelSelector(metricSelector)
}

Expand Down
87 changes: 42 additions & 45 deletions pkg/custom-metric/store/data/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package data

import (
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -130,42 +131,6 @@ func (c *CachedMetric) AddSeriesMetric(sList ...types.Metric) error {
return nil
}

// AddAggregatedMetric merges the given aggregated metrics into the cache.
//
// Entries that are not a non-nil *types.AggregatedMetric, that do not carry
// exactly one item, or that have an empty name are skipped. For every other
// entry the object metric store for its base metric meta is looked up (and
// created when missing), the object is added to that store if it is not
// present yet, and the entry is merged into the object's internal metric.
// The first error returned along the way stops the loop and is returned
// unchanged.
func (c *CachedMetric) AddAggregatedMetric(aList ...types.Metric) error {
	for idx := range aList {
		agg, isAgg := aList[idx].(*types.AggregatedMetric)
		if !isAgg || agg == nil {
			continue
		}
		if len(agg.GetItemList()) != 1 || agg.GetName() == "" {
			continue
		}

		// Resolve the per-metric store, creating it on first use.
		meta := agg.GetBaseMetricMetaImp()
		objStore := c.getObjectMetricStore(meta)
		if objStore == nil {
			created, createErr := c.addNewObjectMetricStore(meta)
			if createErr != nil {
				return createErr
			}
			objStore = created
		}

		// Make sure the object is registered before fetching its internal metric.
		found, existErr := objStore.ObjectExists(agg.ObjectMetaImp)
		if existErr != nil {
			return existErr
		}
		if !found {
			if addErr := objStore.Add(agg.ObjectMetaImp); addErr != nil {
				return addErr
			}
		}

		imp, getErr := objStore.GetInternalMetricImp(agg.ObjectMetaImp)
		if getErr != nil {
			return getErr
		}
		imp.MergeAggregatedMetric(agg)
	}
	return nil
}

// ListAllMetricMeta returns all metric meta with a flattened slice
func (c *CachedMetric) ListAllMetricMeta(withObject bool) []types.MetricMeta {
c.RLock()
Expand Down Expand Up @@ -232,8 +197,8 @@ func (c *CachedMetric) GetMetric(namespace, metricName string, objName string, o
}
} else {
metricItem, exist := internalMetric.GetAggregatedItems(metricSelector, aggName)
if exist && metricItem.Len() > 0 {
res = append(res, metricItem)
if exist && len(metricItem) > 0 {
res = append(res, metricItem...)
}
}
}
Expand Down Expand Up @@ -353,17 +318,49 @@ func MergeInternalMetricList(metricName string, metricLists ...[]types.Metric) [
})
}
} else {
merger := newAggregatedMetricMerger()
for _, metricList := range metricLists {
_ = c.AddAggregatedMetric(metricList...)
merger.addMetrics(metricList...)
}
for _, objectMetricStore := range c.metricMap {
objectMetricStore.Iterate(func(internalMetric *internal.MetricImp) {
if metricItem, exist := internalMetric.GetAggregatedItems(nil, aggName); exist && metricItem.Len() > 0 {
res = append(res, metricItem)
}
})
res = merger.getMergedMetrics()
}

return res
}

// aggregatedMetricMerger is used to merge aggregated metrics. It keeps one
// metric per key, where the key is the string built by getMetricKey.
type aggregatedMetricMerger struct {
// metrics maps a metric key to the aggregated metric currently kept for that key.
metrics map[string]*types.AggregatedMetric
}

// newAggregatedMetricMerger returns a merger whose metric map is allocated
// and empty, so it can be written to right away.
func newAggregatedMetricMerger() *aggregatedMetricMerger {
	merger := new(aggregatedMetricMerger)
	merger.metrics = map[string]*types.AggregatedMetric{}
	return merger
}

// getMetricKey builds the map key for an aggregated metric by joining, with
// ":" as separator and in this order: object namespace, metric name, object
// kind, object name and the string form of the embedded basic metric.
func (a *aggregatedMetricMerger) getMetricKey(metric *types.AggregatedMetric) string {
	keyParts := []string{
		metric.GetObjectNamespace(),
		metric.GetName(),
		metric.GetObjectKind(),
		metric.GetObjectName(),
		metric.BasicMetric.String(),
	}
	return strings.Join(keyParts, ":")
}

// addMetrics folds the given metrics into the merger.
//
// Only non-nil *types.AggregatedMetric values are considered; anything else
// is skipped instead of panicking on the type assertion. For each key (see
// getMetricKey) the merger keeps the metric with the strictly greatest
// AggregatedIdentity.Timestamp; on equal timestamps the metric added first
// is kept.
func (a *aggregatedMetricMerger) addMetrics(metrics ...types.Metric) {
	for _, m := range metrics {
		// Use the comma-ok form: callers pass the generic types.Metric
		// interface, so a non-aggregated or nil entry must not crash the merge.
		aggMetric, ok := m.(*types.AggregatedMetric)
		if !ok || aggMetric == nil {
			continue
		}

		key := a.getMetricKey(aggMetric)
		oldMetric, exists := a.metrics[key]
		// Short-circuit keeps oldMetric from being dereferenced when absent.
		if !exists || aggMetric.AggregatedIdentity.Timestamp > oldMetric.AggregatedIdentity.Timestamp {
			a.metrics[key] = aggMetric
		}
	}
}

// getMergedMetrics returns every metric currently kept by the merger as a
// flat slice. The order follows Go map iteration and is therefore not
// stable between calls. An empty merger yields a nil slice.
func (a *aggregatedMetricMerger) getMergedMetrics() []types.Metric {
	if len(a.metrics) == 0 {
		// Keep the nil result the append-based version produced for no input.
		return nil
	}

	// The final length is known up front, so allocate once.
	res := make([]types.Metric, 0, len(a.metrics))
	for _, metric := range a.metrics {
		res = append(res, metric)
	}
	return res
}
Loading

0 comments on commit 49b1c04

Please sign in to comment.