Skip to content

Commit

Permalink
cloudv2: Not replicate common fields (#3144)
Browse files Browse the repository at this point in the history
It normalizes in the metric set the common fields as test_run_id and aggregation_period so we can reduce the memory allocation on the client. The Cloud's backend will denormalize and convert them into Prom's labels.
  • Loading branch information
codebien authored Jul 5, 2023
1 parent ac9bfd5 commit aa79387
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 243 deletions.
16 changes: 7 additions & 9 deletions output/cloud/expv2/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ func (f *metricsFlusher) flush(_ context.Context) error {
}

type metricSetBuilder struct {
MetricSet *pbcloud.MetricSet
TestRunID string
AggregationPeriodInSeconds uint32
MetricSet *pbcloud.MetricSet

// TODO: If we will introduce the metricID then we could
// just use it as map's key (map[uint64]pbcloud.Metric). It is faster.
Expand All @@ -92,15 +90,16 @@ type metricSetBuilder struct {
}

func newMetricSetBuilder(testRunID string, aggrPeriodSec uint32) metricSetBuilder {
return metricSetBuilder{
TestRunID: testRunID,
MetricSet: &pbcloud.MetricSet{},
AggregationPeriodInSeconds: aggrPeriodSec,
builder := metricSetBuilder{
MetricSet: &pbcloud.MetricSet{},
// TODO: evaluate if removing the pointer from pbcloud.Metric
// is a better trade-off
metrics: make(map[*metrics.Metric]*pbcloud.Metric),
seriesIndex: make(map[metrics.TimeSeries]uint),
}
builder.MetricSet.TestRunId = testRunID
builder.MetricSet.AggregationPeriod = aggrPeriodSec
return builder
}

func (msb *metricSetBuilder) addTimeBucket(bucket timeBucket) {
Expand All @@ -119,8 +118,7 @@ func (msb *metricSetBuilder) addTimeBucket(bucket timeBucket) {
ix, ok := msb.seriesIndex[timeSeries]
if !ok {
pbTimeSeries = &pbcloud.TimeSeries{
AggregationPeriod: msb.AggregationPeriodInSeconds,
Labels: mapTimeSeriesLabelsProto(timeSeries, msb.TestRunID),
Labels: mapTimeSeriesLabelsProto(timeSeries.Tags),
}
pbmetric.TimeSeries = append(pbmetric.TimeSeries, pbTimeSeries)
msb.seriesIndex[timeSeries] = uint(len(pbmetric.TimeSeries) - 1)
Expand Down
2 changes: 0 additions & 2 deletions output/cloud/expv2/hdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,6 @@ func (h *histogram) appendBuckets(index uint32) {
func histogramAsProto(h *histogram, time int64) *pbcloud.TrendHdrValue {
hval := &pbcloud.TrendHdrValue{
Time: timestampAsProto(time),
MinResolution: 1.0,
SignificantDigits: 2,
LowerCounterIndex: h.FirstNotZeroBucket,
MinValue: h.Min,
MaxValue: h.Max,
Expand Down
2 changes: 0 additions & 2 deletions output/cloud/expv2/hdr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,6 @@ func TestHistogramAsProto(t *testing.T) {
for _, v := range tc.vals {
h.addToBucket(v)
}
tc.exp.MinResolution = 1.0
tc.exp.SignificantDigits = 2
tc.exp.Time = &timestamppb.Timestamp{Seconds: 1}
assert.Equal(t, tc.exp, histogramAsProto(&h, time.Unix(1, 0).UnixNano()), tc.name)
}
Expand Down
42 changes: 3 additions & 39 deletions output/cloud/expv2/integration/testdata/metricset.json
Original file line number Diff line number Diff line change
@@ -1,25 +1,18 @@
{
"testRunId": "my-test-run-id-123",
"aggregationPeriod": 3,
"metrics": [
{
"name": "metric_counter_1",
"type": "METRIC_TYPE_COUNTER",
"timeSeries": [
{
"labels": [
{
"name": "__name__",
"value": "metric_counter_1"
},
{
"name": "test_run_id",
"value": "my-test-run-id-123"
},
{
"name": "my_label_1",
"value": "my_label_value_1"
}
],
"aggregationPeriod": 3,
"counterSamples": {
"values": [
{
Expand All @@ -37,20 +30,11 @@
"timeSeries": [
{
"labels": [
{
"name": "__name__",
"value": "metric_gauge_2"
},
{
"name": "test_run_id",
"value": "my-test-run-id-123"
},
{
"name": "my_label_2",
"value": "my_label_value_2"
}
],
"aggregationPeriod": 3,
"gaugeSamples": {
"values": [
{
Expand All @@ -72,20 +56,11 @@
"timeSeries": [
{
"labels": [
{
"name": "__name__",
"value": "metric_rate_3"
},
{
"name": "test_run_id",
"value": "my-test-run-id-123"
},
{
"name": "my_label_3",
"value": "my_label_value_3"
}
],
"aggregationPeriod": 3,
"rateSamples": {
"values": [
{
Expand All @@ -104,20 +79,11 @@
"timeSeries": [
{
"labels": [
{
"name": "__name__",
"value": "metric_trend_4"
},
{
"name": "test_run_id",
"value": "my-test-run-id-123"
},
{
"name": "my_label_4",
"value": "my_label_value_4"
}
],
"aggregationPeriod": 3,
"trendHdrSamples": {
"values": [
{
Expand All @@ -128,9 +94,7 @@
],
"lowerCounterIndex": 6,
"maxValue": 6,
"minResolution": 1,
"minValue": 6,
"significantDigits": 2,
"sum": 6
}
]
Expand All @@ -139,4 +103,4 @@
]
}
]
}
}
9 changes: 3 additions & 6 deletions output/cloud/expv2/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@ import (
)

// TODO: unit test
func mapTimeSeriesLabelsProto(timeSeries metrics.TimeSeries, testRunID string) []*pbcloud.Label {
labels := make([]*pbcloud.Label, 0, ((*atlas.Node)(timeSeries.Tags)).Len()+2)
labels = append(labels,
&pbcloud.Label{Name: "__name__", Value: timeSeries.Metric.Name},
&pbcloud.Label{Name: "test_run_id", Value: testRunID})
func mapTimeSeriesLabelsProto(tags *metrics.TagSet) []*pbcloud.Label {
labels := make([]*pbcloud.Label, 0, ((*atlas.Node)(tags)).Len())

// TODO: move it as a shared func
// https://github.com/grafana/k6/issues/2764
n := (*atlas.Node)(timeSeries.Tags)
n := (*atlas.Node)(tags)
if n.Len() < 1 {
return labels
}
Expand Down
Loading

0 comments on commit aa79387

Please sign in to comment.