Skip to content

Commit cb2a2b6

Browse files
Don't hold labels from store-gateways in two forms, and don't convert them multiple times (#9914) (#9930)
* Don't hold labels from store-gateways in two forms.
* Don't retain labels longer than needed.
* Don't convert mimirpb.LabelAdapters to labels.Labels multiple times.
* Add changelog entry.

(cherry picked from commit d2367de)

Co-authored-by: Charles Korn <charleskorn@users.noreply.github.com>
1 parent c3b4ada commit cb2a2b6

File tree

8 files changed

+46
-38
lines changed

8 files changed

+46
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
* [ENHANCEMENT] PromQL: make `sort_by_label` stable. #9879
6666
* [ENHANCEMENT] Distributor: Initialize ha_tracker cache before ha_tracker and distributor reach running state and begin serving writes. #9826
6767
* [ENHANCEMENT] Ingester: `-ingest-storage.kafka.max-buffered-bytes` to limit the memory for buffered records when using concurrent fetching. #9892
68+
* [ENHANCEMENT] Querier: improve performance and memory consumption of queries that select many series. #9914
6869
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
6970
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
7071
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508

pkg/distributor/distributor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2557,7 +2557,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
25572557

25582558
result := make([]labels.Labels, 0, len(metrics))
25592559
for _, m := range metrics {
2560-
if err := queryLimiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(m)); err != nil {
2560+
if err := queryLimiter.AddSeries(m); err != nil {
25612561
return nil, err
25622562
}
25632563
result = append(result, m)

pkg/distributor/query.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
268268

269269
if len(resp.Timeseries) > 0 {
270270
for _, series := range resp.Timeseries {
271-
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
271+
if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil {
272272
return ingesterQueryResult{}, limitErr
273273
}
274274
}
@@ -285,7 +285,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
285285
}
286286

287287
for _, series := range resp.Chunkseries {
288-
if err := queryLimiter.AddSeries(series.Labels); err != nil {
288+
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil {
289289
return ingesterQueryResult{}, err
290290
}
291291
}
@@ -300,7 +300,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
300300
streamingSeriesCount += len(resp.StreamingSeries)
301301

302302
for _, s := range resp.StreamingSeries {
303-
if err := queryLimiter.AddSeries(s.Labels); err != nil {
303+
l := mimirpb.FromLabelAdaptersToLabels(s.Labels)
304+
305+
if err := queryLimiter.AddSeries(l); err != nil {
304306
return ingesterQueryResult{}, err
305307
}
306308

@@ -313,7 +315,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
313315
return ingesterQueryResult{}, err
314316
}
315317

316-
labelsBatch = append(labelsBatch, mimirpb.FromLabelAdaptersToLabels(s.Labels))
318+
labelsBatch = append(labelsBatch, l)
317319
}
318320

319321
streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)

pkg/querier/block_streaming.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"github.com/prometheus/prometheus/tsdb/chunkenc"
1818
"github.com/prometheus/prometheus/util/annotations"
1919

20-
"github.com/grafana/mimir/pkg/mimirpb"
2120
"github.com/grafana/mimir/pkg/querier/stats"
2221
"github.com/grafana/mimir/pkg/storage/series"
2322
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
@@ -31,7 +30,7 @@ import (
3130

3231
// Implementation of storage.SeriesSet, based on individual responses from store client.
3332
type blockStreamingQuerierSeriesSet struct {
34-
series []*storepb.StreamingSeries
33+
series []labels.Labels
3534
streamReader chunkStreamReader
3635

3736
// next response to process
@@ -55,18 +54,22 @@ func (bqss *blockStreamingQuerierSeriesSet) Next() bool {
5554
return false
5655
}
5756

58-
currLabels := bqss.series[bqss.nextSeriesIndex].Labels
57+
currLabels := bqss.series[bqss.nextSeriesIndex]
5958
seriesIdxStart := bqss.nextSeriesIndex // First series in this group. We might merge with more below.
6059
bqss.nextSeriesIndex++
6160

6261
// Chunks may come in multiple responses, but as soon as the response has chunks for a new series,
6362
// we can stop searching. Series are sorted. See documentation for StoreClient.Series call for details.
6463
// The actually merging of chunks happens in the Iterator() call where chunks are fetched.
65-
for bqss.nextSeriesIndex < len(bqss.series) && mimirpb.CompareLabelAdapters(currLabels, bqss.series[bqss.nextSeriesIndex].Labels) == 0 {
64+
for bqss.nextSeriesIndex < len(bqss.series) && labels.Equal(currLabels, bqss.series[bqss.nextSeriesIndex]) {
6665
bqss.nextSeriesIndex++
6766
}
6867

69-
bqss.currSeries = newBlockStreamingQuerierSeries(mimirpb.FromLabelAdaptersToLabels(currLabels), seriesIdxStart, bqss.nextSeriesIndex-1, bqss.streamReader, bqss.chunkInfo, bqss.nextSeriesIndex >= len(bqss.series), bqss.remoteAddress)
68+
bqss.currSeries = newBlockStreamingQuerierSeries(currLabels, seriesIdxStart, bqss.nextSeriesIndex-1, bqss.streamReader, bqss.chunkInfo, bqss.nextSeriesIndex >= len(bqss.series), bqss.remoteAddress)
69+
70+
// Clear any labels we no longer need, to allow them to be garbage collected when they're no longer needed elsewhere.
71+
clear(bqss.series[seriesIdxStart : bqss.nextSeriesIndex-1])
72+
7073
return true
7174
}
7275

pkg/querier/block_streaming_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"go.uber.org/atomic"
2020
"google.golang.org/grpc/metadata"
2121

22-
"github.com/grafana/mimir/pkg/mimirpb"
2322
"github.com/grafana/mimir/pkg/querier/stats"
2423
"github.com/grafana/mimir/pkg/storegateway/storepb"
2524
"github.com/grafana/mimir/pkg/util/limiter"
@@ -166,9 +165,7 @@ func TestBlockStreamingQuerierSeriesSet(t *testing.T) {
166165
t.Run(name, func(t *testing.T) {
167166
ss := &blockStreamingQuerierSeriesSet{streamReader: &mockChunkStreamer{series: c.input, causeError: c.errorChunkStreamer}}
168167
for _, s := range c.input {
169-
ss.series = append(ss.series, &storepb.StreamingSeries{
170-
Labels: mimirpb.FromLabelsToLabelAdapters(s.lbls),
171-
})
168+
ss.series = append(ss.series, s.lbls)
172169
}
173170
idx := 0
174171
var it chunkenc.Iterator

pkg/querier/blocks_store_queryable.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -780,9 +780,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
780780
return err
781781
}
782782

783-
// A storegateway client will only fill either of mySeries or myStreamingSeries, and not both.
783+
// A storegateway client will only fill either of mySeries or myStreamingSeriesLabels, and not both.
784784
mySeries := []*storepb.Series(nil)
785-
myStreamingSeries := []*storepb.StreamingSeries(nil)
785+
myStreamingSeriesLabels := []labels.Labels(nil)
786786
var myWarnings annotations.Annotations
787787
myQueriedBlocks := []ulid.ULID(nil)
788788
indexBytesFetched := uint64(0)
@@ -813,7 +813,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
813813
mySeries = append(mySeries, s)
814814

815815
// Add series fingerprint to query limiter; will return error if we are over the limit
816-
if err := queryLimiter.AddSeries(s.Labels); err != nil {
816+
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil {
817817
return err
818818
}
819819

@@ -853,16 +853,22 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
853853
}
854854

855855
if ss := resp.GetStreamingSeries(); ss != nil {
856+
myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series))
857+
856858
for _, s := range ss.Series {
857859
// Add series fingerprint to query limiter; will return error if we are over the limit
858-
if limitErr := queryLimiter.AddSeries(s.Labels); limitErr != nil {
860+
l := mimirpb.FromLabelAdaptersToLabels(s.Labels)
861+
862+
if limitErr := queryLimiter.AddSeries(l); limitErr != nil {
859863
return limitErr
860864
}
865+
866+
myStreamingSeriesLabels = append(myStreamingSeriesLabels, l)
861867
}
862-
myStreamingSeries = append(myStreamingSeries, ss.Series...)
868+
863869
if ss.IsEndOfSeriesStream {
864870
// If we aren't expecting any series from this stream, close it now.
865-
if len(myStreamingSeries) == 0 {
871+
if len(myStreamingSeriesLabels) == 0 {
866872
util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
867873
}
868874

@@ -904,13 +910,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
904910
chunkInfo.EndSeries(i == len(mySeries)-1)
905911
}
906912
}
907-
} else if len(myStreamingSeries) > 0 {
913+
} else if len(myStreamingSeriesLabels) > 0 {
908914
// FetchedChunks and FetchedChunkBytes are added by the SeriesChunksStreamReader.
909-
reqStats.AddFetchedSeries(uint64(len(myStreamingSeries)))
910-
streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeries), queryLimiter, reqStats, q.metrics, q.logger)
915+
reqStats.AddFetchedSeries(uint64(len(myStreamingSeriesLabels)))
916+
streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeriesLabels), queryLimiter, reqStats, q.metrics, q.logger)
911917
level.Debug(log).Log("msg", "received streaming series from store-gateway",
912918
"instance", c.RemoteAddress(),
913-
"fetched series", len(myStreamingSeries),
919+
"fetched series", len(myStreamingSeriesLabels),
914920
"fetched index bytes", indexBytesFetched,
915921
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
916922
"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))
@@ -925,12 +931,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
925931
mtx.Lock()
926932
if len(mySeries) > 0 {
927933
seriesSets = append(seriesSets, &blockQuerierSeriesSet{series: mySeries})
928-
} else if len(myStreamingSeries) > 0 {
934+
} else if len(myStreamingSeriesLabels) > 0 {
929935
if chunkInfo != nil {
930936
chunkInfo.SetMsg("store-gateway streaming")
931937
}
932938
seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{
933-
series: myStreamingSeries,
939+
series: myStreamingSeriesLabels,
934940
streamReader: streamReader,
935941
chunkInfo: chunkInfo,
936942
remoteAddress: c.RemoteAddress(),

pkg/util/limiter/query_limiter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ import (
99
"context"
1010
"sync"
1111

12+
"github.com/prometheus/prometheus/model/labels"
1213
"go.uber.org/atomic"
1314

14-
"github.com/grafana/mimir/pkg/mimirpb"
1515
"github.com/grafana/mimir/pkg/querier/stats"
1616
"github.com/grafana/mimir/pkg/util/validation"
1717
)
@@ -74,12 +74,12 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
7474
}
7575

7676
// AddSeries adds the input series and returns an error if the limit is reached.
77-
func (ql *QueryLimiter) AddSeries(seriesLabels []mimirpb.LabelAdapter) validation.LimitError {
77+
func (ql *QueryLimiter) AddSeries(seriesLabels labels.Labels) validation.LimitError {
7878
// If the max series is unlimited just return without managing map
7979
if ql.maxSeriesPerQuery == 0 {
8080
return nil
8181
}
82-
fingerprint := mimirpb.FromLabelAdaptersToLabels(seriesLabels).Hash()
82+
fingerprint := seriesLabels.Hash()
8383

8484
ql.uniqueSeriesMx.Lock()
8585
defer ql.uniqueSeriesMx.Unlock()

pkg/util/limiter/query_limiter_test.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/stretchr/testify/assert"
1717
"github.com/stretchr/testify/require"
1818

19-
"github.com/grafana/mimir/pkg/mimirpb"
2019
"github.com/grafana/mimir/pkg/querier/stats"
2120
)
2221

@@ -37,15 +36,15 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing
3736
reg = prometheus.NewPedanticRegistry()
3837
limiter = NewQueryLimiter(100, 0, 0, 0, stats.NewQueryMetrics(reg))
3938
)
40-
err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
39+
err := limiter.AddSeries(series1)
4140
assert.NoError(t, err)
42-
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
41+
err = limiter.AddSeries(series2)
4342
assert.NoError(t, err)
4443
assert.Equal(t, 2, limiter.uniqueSeriesCount())
4544
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
4645

4746
// Re-add previous series to make sure it's not double counted
48-
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
47+
err = limiter.AddSeries(series1)
4948
assert.NoError(t, err)
5049
assert.Equal(t, 2, limiter.uniqueSeriesCount())
5150
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
@@ -72,21 +71,21 @@ func TestQueryLimiter_AddSeries_ShouldReturnErrorOnLimitExceeded(t *testing.T) {
7271
reg = prometheus.NewPedanticRegistry()
7372
limiter = NewQueryLimiter(1, 0, 0, 0, stats.NewQueryMetrics(reg))
7473
)
75-
err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
74+
err := limiter.AddSeries(series1)
7675
require.NoError(t, err)
7776
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
7877

79-
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
78+
err = limiter.AddSeries(series2)
8079
require.Error(t, err)
8180
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
8281

8382
// Add the same series again and ensure that we don't increment the failed queries metric again.
84-
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
83+
err = limiter.AddSeries(series2)
8584
require.Error(t, err)
8685
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
8786

8887
// Add another series and ensure that we don't increment the failed queries metric again.
89-
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series3))
88+
err = limiter.AddSeries(series3)
9089
require.Error(t, err)
9190
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
9291
}
@@ -188,7 +187,7 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
188187
reg := prometheus.NewPedanticRegistry()
189188
limiter := NewQueryLimiter(b.N+1, 0, 0, 0, stats.NewQueryMetrics(reg))
190189
for _, s := range series {
191-
err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(s))
190+
err := limiter.AddSeries(s)
192191
assert.NoError(b, err)
193192
}
194193
}

0 commit comments

Comments
 (0)