From 4af26b59ece585b8e65fa44db854a9d2e39406e3 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 26 Dec 2024 16:19:23 +0100 Subject: [PATCH] SeriesResponse: Use memory pooling Signed-off-by: Arve Knudsen --- go.mod | 2 +- pkg/distributor/distributor.go | 10 +- pkg/querier/block.go | 10 +- pkg/querier/block_streaming.go | 52 +- pkg/querier/block_streaming_test.go | 172 ++- pkg/querier/block_test.go | 160 +-- pkg/querier/blocks_store_queryable.go | 31 +- pkg/querier/blocks_store_queryable_test.go | 24 +- pkg/storegateway/bucket.go | 20 +- pkg/storegateway/bucket_store_server_test.go | 12 +- pkg/storegateway/bucket_stores_test.go | 6 +- pkg/storegateway/bucket_test.go | 26 +- pkg/storegateway/gateway_test.go | 6 +- pkg/storegateway/storepb/custom.go | 1116 +++++++++++++++++- pkg/storegateway/storepb/rpc.pb.go | 170 ++- pkg/storegateway/storepb/rpc.pb.go.expdiff | 47 +- pkg/storegateway/storepb/rpc.proto | 6 +- pkg/storegateway/storepb/types.proto | 1 - 18 files changed, 1544 insertions(+), 327 deletions(-) diff --git a/go.mod b/go.mod index 6b01406ab55..07875b4afd2 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/grafana/mimir -go 1.22.7 +go 1.23 // Please note that this directive is ignored when building with the Mimir build image, // that will always use its bundled toolchain. diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 61c76c6f198..1400f46c0da 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1528,8 +1528,14 @@ func NextOrCleanup(next PushFunc, pushReq *Request) (_ PushFunc, maybeCleanup fu return next(ctx, req) }, func() { - if cleanupInDefer { - pushReq.CleanUp() + if !cleanupInDefer { + return + } + + req, _ := pushReq.WriteRequest() + pushReq.CleanUp() + if req != nil { + req.FreeBuffer() } } } diff --git a/pkg/querier/block.go b/pkg/querier/block.go index b98a4c1ccb1..e00c9b880b5 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -48,7 +48,7 @@ func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMa // Implementation of storage.SeriesSet, based on individual responses from store client. 
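+// The responses are now pooled CustomSeries values: once the set is
+// exhausted, Next releases every series back to its pool and drops the slice.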
type blockQuerierSeriesSet struct { - series []*storepb.Series + series []*storepb.CustomSeries // next response to process next int @@ -60,6 +60,10 @@ func (bqss *blockQuerierSeriesSet) Next() bool { bqss.currSeries = nil if bqss.next >= len(bqss.series) { + for _, s := range bqss.series { + s.Release() + } + bqss.series = nil return false } @@ -134,7 +138,9 @@ func newBlockQuerierSeriesIterator(reuse chunkenc.Iterator, lbls labels.Labels, return chunk.ErrorIterator(fmt.Sprintf("cannot create new chunk for series %s: %s", lbls.String(), err.Error())) } - if err := ch.UnmarshalFromBuf(c.Raw.Data); err != nil { + err = ch.UnmarshalFromBuf(c.Raw.Data) + storepb.ReleaseChunk(&c.Raw) + if err != nil { return chunk.ErrorIterator(fmt.Sprintf("cannot unmarshal chunk for series %s: %s", lbls.String(), err.Error())) } diff --git a/pkg/querier/block_streaming.go b/pkg/querier/block_streaming.go index 0c412819b5f..f288b0add3f 100644 --- a/pkg/querier/block_streaming.go +++ b/pkg/querier/block_streaming.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "io" - "slices" "sort" "github.com/go-kit/log" @@ -120,6 +119,9 @@ func (bqs *blockStreamingQuerierSeries) Iterator(reuse chunkenc.Iterator) chunke for i := bqs.seriesIdxStart; i <= bqs.seriesIdxEnd; i++ { chks, err := bqs.streamReader.GetChunks(uint64(i)) if err != nil { + for _, chk := range allChunks { + storepb.ReleaseChunk(&chk.Raw) + } return series.NewErrIterator(err) } allChunks = append(allChunks, chks...) @@ -155,8 +157,8 @@ type storeGatewayStreamReader struct { log log.Logger chunkCountEstimateChan chan int - seriesChunksChan chan *storepb.StreamingChunksBatch - chunksBatch []*storepb.StreamingChunks + seriesChunksChan chan *storepb.CustomStreamingChunksBatch + chunksBatch *storepb.CustomStreamingChunksBatch errorChan chan error err error } @@ -189,7 +191,7 @@ func (s *storeGatewayStreamReader) Close() { func (s *storeGatewayStreamReader) StartBuffering() { // Important: to ensure that the goroutine does not become blocked and leak, the goroutine must only ever write to errorChan at most once. 
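+	// Note: the batches sent on seriesChunksChan are pooled; the receiver is
+	// responsible for calling Release on each batch once its series are consumed.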
s.errorChan = make(chan error, 1) - s.seriesChunksChan = make(chan *storepb.StreamingChunksBatch, 1) + s.seriesChunksChan = make(chan *storepb.CustomStreamingChunksBatch, 1) s.chunkCountEstimateChan = make(chan int, 1) go func() { @@ -244,7 +246,6 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error } estimate := msg.GetStreamingChunksEstimate() - msg.FreeBuffer() if estimate == nil { return fmt.Errorf("expected to receive chunks estimate, but got message of type %T", msg.Result) } @@ -262,18 +263,16 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error batch := msg.GetStreamingChunks() if batch == nil { - msg.FreeBuffer() return fmt.Errorf("expected to receive streaming chunks, but got message of type %T", msg.Result) } if len(batch.Series) == 0 { - msg.FreeBuffer() + batch.Release() continue } totalSeries += len(batch.Series) if totalSeries > s.expectedSeriesCount { - msg.FreeBuffer() return fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries) } @@ -287,37 +286,23 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error } totalChunks += numChunks if err := s.queryLimiter.AddChunks(numChunks); err != nil { - msg.FreeBuffer() return err } if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil { - msg.FreeBuffer() return err } s.stats.AddFetchedChunks(uint64(numChunks)) s.stats.AddFetchedChunkBytes(uint64(chunkBytes)) - // Memory safe copy. - safeSeries := make([]*storepb.StreamingChunks, 0, len(batch.Series)) - for _, s := range batch.Series { - safe := *s - safe.Chunks = slices.Clone(s.Chunks) - for i, c := range safe.Chunks { - safe.Chunks[i].Raw.Data = slices.Clone(c.Raw.Data) - } - safeSeries = append(safeSeries, &safe) - } - batch.Series = safeSeries - msg.FreeBuffer() - if err := s.sendBatch(batch); err != nil { + batch.Release() return err } } } -func (s *storeGatewayStreamReader) sendBatch(c *storepb.StreamingChunksBatch) error { +func (s *storeGatewayStreamReader) sendBatch(c *storepb.CustomStreamingChunksBatch) error { if err := s.ctx.Err(); err != nil { // If the context is already cancelled, stop now for the same reasons as below. // We do this extra check here to ensure that we don't get unlucky and continue to send to seriesChunksChan even if @@ -371,20 +356,26 @@ func (s *storeGatewayStreamReader) GetChunks(seriesIndex uint64) (_ []storepb.Ag s.err = err }() - if len(s.chunksBatch) == 0 { + if s.chunksBatch == nil { if err := s.readNextBatch(seriesIndex); err != nil { return nil, err } } - chks := s.chunksBatch[0] - if len(s.chunksBatch) > 1 { - s.chunksBatch = s.chunksBatch[1:] + chks := s.chunksBatch.Series[0] + if len(s.chunksBatch.Series) > 1 { + s.chunksBatch.Series = s.chunksBatch.Series[1:] } else { + // Take ownership of chks before releasing the batch. 
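+	// Emptying Series first ensures Release cannot recycle the chunks that chks
+	// still references; those chunks are freed by the error paths below or,
+	// later, when each chunk is unmarshalled by the iterator.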
+ s.chunksBatch.Series = s.chunksBatch.Series[:0] + s.chunksBatch.Release() s.chunksBatch = nil } if chks.SeriesIndex != seriesIndex { + for _, chk := range chks.Chunks { + storepb.ReleaseChunk(&chk.Raw) + } return nil, fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has series with index %v", seriesIndex, chks.SeriesIndex) } @@ -400,6 +391,9 @@ func (s *storeGatewayStreamReader) GetChunks(seriesIndex uint64) (_ []storepb.Ag // is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being // logged and included in metrics and traces, when in fact the call succeeded. if err := <-s.errorChan; err != nil { + for _, chk := range chks.Chunks { + storepb.ReleaseChunk(&chk.Raw) + } return nil, fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has failed: %w", seriesIndex, err) } } @@ -426,7 +420,7 @@ func (s *storeGatewayStreamReader) readNextBatch(seriesIndex uint64) error { return fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount) } - s.chunksBatch = chks.Series + s.chunksBatch = chks return nil } diff --git a/pkg/querier/block_streaming_test.go b/pkg/querier/block_streaming_test.go index 48b62329aa9..136fb932021 100644 --- a/pkg/querier/block_streaming_test.go +++ b/pkg/querier/block_streaming_test.go @@ -261,28 +261,42 @@ func TestStoreGatewayStreamReader_HappyPaths(t *testing.T) { "single series per batch": { messages: batchesToMessages( 40, - storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: series0}}}, - storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: series1}}}, - storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: series2}}}, - storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 3, Chunks: series3}}}, - storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 4, Chunks: series4}}}, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: series0}}}, + }, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: series1}}}, + }, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: series2}}}, + }, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 3, Chunks: series3}}}, + }, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 4, Chunks: series4}}}, + }, ), expectedChunksEstimate: 40, }, "multiple series per batch": { messages: batchesToMessages( 40, - storepb.StreamingChunksBatch{ - Series: []*storepb.StreamingChunks{ - {SeriesIndex: 0, Chunks: series0}, - {SeriesIndex: 1, Chunks: series1}, - {SeriesIndex: 2, Chunks: series2}, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{ + {SeriesIndex: 0, Chunks: series0}, + {SeriesIndex: 1, Chunks: series1}, + {SeriesIndex: 2, Chunks: series2}, + }, }, }, - 
storepb.StreamingChunksBatch{ - Series: []*storepb.StreamingChunks{ - {SeriesIndex: 3, Chunks: series3}, - {SeriesIndex: 4, Chunks: series4}, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{ + {SeriesIndex: 3, Chunks: series3}, + {SeriesIndex: 4, Chunks: series4}, + }, }, }, ), @@ -291,21 +305,29 @@ func TestStoreGatewayStreamReader_HappyPaths(t *testing.T) { "empty batches": { messages: batchesToMessages( 40, - storepb.StreamingChunksBatch{ - Series: []*storepb.StreamingChunks{ - {SeriesIndex: 0, Chunks: series0}, - {SeriesIndex: 1, Chunks: series1}, - {SeriesIndex: 2, Chunks: series2}, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{ + {SeriesIndex: 0, Chunks: series0}, + {SeriesIndex: 1, Chunks: series1}, + {SeriesIndex: 2, Chunks: series2}, + }, }, }, - storepb.StreamingChunksBatch{}, - storepb.StreamingChunksBatch{ - Series: []*storepb.StreamingChunks{ - {SeriesIndex: 3, Chunks: series3}, - {SeriesIndex: 4, Chunks: series4}, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{}, + }, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{ + {SeriesIndex: 3, Chunks: series3}, + {SeriesIndex: 4, Chunks: series4}, + }, }, }, - storepb.StreamingChunksBatch{}, + storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{}, + }, ), expectedChunksEstimate: 40, }, @@ -375,10 +397,19 @@ func TestStoreGatewayStreamReader_AbortsWhenParentContextCancelled(t *testing.T) for name, testCase := range testCases { t.Run(name, func(t *testing.T) { // Create multiple batches to ensure that the buffering goroutine becomes blocked waiting to send further chunks to GetChunks(). - batches := []storepb.StreamingChunksBatch{ - {Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}}, - {Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}}}, - {Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}}}, + batches := []storepb.CustomStreamingChunksBatch{ + { + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}}, + }, + { + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}}}, + }, + { + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}}}, + }, } streamCtx := context.Background() @@ -402,10 +433,19 @@ func TestStoreGatewayStreamReader_DoesNotAbortWhenStreamContextCancelled(t *test test.VerifyNoLeak(t) // Create multiple batches to ensure that the buffering goroutine becomes blocked waiting to send further chunks to GetChunks(). 
- batches := []storepb.StreamingChunksBatch{ - {Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}}, - {Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}}}, - {Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}}}, + batches := []storepb.CustomStreamingChunksBatch{ + { + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}}, + }, + { + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}}}, + }, + { + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}}}, + }, } streamCtx, cancel := context.WithCancel(context.Background()) @@ -430,8 +470,11 @@ func TestStoreGatewayStreamReader_DoesNotAbortWhenStreamContextCancelled(t *test } func TestStoreGatewayStreamReader_ReadingSeriesOutOfOrder(t *testing.T) { - batches := []storepb.StreamingChunksBatch{ - {Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}}, + batches := []storepb.CustomStreamingChunksBatch{ + { + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}}, + }, } ctx := context.Background() @@ -447,8 +490,11 @@ func TestStoreGatewayStreamReader_ReadingSeriesOutOfOrder(t *testing.T) { func TestStoreGatewayStreamReader_ReadingMoreSeriesThanAvailable(t *testing.T) { firstSeries := []storepb.AggrChunk{createChunk(t, 1000, 1.23)} - batches := []storepb.StreamingChunksBatch{ - {Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: firstSeries}}}, + batches := []storepb.CustomStreamingChunksBatch{ + { + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: firstSeries}}}, + }, } ctx := context.Background() @@ -475,8 +521,11 @@ func TestStoreGatewayStreamReader_ReadingMoreSeriesThanAvailable(t *testing.T) { func TestStoreGatewayStreamReader_ReceivedFewerSeriesThanExpected(t *testing.T) { firstSeries := []storepb.AggrChunk{createChunk(t, 1000, 1.23)} - batches := []storepb.StreamingChunksBatch{ - {Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: firstSeries}}}, + batches := []storepb.CustomStreamingChunksBatch{ + { + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: firstSeries}}}, + }, } ctx := context.Background() @@ -505,26 +554,32 @@ func TestStoreGatewayStreamReader_ReceivedFewerSeriesThanExpected(t *testing.T) } func TestStoreGatewayStreamReader_ReceivedMoreSeriesThanExpected(t *testing.T) { - testCases := map[string][]storepb.StreamingChunksBatch{ + testCases := map[string][]storepb.CustomStreamingChunksBatch{ "extra series received as part of batch for last expected series": { { - Series: []*storepb.StreamingChunks{ - {SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}, - {SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}, - {SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}, + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: 
[]*storepb.StreamingChunks{ + {SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}, + {SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}, + {SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}, + }, }, }, }, "extra series received as part of batch after batch containing last expected series": { { - Series: []*storepb.StreamingChunks{ - {SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}, + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{ + {SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}, + }, }, }, { - Series: []*storepb.StreamingChunks{ - {SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}, - {SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}, + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{ + {SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}, + {SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}, + }, }, }, }, @@ -581,12 +636,15 @@ func TestStoreGatewayStreamReader_ChunksLimits(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - batches := []storepb.StreamingChunksBatch{ - {Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{ - createChunk(t, 1000, 1.23), - createChunk(t, 1100, 1.23), - createChunk(t, 1200, 1.23), - }}}}, + batches := []storepb.CustomStreamingChunksBatch{ + { + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{ + createChunk(t, 1000, 1.23), + createChunk(t, 1100, 1.23), + createChunk(t, 1200, 1.23), + }}}}, + }, } ctx := context.Background() @@ -636,13 +694,13 @@ func createChunk(t *testing.T, time int64, value float64) storepb.AggrChunk { } } -func batchesToMessages(estimatedChunks uint64, batches ...storepb.StreamingChunksBatch) []*storepb.SeriesResponse { +func batchesToMessages(estimatedChunks uint64, batches ...storepb.CustomStreamingChunksBatch) []*storepb.SeriesResponse { messages := make([]*storepb.SeriesResponse, len(batches)+1) messages[0] = storepb.NewStreamingChunksEstimate(estimatedChunks) for i, b := range batches { - messages[i+1] = storepb.NewStreamingChunksResponse(&b) + messages[i+1] = storepb.NewStreamingChunksResponse(b) } return messages diff --git a/pkg/querier/block_test.go b/pkg/querier/block_test.go index 55ac10b5cd0..b2653f405ff 100644 --- a/pkg/querier/block_test.go +++ b/pkg/querier/block_test.go @@ -199,104 +199,126 @@ func TestBlockQuerierSeriesSet(t *testing.T) { getSeriesSet := func() *blockQuerierSeriesSet { return &blockQuerierSeriesSet{ - series: []*storepb.Series{ + series: []*storepb.CustomSeries{ // first, with one chunk. { - Labels: mkZLabels("__name__", "first", "a", "a"), - Chunks: []storepb.AggrChunk{ - createAggrChunkWithSineSamples(now, now.Add(100*time.Second-time.Millisecond), 3*time.Millisecond), // ceil(100 / 0.003) samples (= 33334) + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "first", "a", "a"), + Chunks: []storepb.AggrChunk{ + createAggrChunkWithSineSamples(now, now.Add(100*time.Second-time.Millisecond), 3*time.Millisecond), // ceil(100 / 0.003) samples (= 33334) + }, }, }, // continuation of previous series. Must have exact same labels. 
{ - Labels: mkZLabels("__name__", "first", "a", "a"), - Chunks: []storepb.AggrChunk{ - createAggrChunkWithSineSamples(now.Add(100*time.Second), now.Add(200*time.Second-time.Millisecond), 3*time.Millisecond), // ceil(100 / 0.003) samples (= 33334) samples more, 66668 in total + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "first", "a", "a"), + Chunks: []storepb.AggrChunk{ + createAggrChunkWithSineSamples(now.Add(100*time.Second), now.Add(200*time.Second-time.Millisecond), 3*time.Millisecond), // ceil(100 / 0.003) samples (= 33334) samples more, 66668 in total + }, }, }, // second, with multiple chunks { - Labels: mkZLabels("__name__", "second"), - Chunks: []storepb.AggrChunk{ - // unordered chunks - createAggrChunkWithSineSamples(now.Add(400*time.Second), now.Add(600*time.Second-5*time.Millisecond), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples, = 120000 in total) - createAggrChunkWithSineSamples(now.Add(200*time.Second), now.Add(400*time.Second-5*time.Millisecond), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples) - createAggrChunkWithSineSamples(now, now.Add(200*time.Second-5*time.Millisecond), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples) + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "second"), + Chunks: []storepb.AggrChunk{ + // unordered chunks + createAggrChunkWithSineSamples(now.Add(400*time.Second), now.Add(600*time.Second-5*time.Millisecond), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples, = 120000 in total) + createAggrChunkWithSineSamples(now.Add(200*time.Second), now.Add(400*time.Second-5*time.Millisecond), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples) + createAggrChunkWithSineSamples(now, now.Add(200*time.Second-5*time.Millisecond), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples) + }, }, }, // overlapping { - Labels: mkZLabels("__name__", "overlapping"), - Chunks: []storepb.AggrChunk{ - createAggrChunkWithSineSamples(now, now.Add(10*time.Second-5*time.Millisecond), 5*time.Millisecond), // 10 / 0.005 = 2000 samples + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "overlapping"), + Chunks: []storepb.AggrChunk{ + createAggrChunkWithSineSamples(now, now.Add(10*time.Second-5*time.Millisecond), 5*time.Millisecond), // 10 / 0.005 = 2000 samples + }, }, }, { - Labels: mkZLabels("__name__", "overlapping"), - Chunks: []storepb.AggrChunk{ - // 10 / 0.005 = 2000 samples, but first 1000 are overlapping with previous series, so this chunk only contributes 1000 - createAggrChunkWithSineSamples(now.Add(5*time.Second), now.Add(15*time.Second-5*time.Millisecond), 5*time.Millisecond), + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "overlapping"), + Chunks: []storepb.AggrChunk{ + // 10 / 0.005 = 2000 samples, but first 1000 are overlapping with previous series, so this chunk only contributes 1000 + createAggrChunkWithSineSamples(now.Add(5*time.Second), now.Add(15*time.Second-5*time.Millisecond), 5*time.Millisecond), + }, }, }, // overlapping 2. Chunks here come in wrong order. 
{ - Labels: mkZLabels("__name__", "overlapping2"), - Chunks: []storepb.AggrChunk{ - // entire range overlaps with the next chunk, so this chunks contributes 0 samples (it will be sorted as second) - createAggrChunkWithSineSamples(now.Add(3*time.Second), now.Add(7*time.Second-5*time.Millisecond), 5*time.Millisecond), + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "overlapping2"), + Chunks: []storepb.AggrChunk{ + // entire range overlaps with the next chunk, so this chunks contributes 0 samples (it will be sorted as second) + createAggrChunkWithSineSamples(now.Add(3*time.Second), now.Add(7*time.Second-5*time.Millisecond), 5*time.Millisecond), + }, }, }, { - Labels: mkZLabels("__name__", "overlapping2"), - Chunks: []storepb.AggrChunk{ - // this chunk has completely overlaps previous chunk. Since its minTime is lower, it will be sorted as first. - createAggrChunkWithSineSamples(now, now.Add(10*time.Second-5*time.Millisecond), 5*time.Millisecond), // 10 / 0.005 = 2000 samples + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "overlapping2"), + Chunks: []storepb.AggrChunk{ + // this chunk has completely overlaps previous chunk. Since its minTime is lower, it will be sorted as first. + createAggrChunkWithSineSamples(now, now.Add(10*time.Second-5*time.Millisecond), 5*time.Millisecond), // 10 / 0.005 = 2000 samples + }, }, }, { - Labels: mkZLabels("__name__", "overlapping2"), - Chunks: []storepb.AggrChunk{ - // no samples - createAggrChunkWithSineSamples(now, now.Add(-5*time.Millisecond), 5*time.Millisecond), + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "overlapping2"), + Chunks: []storepb.AggrChunk{ + // no samples + createAggrChunkWithSineSamples(now, now.Add(-5*time.Millisecond), 5*time.Millisecond), + }, }, }, { - Labels: mkZLabels("__name__", "overlapping2"), - Chunks: []storepb.AggrChunk{ - // 2000 samples more (10 / 0.005) - createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(30*time.Second-5*time.Millisecond), 5*time.Millisecond), + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "overlapping2"), + Chunks: []storepb.AggrChunk{ + // 2000 samples more (10 / 0.005) + createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(30*time.Second-5*time.Millisecond), 5*time.Millisecond), + }, }, }, // many_empty_chunks is a series which contains many empty chunks and only a few that have data { - Labels: mkZLabels("__name__", "many_empty_chunks"), - Chunks: []storepb.AggrChunk{ - createAggrChunkWithSineSamples(now, now.Add(-5*time.Millisecond), 5*time.Millisecond), // empty - createAggrChunkWithSineSamples(now, now.Add(10*time.Second-5*time.Millisecond), 5*time.Millisecond), // 10 / 0.005 (= 2000 samples) - createAggrChunkWithSineSamples(now.Add(10*time.Second), now.Add(10*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty - createAggrChunkWithSineSamples(now.Add(10*time.Second), now.Add(10*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty - createAggrChunkWithSineSamples(now.Add(10*time.Second), now.Add(20*time.Second-5*time.Millisecond), 5*time.Millisecond), // 10 / 0.005 (= 2000 samples, = 4000 in total) - createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(20*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty - createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(20*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty - createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(20*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty - 
createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(30*time.Second-5*time.Millisecond), 5*time.Millisecond), // 10 / 0.005 (= 2000 samples, = 6000 in total) - createAggrChunkWithSineSamples(now.Add(30*time.Second), now.Add(30*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "many_empty_chunks"), + Chunks: []storepb.AggrChunk{ + createAggrChunkWithSineSamples(now, now.Add(-5*time.Millisecond), 5*time.Millisecond), // empty + createAggrChunkWithSineSamples(now, now.Add(10*time.Second-5*time.Millisecond), 5*time.Millisecond), // 10 / 0.005 (= 2000 samples) + createAggrChunkWithSineSamples(now.Add(10*time.Second), now.Add(10*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty + createAggrChunkWithSineSamples(now.Add(10*time.Second), now.Add(10*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty + createAggrChunkWithSineSamples(now.Add(10*time.Second), now.Add(20*time.Second-5*time.Millisecond), 5*time.Millisecond), // 10 / 0.005 (= 2000 samples, = 4000 in total) + createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(20*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty + createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(20*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty + createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(20*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty + createAggrChunkWithSineSamples(now.Add(20*time.Second), now.Add(30*time.Second-5*time.Millisecond), 5*time.Millisecond), // 10 / 0.005 (= 2000 samples, = 6000 in total) + createAggrChunkWithSineSamples(now.Add(30*time.Second), now.Add(30*time.Second-5*time.Millisecond), 5*time.Millisecond), // empty + }, }, }, // Two adjacent ranges with overlapping chunks in each range. Each overlapping chunk in a // range have +1 sample at +1ms timestamp compared to the previous one. 
{ - Labels: mkZLabels("__name__", "overlapping_chunks_with_additional_samples_in_sequence"), - Chunks: []storepb.AggrChunk{ - // Range #1: [now, now+4ms] - createAggrChunkWithSineSamples(now, now.Add(1*time.Millisecond), time.Millisecond), - createAggrChunkWithSineSamples(now, now.Add(2*time.Millisecond), time.Millisecond), - createAggrChunkWithSineSamples(now, now.Add(3*time.Millisecond), time.Millisecond), - createAggrChunkWithSineSamples(now, now.Add(4*time.Millisecond), time.Millisecond), - // Range #2: [now+5ms, now+7ms] - createAggrChunkWithSineSamples(now.Add(5*time.Millisecond), now.Add(5*time.Millisecond), time.Millisecond), - createAggrChunkWithSineSamples(now.Add(5*time.Millisecond), now.Add(6*time.Millisecond), time.Millisecond), - createAggrChunkWithSineSamples(now.Add(5*time.Millisecond), now.Add(7*time.Millisecond), time.Millisecond), + Series: &storepb.Series{ + Labels: mkZLabels("__name__", "overlapping_chunks_with_additional_samples_in_sequence"), + Chunks: []storepb.AggrChunk{ + // Range #1: [now, now+4ms] + createAggrChunkWithSineSamples(now, now.Add(1*time.Millisecond), time.Millisecond), + createAggrChunkWithSineSamples(now, now.Add(2*time.Millisecond), time.Millisecond), + createAggrChunkWithSineSamples(now, now.Add(3*time.Millisecond), time.Millisecond), + createAggrChunkWithSineSamples(now, now.Add(4*time.Millisecond), time.Millisecond), + // Range #2: [now+5ms, now+7ms] + createAggrChunkWithSineSamples(now.Add(5*time.Millisecond), now.Add(5*time.Millisecond), time.Millisecond), + createAggrChunkWithSineSamples(now.Add(5*time.Millisecond), now.Add(6*time.Millisecond), time.Millisecond), + createAggrChunkWithSineSamples(now.Add(5*time.Millisecond), now.Add(7*time.Millisecond), time.Millisecond), + }, }, }, }, @@ -558,7 +580,7 @@ func Benchmark_blockQuerierSeriesSet_iteration(b *testing.B) { ) // Generate series. - series := make([]*storepb.Series, 0, numSeries) + series := make([]*storepb.CustomSeries, 0, numSeries) for seriesID := 0; seriesID < numSeries; seriesID++ { lbls := mkZLabels("__name__", "test", "series_id", strconv.Itoa(seriesID)) chunks := make([]storepb.AggrChunk, 0, numChunksPerSeries) @@ -568,9 +590,11 @@ func Benchmark_blockQuerierSeriesSet_iteration(b *testing.B) { chunks = append(chunks, createAggrChunkWithSineSamples(util.TimeFromMillis(minT), util.TimeFromMillis(minT+numSamplesPerChunk), time.Millisecond)) } - series = append(series, &storepb.Series{ - Labels: lbls, - Chunks: chunks, + series = append(series, &storepb.CustomSeries{ + Series: &storepb.Series{ + Labels: lbls, + Chunks: chunks, + }, }) } @@ -597,7 +621,7 @@ func Benchmark_blockQuerierSeriesSet_seek(b *testing.B) { ) // Generate series. 
- series := make([]*storepb.Series, 0, numSeries) + series := make([]*storepb.CustomSeries, 0, numSeries) for seriesID := 0; seriesID < numSeries; seriesID++ { lbls := mkZLabels("__name__", "test", "series_id", strconv.Itoa(seriesID)) chunks := make([]storepb.AggrChunk, 0, numChunksPerSeries) @@ -607,9 +631,11 @@ func Benchmark_blockQuerierSeriesSet_seek(b *testing.B) { chunks = append(chunks, createAggrChunkWithSineSamples(util.TimeFromMillis(minT), util.TimeFromMillis(minT+numSamplesPerChunk), time.Millisecond)) } - series = append(series, &storepb.Series{ - Labels: lbls, - Chunks: chunks, + series = append(series, &storepb.CustomSeries{ + Series: &storepb.Series{ + Labels: lbls, + Chunks: chunks, + }, }) } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 64af7162aa8..ab87474af8d 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -780,7 +780,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor } // A storegateway client will only fill either of mySeries or myStreamingSeriesLabels, and not both. - mySeries := []*storepb.Series(nil) + mySeries := []*storepb.CustomSeries(nil) myStreamingSeriesLabels := []labels.Labels(nil) var myWarnings annotations.Annotations myQueriedBlocks := []ulid.ULID(nil) @@ -804,10 +804,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor break } if err != nil { + for _, s := range mySeries { + s.Release() + } return err } if shouldRetry { level.Warn(log).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err) + for _, s := range mySeries { + s.Release() + } return nil } @@ -926,7 +932,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor return seriesSets, queriedBlocks, warnings, startStreamingChunks, estimateChunks, nil //nolint:govet // It's OK to return without cancelling reqCtx, see comment above. } -func (q *blocksStoreQuerier) receiveMessage(c BlocksStoreClient, stream storegatewaypb.StoreGateway_SeriesClient, queryLimiter *limiter.QueryLimiter, mySeries []*storepb.Series, myWarnings annotations.Annotations, myQueriedBlocks []ulid.ULID, myStreamingSeriesLabels []labels.Labels, indexBytesFetched uint64) ([]*storepb.Series, annotations.Annotations, []ulid.ULID, []labels.Labels, uint64, bool, bool, error) { +func (q *blocksStoreQuerier) receiveMessage(c BlocksStoreClient, stream storegatewaypb.StoreGateway_SeriesClient, queryLimiter *limiter.QueryLimiter, mySeries []*storepb.CustomSeries, myWarnings annotations.Annotations, myQueriedBlocks []ulid.ULID, myStreamingSeriesLabels []labels.Labels, indexBytesFetched uint64) ([]*storepb.CustomSeries, annotations.Annotations, []ulid.ULID, []labels.Labels, uint64, bool, bool, error) { resp, err := stream.Recv() if err != nil { if errors.Is(err, io.EOF) { @@ -939,15 +945,9 @@ func (q *blocksStoreQuerier) receiveMessage(c BlocksStoreClient, stream storegat return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err } - defer resp.FreeBuffer() // Response may either contain series, streaming series, warning or hints. if s := resp.GetSeries(); s != nil { - // Take a safe copy of every label. 
- for i, l := range s.Labels { - s.Labels[i].Name = strings.Clone(l.Name) - s.Labels[i].Value = strings.Clone(l.Value) - } mySeries = append(mySeries, s) // Add series fingerprint to query limiter; will return error if we are over the limit @@ -993,16 +993,23 @@ func (q *blocksStoreQuerier) receiveMessage(c BlocksStoreClient, stream storegat if ss := resp.GetStreamingSeries(); ss != nil { myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series)) + var lb labels.ScratchBuilder for _, s := range ss.Series { - l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels) + // TODO: Use pooling enabled ScratchBuilder. + lb = labels.NewScratchBuilder(len(s.Labels)) + for _, l := range s.Labels { + lb.Add(l.Name, l.Value) + } + lbls := lb.Labels() // Add series fingerprint to query limiter; will return error if we are over the limit - if limitErr := queryLimiter.AddSeries(l); limitErr != nil { + if limitErr := queryLimiter.AddSeries(lbls); limitErr != nil { return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, limitErr } - myStreamingSeriesLabels = append(myStreamingSeriesLabels, l) + myStreamingSeriesLabels = append(myStreamingSeriesLabels, lbls) } + defer ss.Release() if ss.IsEndOfSeriesStream { return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, true, false, nil @@ -1295,7 +1302,7 @@ func convertBlockHintsToULIDs(hints []hintspb.Block) ([]ulid.ULID, error) { } // countChunksAndBytes returns the number of chunks and size of the chunks making up the provided series in bytes -func countChunksAndBytes(series ...*storepb.Series) (chunks, bytes int) { +func countChunksAndBytes(series ...*storepb.CustomSeries) (chunks, bytes int) { for _, s := range series { chunks += len(s.Chunks) for _, c := range s.Chunks { diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 1d1710cea56..30bc5aac874 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -3235,16 +3235,20 @@ func mockSeriesResponseWithFloatHistogramSamples(lbls labels.Labels, samples ... 
func mockSeriesResponseWithChunks(lbls labels.Labels, chunks ...storepb.AggrChunk) *storepb.SeriesResponse { return &storepb.SeriesResponse{ Result: &storepb.SeriesResponse_Series{ - Series: &storepb.Series{ - Labels: mimirpb.FromLabelsToLabelAdapters(lbls), - Chunks: chunks, + Series: &storepb.CustomSeries{ + Series: &storepb.Series{ + Labels: mimirpb.FromLabelsToLabelAdapters(lbls), + Chunks: chunks, + }, }, }, } } func mockStreamingSeriesBatchResponse(endOfStream bool, lbls ...[]mimirpb.LabelAdapter) *storepb.SeriesResponse { - res := &storepb.StreamingSeriesBatch{} + res := &storepb.CustomStreamingSeriesBatch{ + StreamingSeriesBatch: &storepb.StreamingSeriesBatch{}, + } for _, l := range lbls { res.Series = append(res.Series, &storepb.StreamingSeries{Labels: l}) } @@ -3259,11 +3263,13 @@ func mockStreamingSeriesBatchResponse(endOfStream bool, lbls ...[]mimirpb.LabelA func mockStreamingSeriesChunksResponse(index uint64, chks []storepb.AggrChunk) *storepb.SeriesResponse { return &storepb.SeriesResponse{ Result: &storepb.SeriesResponse_StreamingChunks{ - StreamingChunks: &storepb.StreamingChunksBatch{ - Series: []*storepb.StreamingChunks{ - { - SeriesIndex: index, - Chunks: chks, + StreamingChunks: &storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: []*storepb.StreamingChunks{ + { + SeriesIndex: index, + Chunks: chks, + }, }, }, }, diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 1fefa6a26da..735deb8482e 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -772,8 +772,10 @@ func (s *BucketStore) sendStreamingSeriesLabelsAndStats( for i := range seriesBuffer { seriesBuffer[i] = &storepb.StreamingSeries{} } - seriesBatch := &storepb.StreamingSeriesBatch{ - Series: seriesBuffer[:0], + seriesBatch := storepb.CustomStreamingSeriesBatch{ + StreamingSeriesBatch: &storepb.StreamingSeriesBatch{ + Series: seriesBuffer[:0], + }, } // TODO: can we send this in parallel while we start fetching the chunks below? for seriesSet.Next() { @@ -839,7 +841,11 @@ func (s *BucketStore) sendStreamingChunks( chunksBuffer[i] = &storepb.StreamingChunks{} } haveSentEstimatedChunks := false - chunksBatch := &storepb.StreamingChunksBatch{Series: chunksBuffer[:0]} + chunksBatch := storepb.CustomStreamingChunksBatch{ + StreamingChunksBatch: &storepb.StreamingChunksBatch{ + Series: chunksBuffer[:0], + }, + } for it.Next() { set := it.At() @@ -944,8 +950,10 @@ func (s *BucketStore) sendSeriesChunks( // because the subsequent call to seriesSet.Next() may release it. But it is safe to hold // onto lset because the labels are not released. 
lset, chks := seriesSet.At() - series := storepb.Series{ - Labels: mimirpb.FromLabelsToLabelAdapters(lset), + series := storepb.CustomSeries{ + Series: &storepb.Series{ + Labels: mimirpb.FromLabelsToLabelAdapters(lset), + }, } if !req.SkipChunks { series.Chunks = chks @@ -953,7 +961,7 @@ func (s *BucketStore) sendSeriesChunks( s.metrics.chunkSizeBytes.Observe(float64(chunksSize(chks))) } - err := s.sendMessage("series", srv, storepb.NewSeriesResponse(&series), &encodeDuration, &sendDuration) + err := s.sendMessage("series", srv, storepb.NewSeriesResponse(series), &encodeDuration, &sendDuration) if err != nil { return err } diff --git a/pkg/storegateway/bucket_store_server_test.go b/pkg/storegateway/bucket_store_server_test.go index a96def26ef9..56408fe00ab 100644 --- a/pkg/storegateway/bucket_store_server_test.go +++ b/pkg/storegateway/bucket_store_server_test.go @@ -66,7 +66,7 @@ func newStoreGatewayTestServer(t testing.TB, store storegatewaypb.StoreGatewaySe // Series calls the store server's Series() endpoint via gRPC and returns the responses collected // via the gRPC stream. -func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest) (seriesSet []*storepb.Series, warnings annotations.Annotations, hints hintspb.SeriesResponseHints, estimatedChunks uint64, err error) { +func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest) (seriesSet []*storepb.CustomSeries, warnings annotations.Annotations, hints hintspb.SeriesResponseHints, estimatedChunks uint64, err error) { var ( conn *grpc.ClientConn stream storegatewaypb.StoreGateway_SeriesClient @@ -133,7 +133,7 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest return } - copiedSeries := &storepb.Series{} + copiedSeries := &storepb.CustomSeries{} if err = copiedSeries.Unmarshal(recvSeriesData); err != nil { err = errors.Wrap(err, "unmarshal received series") return @@ -232,9 +232,11 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest return } - seriesSet = append(seriesSet, &storepb.Series{ - Labels: streamingSeriesSet[idx].Labels, - Chunks: copiedChunks.Chunks, + seriesSet = append(seriesSet, &storepb.CustomSeries{ + Series: &storepb.Series{ + Labels: streamingSeriesSet[idx].Labels, + Chunks: copiedChunks.Chunks, + }, }) } } diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 60523731fe6..0deebecf051 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -607,7 +607,7 @@ func TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks(t *testing.T) { nextSeriesIdx := 0 if testData.expectedSamplesForOutOfOrderChunks != nil { - assert.Equal(t, seriesWithOutOfOrderChunks, promLabels(seriesSet[nextSeriesIdx])) + assert.Equal(t, seriesWithOutOfOrderChunks, promLabels(seriesSet[nextSeriesIdx].Series)) samples, err := readSamplesFromChunks(seriesSet[nextSeriesIdx].Chunks) require.NoError(t, err) @@ -617,7 +617,7 @@ func TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks(t *testing.T) { } if testData.expectedSamplesForOverlappingChunks != nil { - assert.Equal(t, seriesWithOverlappingChunks, promLabels(seriesSet[nextSeriesIdx])) + assert.Equal(t, seriesWithOverlappingChunks, promLabels(seriesSet[nextSeriesIdx].Series)) samples, err := readSamplesFromChunks(seriesSet[nextSeriesIdx].Chunks) require.NoError(t, err) @@ -671,7 +671,7 @@ func generateStorageBlock(t *testing.T, storageDir, userID string, metricName st require.NoError(t, 
db.Snapshot(userDir, true)) } -func querySeries(t *testing.T, stores *BucketStores, userID, metricName string, minT, maxT int64) ([]*storepb.Series, annotations.Annotations, error) { +func querySeries(t *testing.T, stores *BucketStores, userID, metricName string, minT, maxT int64) ([]*storepb.CustomSeries, annotations.Annotations, error) { req := &storepb.SeriesRequest{ MinTime: minT, MaxTime: maxT, diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index c2afc4709b1..bf6aa8476bd 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1382,7 +1382,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries var ( logger = log.NewNopLogger() - series []*storepb.Series + series []*storepb.CustomSeries expectedQueriesBlocks []hintspb.Block random = rand.New(rand.NewSource(120)) ) @@ -1584,7 +1584,7 @@ func TestBucketStore_Series_Concurrency(t *testing.T) { var ( ctx = context.Background() logger = log.NewNopLogger() - expectedSeries []*storepb.Series + expectedSeries []*storepb.CustomSeries expectedBlockIDs []string random = rand.New(rand.NewSource(120)) tmpDir = t.TempDir() @@ -1909,7 +1909,7 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) { } func TestBucketStore_Series_RequestAndResponseHints(t *testing.T) { - newTestCases := func(seriesSet1 []*storepb.Series, seriesSet2 []*storepb.Series, block1 ulid.ULID, block2 ulid.ULID) []*seriesCase { + newTestCases := func(seriesSet1 []*storepb.CustomSeries, seriesSet2 []*storepb.CustomSeries, block1 ulid.ULID, block2 ulid.ULID) []*seriesCase { return []*seriesCase{ { Name: "querying a range containing 1 block should return 1 block in the response hints", @@ -1935,7 +1935,7 @@ func TestBucketStore_Series_RequestAndResponseHints(t *testing.T) { {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, }, }, - ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...), + ExpectedSeries: append(append([]*storepb.CustomSeries{}, seriesSet1...), seriesSet2...), ExpectedHints: hintspb.SeriesResponseHints{ QueriedBlocks: []hintspb.Block{ {Id: block1.String()}, @@ -2560,7 +2560,7 @@ func mustMarshalAny(pb proto.Message) *types.Any { return out } -func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketStoreOption) (test.TB, *BucketStore, []*storepb.Series, []*storepb.Series, ulid.ULID, ulid.ULID, func()) { +func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketStoreOption) (test.TB, *BucketStore, []*storepb.CustomSeries, []*storepb.CustomSeries, ulid.ULID, ulid.ULID, func()) { tb := test.NewTB(t) cleanupFuncs := []func(){} @@ -2705,7 +2705,7 @@ func TestLabelNamesAndValuesHints(t *testing.T) { End: 3, }, expectedNames: labelNamesFromSeriesSet( - append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...), + append(append([]*storepb.CustomSeries{}, seriesSet1...), seriesSet2...), ), expectedNamesHints: hintspb.LabelNamesResponseHints{ QueriedBlocks: []hintspb.Block{ @@ -2847,7 +2847,7 @@ func TestLabelValues_Cancelled(t *testing.T) { assert.Equal(t, codes.Canceled, s.Code()) } -func labelNamesFromSeriesSet(series []*storepb.Series) []string { +func labelNamesFromSeriesSet(series []*storepb.CustomSeries) []string { labelsMap := map[string]struct{}{} for _, s := range series { @@ -2881,7 +2881,7 @@ type headGenOptions struct { // Each series looks as follows: // {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd} where number indicate sample number from 0. 
// Returned series are framed in the same way as remote read would frame them. -func createHeadWithSeries(t testing.TB, j int, opts headGenOptions) (*tsdb.Head, []*storepb.Series) { +func createHeadWithSeries(t testing.TB, j int, opts headGenOptions) (*tsdb.Head, []*storepb.CustomSeries) { if opts.SamplesPerSeries < 1 || opts.Series < 1 { t.Fatal("samples and series has to be 1 or more") } @@ -2946,14 +2946,18 @@ func createHeadWithSeries(t testing.TB, j int, opts headGenOptions) (*tsdb.Head, var ( chunkMetas []chunks.Meta - expected = make([]*storepb.Series, 0, opts.Series) + expected = make([]*storepb.CustomSeries, 0, opts.Series) ) var builder labels.ScratchBuilder all := allPostings(t, ir) for all.Next() { assert.NoError(t, ir.Series(all.At(), &builder, &chunkMetas)) - expected = append(expected, &storepb.Series{Labels: mimirpb.FromLabelsToLabelAdapters(builder.Labels())}) + expected = append(expected, &storepb.CustomSeries{ + Series: &storepb.Series{ + Labels: mimirpb.FromLabelsToLabelAdapters(builder.Labels()), + }, + }) if opts.SkipChunks { continue @@ -3013,7 +3017,7 @@ type seriesCase struct { Req *storepb.SeriesRequest // Exact expectations are checked only for tests. For benchmarks only length is assured. - ExpectedSeries []*storepb.Series + ExpectedSeries []*storepb.CustomSeries ExpectedWarnings []string ExpectedHints hintspb.SeriesResponseHints } diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index c91693f5685..87f3e208599 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -1185,7 +1185,7 @@ func TestStoreGateway_Series_QuerySharding(t *testing.T) { actualMetrics := make([]string, 0, len(seriesSet)) for _, s := range seriesSet { - actualMetrics = append(actualMetrics, promLabels(s).Get(labels.MetricName)) + actualMetrics = append(actualMetrics, promLabels(s.Series).Get(labels.MetricName)) } assert.ElementsMatch(t, testData.expectedMetrics, actualMetrics) }) @@ -1333,7 +1333,7 @@ func TestStoreGateway_Series_QueryShardingConcurrency(t *testing.T) { t.Run(fmt.Sprintf("streamingBatchSize=%d", streamingBatchSize), func(t *testing.T) { // Keep track of all responses received (by shard). 
responsesMx := sync.Mutex{} - responses := make(map[int][][]*storepb.Series) + responses := make(map[int][][]*storepb.CustomSeries) wg := sync.WaitGroup{} wg.Add(numQueries) @@ -1373,7 +1373,7 @@ func TestStoreGateway_Series_QueryShardingConcurrency(t *testing.T) { totalSeries := 0 for shardIndex := 0; shardIndex < shardCount; shardIndex++ { - var expected []*storepb.Series + var expected []*storepb.CustomSeries for resIdx, res := range responses[shardIndex] { // We consider the 1st response for a shard as the expected one diff --git a/pkg/storegateway/storepb/custom.go b/pkg/storegateway/storepb/custom.go index 8bc582c56be..97cfc7e1f6b 100644 --- a/pkg/storegateway/storepb/custom.go +++ b/pkg/storegateway/storepb/custom.go @@ -6,17 +6,23 @@ package storepb import ( + "fmt" + "io" + "slices" + "sync" + "github.com/gogo/protobuf/types" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/storage/chunk" ) -func NewSeriesResponse(series *Series) *SeriesResponse { +func NewSeriesResponse(series CustomSeries) *SeriesResponse { return &SeriesResponse{ Result: &SeriesResponse_Series{ - Series: series, + Series: &series, }, } } @@ -37,18 +43,18 @@ func NewStatsResponse(indexBytesFetched int) *SeriesResponse { } } -func NewStreamingSeriesResponse(series *StreamingSeriesBatch) *SeriesResponse { +func NewStreamingSeriesResponse(series CustomStreamingSeriesBatch) *SeriesResponse { return &SeriesResponse{ Result: &SeriesResponse_StreamingSeries{ - StreamingSeries: series, + StreamingSeries: &series, }, } } -func NewStreamingChunksResponse(series *StreamingChunksBatch) *SeriesResponse { +func NewStreamingChunksResponse(series CustomStreamingChunksBatch) *SeriesResponse { return &SeriesResponse{ Result: &SeriesResponse_StreamingChunks{ - StreamingChunks: series, + StreamingChunks: &series, }, } } @@ -146,3 +152,1101 @@ func (c AggrChunk) GetChunkEncoding() (chunk.Encoding, bool) { return 0, false } } + +var ( + seriesPool = sync.Pool{ + New: func() any { + return &Series{} + }, + } + chunkDataPool = sync.Pool{ + New: func() any { + return mimirpb.UnsafeByteSlice{} + }, + } + streamingSeriesBatchPool = sync.Pool{ + New: func() any { + return &StreamingSeriesBatch{} + }, + } + streamingChunksBatchPool = sync.Pool{ + New: func() any { + return &StreamingChunksBatch{} + }, + } + labelAdaptersPool = sync.Pool{ + New: func() any { + return []mimirpb.LabelAdapter{} + }, + } +) + +type CustomSeries struct { + *Series +} + +// Release back to pool. 
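+// Release returns each chunk's data buffer and the wrapped Series to their
+// pools and nils out m.Series; the CustomSeries must not be used afterwards.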
+func (m *CustomSeries) Release() { + for _, chk := range m.Chunks { + //nolint:staticcheck + chunkDataPool.Put(chk.Raw.Data) + chk.Raw.Data = nil + } + m.Labels = m.Labels[:0] + m.Chunks = m.Chunks[:0] + seriesPool.Put(m.Series) + m.Series = nil +} + +func (m *CustomSeries) Unmarshal(data []byte) error { + m.Series = seriesPool.Get().(*Series) + m.Labels = m.Labels[:0] + m.Chunks = m.Chunks[:0] + l := len(data) + index := 0 + for index < l { + preIndex := index + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Series: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Series: illegal tag %d (wire type %d)", fieldNum, wire) + } + + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if index >= l { + return io.ErrUnexpectedEOF + } + + b := data[index] + index++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + + postIndex := index + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + + var la mimirpb.LabelAdapter + if err := unmarshalLabelAdapter(&la, data[index:postIndex]); err != nil { + return err + } + m.Labels = append(m.Labels, la) + index = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := index + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var chk AggrChunk + if err := unmarshalAggrChunk(&chk, data[index:postIndex]); err != nil { + return err + } + m.Chunks = append(m.Chunks, chk) + index = postIndex + default: + index = preIndex + skippy, err := skipTypes(data[index:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTypes + } + if (index + skippy) < 0 { + return ErrInvalidLengthTypes + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + index += skippy + } + } + + if index > l { + return io.ErrUnexpectedEOF + } + return nil +} + +func (m *SeriesResponse) GetSeries() *CustomSeries { + if x, ok := m.GetResult().(*SeriesResponse_Series); ok { + return x.Series + } + return nil +} + +func (m *SeriesResponse) GetStreamingChunks() *CustomStreamingChunksBatch { + if x, ok := m.GetResult().(*SeriesResponse_StreamingChunks); ok { + return x.StreamingChunks + } + return nil +} + +func (m *SeriesResponse) GetStreamingSeries() *CustomStreamingSeriesBatch { + if x, ok := m.GetResult().(*SeriesResponse_StreamingSeries); ok { + return x.StreamingSeries + } + return nil +} + +func unmarshalLabelAdapter(la *mimirpb.LabelAdapter, data []byte) error { + l := len(data) + index := 0 + for index < l { + preIndex := index + var wire 
uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return mimirpb.ErrIntOverflowMimir + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelPair: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelPair: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return mimirpb.ErrIntOverflowMimir + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return mimirpb.ErrInvalidLengthMimir + } + postIndex := index + byteLen + if postIndex < 0 { + return mimirpb.ErrInvalidLengthMimir + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + // TODO: Consider using a pool: Get byte slice from pool, copy the data to it, and take a yoloString. + la.Name = string(data[index:postIndex]) + index = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return mimirpb.ErrIntOverflowMimir + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return mimirpb.ErrInvalidLengthMimir + } + postIndex := index + byteLen + if postIndex < 0 { + return mimirpb.ErrInvalidLengthMimir + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + // TODO: Consider using a pool: Get byte slice from pool, copy the data to it, and take a yoloString. 
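+			// Until then, the string conversion below copies the bytes, so la.Value
+			// remains valid after the pooled receive buffer is reused.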
+ la.Value = string(data[index:postIndex]) + index = postIndex + default: + index = preIndex + skippy, err := skipMimir(data[index:]) + if err != nil { + return err + } + if skippy < 0 { + return mimirpb.ErrInvalidLengthMimir + } + if (index + skippy) < 0 { + return mimirpb.ErrInvalidLengthMimir + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + index += skippy + } + } + if index > l { + return io.ErrUnexpectedEOF + } + + return nil +} + +func skipMimir(data []byte) (n int, err error) { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, mimirpb.ErrIntOverflowMimir + } + if index >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, mimirpb.ErrIntOverflowMimir + } + if index >= l { + return 0, io.ErrUnexpectedEOF + } + index++ + if data[index-1] < 0x80 { + break + } + } + return index, nil + case 1: + index += 8 + return index, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, mimirpb.ErrIntOverflowMimir + } + if index >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[index] + index++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, mimirpb.ErrInvalidLengthMimir + } + index += length + if index < 0 { + return 0, mimirpb.ErrInvalidLengthMimir + } + return index, nil + case 3: + for { + var innerWire uint64 + start := index + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, mimirpb.ErrIntOverflowMimir + } + if index >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[index] + index++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipMimir(data[start:]) + if err != nil { + return 0, err + } + index = start + next + if index < 0 { + return 0, mimirpb.ErrInvalidLengthMimir + } + } + return index, nil + case 4: + return index, nil + case 5: + index += 4 + return index, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +func unmarshalAggrChunk(chk *AggrChunk, data []byte) error { + l := len(data) + index := 0 + for index < l { + preIndex := index + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AggrChunk: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AggrChunk: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MinTime", wireType) + } + chk.MinTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + chk.MinTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxTime", wireType) + } + chk.MaxTime = 
0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + chk.MaxTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Raw", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := index + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := unmarshalChunk(&chk.Raw, data[index:postIndex]); err != nil { + return err + } + index = postIndex + default: + index = preIndex + skippy, err := skipTypes(data[index:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTypes + } + if (index + skippy) < 0 { + return ErrInvalidLengthTypes + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + index += skippy + } + } + + if index > l { + return io.ErrUnexpectedEOF + } + return nil +} + +func unmarshalChunk(chk *Chunk, data []byte) error { + l := len(data) + index := 0 + for index < l { + preIndex := index + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Chunk: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Chunk: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + chk.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + chk.Type |= Chunk_Encoding(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthTypes + } + postIndex := index + byteLen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + l := postIndex - index + chk.Data = slices.Grow(chunkDataPool.Get().(mimirpb.UnsafeByteSlice)[:0], l)[0:l] + copy(chk.Data, data[index:postIndex]) + index = postIndex + default: + index = preIndex + skippy, err := skipTypes(data[index:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTypes + } + if (index + skippy) < 0 { + return ErrInvalidLengthTypes + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + index += skippy + } + } + + if index > l { + return io.ErrUnexpectedEOF + } + return nil +} + +type CustomStreamingSeriesBatch struct { + *StreamingSeriesBatch +} + +// Release back to pool. 
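+// The label slices of the contained series are returned to their pool as well,
+// so callers must not retain them after Release. A typical consumer pattern
+// (sketch):
+//
+//	batch := resp.GetStreamingSeries()
+//	defer batch.Release()
+//	// ... read batch.Series while the batch is still live ...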
+func (m *CustomStreamingSeriesBatch) Release() {
+	for _, s := range m.Series {
+		//nolint:staticcheck
+		labelAdaptersPool.Put(s.Labels)
+		// Drop the reference so the pooled batch doesn't alias the pooled slice.
+		s.Labels = nil
+	}
+	streamingSeriesBatchPool.Put(m.StreamingSeriesBatch)
+	m.StreamingSeriesBatch = nil
+}
+
+func (m *CustomStreamingSeriesBatch) Unmarshal(data []byte) error {
+	m.StreamingSeriesBatch = streamingSeriesBatchPool.Get().(*StreamingSeriesBatch)
+	m.Series = m.Series[:0]
+
+	l := len(data)
+	index := 0
+	for index < l {
+		preIndex := index
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowTypes
+			}
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= uint64(b&0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: StreamingSeriesBatch: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: StreamingSeriesBatch: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Series", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowTypes
+				}
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				msglen |= int(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthTypes
+			}
+			postIndex := index + msglen
+			if postIndex < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			var ss StreamingSeries
+			if err := unmarshalStreamingSeries(&ss, data[index:postIndex]); err != nil {
+				return err
+			}
+			m.Series = append(m.Series, &ss)
+			index = postIndex
+		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field IsEndOfSeriesStream", wireType)
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowTypes
+				}
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				v |= int(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.IsEndOfSeriesStream = bool(v != 0)
+		default:
+			index = preIndex
+			skippy, err := skipTypes(data[index:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if (index + skippy) < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			index += skippy
+		}
+	}
+
+	if index > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+
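+// unmarshalStreamingSeries decodes a StreamingSeries message. The slice backing
+// m.Labels is taken from labelAdaptersPool and is handed back by
+// CustomStreamingSeriesBatch.Release.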
+func unmarshalStreamingSeries(m *StreamingSeries, data []byte) error {
+	m.Labels = labelAdaptersPool.Get().([]mimirpb.LabelAdapter)[:0]
+
+	l := len(data)
+	index := 0
+	for index < l {
+		preIndex := index
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowTypes
+			}
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= uint64(b&0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: StreamingSeries: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: StreamingSeries: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowTypes
+				}
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				msglen |= int(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthTypes
+			}
+			postIndex := index + msglen
+			if postIndex < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			var la mimirpb.LabelAdapter
+			if err := unmarshalLabelAdapter(&la, data[index:postIndex]); err != nil {
+				return err
+			}
+			m.Labels = append(m.Labels, la)
+			index = postIndex
+		default:
+			index = preIndex
+			skippy, err := skipTypes(data[index:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if (index + skippy) < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			index += skippy
+		}
+	}
+
+	if index > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+
+type CustomStreamingChunksBatch struct {
+	*StreamingChunksBatch
+}
+
+// Release returns the batch to its pool.
+// Chunks held by the contained series are released to their pool as well.
+func (m *CustomStreamingChunksBatch) Release() {
+	for _, chks := range m.Series {
+		// Iterate by index so ReleaseChunk clears the Data reference on the
+		// slice element itself rather than on a copy.
+		for i := range chks.Chunks {
+			ReleaseChunk(&chks.Chunks[i].Raw)
+		}
+		chks.Chunks = chks.Chunks[:0]
+	}
+
+	m.Series = m.Series[:0]
+	streamingChunksBatchPool.Put(m.StreamingChunksBatch)
+	m.StreamingChunksBatch = nil
+}
+
+// ReleaseChunk returns chk's data buffer to the pool and clears the reference.
+func ReleaseChunk(chk *Chunk) {
+	chk.Data = chk.Data[:0]
+	//nolint:staticcheck
+	chunkDataPool.Put(chk.Data)
+	chk.Data = nil
+}
+
+func (m *CustomStreamingChunksBatch) Unmarshal(data []byte) error {
+	m.StreamingChunksBatch = streamingChunksBatchPool.Get().(*StreamingChunksBatch)
+	m.Series = m.Series[:0]
+
+	l := len(data)
+	index := 0
+	for index < l {
+		preIndex := index
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowTypes
+			}
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= uint64(b&0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: StreamingChunksBatch: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: StreamingChunksBatch: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Series", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowTypes
+				}
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				msglen |= int(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthTypes
+			}
+			postIndex := index + msglen
+			if postIndex < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			var ss StreamingChunks
+			if err := unmarshalStreamingChunks(&ss, data[index:postIndex]); err != nil {
+				return err
+			}
+			m.Series = append(m.Series, &ss)
+			index = postIndex
+		default:
+			index = preIndex
+			skippy, err := skipTypes(data[index:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if (index + skippy) < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			index += skippy
+		}
+	}
+
+	if index > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+
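+// unmarshalStreamingChunks decodes a StreamingChunks message, appending the
+// decoded chunks (whose data buffers come from chunkDataPool) to m.Chunks.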
+func unmarshalStreamingChunks(m *StreamingChunks, data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		preIndex := index
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowTypes
+			}
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= uint64(b&0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: StreamingChunks: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: StreamingChunks: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field SeriesIndex", wireType)
+			}
+			m.SeriesIndex = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowTypes
+				}
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.SeriesIndex |= uint64(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowTypes
+				}
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				msglen |= int(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthTypes
+			}
+			postIndex := index + msglen
+			if postIndex < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			var chk AggrChunk
+			if err := unmarshalAggrChunk(&chk, data[index:postIndex]); err != nil {
+				return err
+			}
+			m.Chunks = append(m.Chunks, chk)
+			index = postIndex
+		default:
+			index = preIndex
+			skippy, err := skipTypes(data[index:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if (index + skippy) < 0 {
+				return ErrInvalidLengthTypes
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			index += skippy
+		}
+	}
+
+	if index > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
diff --git a/pkg/storegateway/storepb/rpc.pb.go b/pkg/storegateway/storepb/rpc.pb.go
index 23cdf0292ad..939ee32358f 100644
--- a/pkg/storegateway/storepb/rpc.pb.go
+++ b/pkg/storegateway/storepb/rpc.pb.go
@@ -8,7 +8,6 @@ import (
 	_ "github.com/gogo/protobuf/gogoproto"
 	proto "github.com/gogo/protobuf/proto"
 	types "github.com/gogo/protobuf/types"
-	"github.com/grafana/mimir/pkg/mimirpb"
 	io "io"
 	math "math"
 	math_bits "math/bits"
@@ -121,9 +120,6 @@ func (m *Stats) XXX_DiscardUnknown() {
 var xxx_messageInfo_Stats proto.InternalMessageInfo
 
 type SeriesResponse struct {
-	// Keep reference to buffer for unsafe references.
-	mimirpb.BufferHolder
-
 	// Types that are valid to be assigned to Result:
 	// *SeriesResponse_Series
 	// *SeriesResponse_Warning
@@ -175,7 +171,7 @@ type isSeriesResponse_Result interface {
 }
 
 type SeriesResponse_Series struct {
-	Series *Series `protobuf:"bytes,1,opt,name=series,proto3,oneof"`
+	Series *CustomSeries `protobuf:"bytes,1,opt,name=series,proto3,oneof,customtype=CustomSeries"`
 }
 type SeriesResponse_Warning struct {
 	Warning string `protobuf:"bytes,2,opt,name=warning,proto3,oneof"`
@@ -187,10 +183,10 @@ type SeriesResponse_Stats struct {
 	Stats *Stats `protobuf:"bytes,4,opt,name=stats,proto3,oneof"`
 }
 type SeriesResponse_StreamingSeries struct {
-	StreamingSeries *StreamingSeriesBatch `protobuf:"bytes,5,opt,name=streaming_series,json=streamingSeries,proto3,oneof"`
+	StreamingSeries *CustomStreamingSeriesBatch `protobuf:"bytes,5,opt,name=streaming_series,json=streamingSeries,proto3,oneof,customtype=CustomStreamingSeriesBatch"`
 }
 type SeriesResponse_StreamingChunks struct {
-	StreamingChunks *StreamingChunksBatch `protobuf:"bytes,6,opt,name=streaming_chunks,json=streamingChunks,proto3,oneof"`
+	StreamingChunks *CustomStreamingChunksBatch `protobuf:"bytes,6,opt,name=streaming_chunks,json=streamingChunks,proto3,oneof,customtype=CustomStreamingChunksBatch"`
 }
 type SeriesResponse_StreamingChunksEstimate struct {
 	StreamingChunksEstimate *StreamingChunksEstimate `protobuf:"bytes,7,opt,name=streaming_chunks_estimate,json=streamingChunksEstimate,proto3,oneof"`
@@ -211,13 +207,6 @@ func (m *SeriesResponse) GetResult() isSeriesResponse_Result {
 	return nil
 }
 
-func (m *SeriesResponse) GetSeries() *Series {
-	if x, ok := m.GetResult().(*SeriesResponse_Series); ok {
-		return x.Series
-	}
-	return nil
-}
-
 func (m *SeriesResponse) GetWarning() string {
 	if x, ok := m.GetResult().(*SeriesResponse_Warning); ok {
 		return x.Warning
@@ -239,20 +228,6 @@ func (m *SeriesResponse) GetStats() *Stats {
 	return nil
 }
 
-func (m *SeriesResponse) GetStreamingSeries() *StreamingSeriesBatch {
-	if x, ok := m.GetResult().(*SeriesResponse_StreamingSeries); ok {
-		return x.StreamingSeries
-	}
-	return nil
-}
-
-func (m *SeriesResponse) GetStreamingChunks() *StreamingChunksBatch {
-	if x, ok := m.GetResult().(*SeriesResponse_StreamingChunks); ok {
-		return x.StreamingChunks
-	}
-	return nil
-}
-
 func (m *SeriesResponse) GetStreamingChunksEstimate() *StreamingChunksEstimate {
 	if x, ok := m.GetResult().(*SeriesResponse_StreamingChunksEstimate); ok {
 		return x.StreamingChunksEstimate
@@ -453,54 +428,55 @@ func init() {
 func init() { proto.RegisterFile("rpc.proto", fileDescriptor_77a6da22d6a3feb1) }
 
 var fileDescriptor_77a6da22d6a3feb1 = []byte{
-	// 740 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x93, 0xc1, 0x6e, 0x13, 0x3b,
-	0x14, 0x86, 0xc7, 0x19, 0xcf, 0xc4, 0x71, 0x9a, 0xde, 0xe9, 0xb4, 0xba, 0x77, 0x9a, 0x7b, 0x35,
-	0x8d, 0x22, 0x5d, 0x29, 0x42, 0x90, 0x4a, 0x45, 0x82, 0x15, 0x8b, 0x06, 0x21, 0xa5, 0x23, 0x60,
-	0xe1, 0x22, 0x16, 0x48, 0x28, 0x9a, 0x24, 0x6e, 0x62, 0x35, 0xe3, 0x09, 0x63, 0x07, 0x9a, 0xae,
-	0x78, 0x04, 0x1e, 0x03, 0xc1, 0x13, 0xb0, 0x65, 0xd5, 0x65, 0x97, 0x5d, 0x21, 0x92, 0x6e, 0x58,
-	0xf6, 0x11, 0xd0, 0x78, 0x9c, 0xa4, 0x11, 0xad, 0x4a, 0x25, 0x76, 0x3e, 0xff, 0x7f, 0x7c, 0x7c,
-	0xfc, 0xf9, 0x18, 0x17, 0x92, 0x61, 0xa7, 0x3e, 0x4c, 0x62, 0x19, 0xbb, 0xb6, 0xec, 0x87, 0x3c,
-	0x16, 0xe5, 0xa2, 0x1c, 0x0f, 0xa9, 0xc8, 0xc4, 0xf2, 0xbd, 0x1e, 0x93, 0xfd, 0x51, 0xbb, 0xde,
-	0x89, 0xa3, 0xed, 0x5e, 0xdc, 0x8b, 0xb7, 0x95, 0xdc, 0x1e, 0x1d, 0xa8, 0x48, 0x05, 0x6a, 0xa5,
-	0xd3, 0x37, 0x7b, 0x71, 0xdc, 0x1b, 0xd0, 0x45, 0x56, 0xc8, 0xc7, 0x99, 0x55, 0xfd, 0x92, 0xc3,
-	0xa5, 0x7d, 0x9a, 0x30, 0x2a, 0x08, 0x7d, 0x33, 0xa2, 0x42, 0xba, 0x9b, 0x18, 0x45, 0x8c, 0xb7,
-	0x24, 0x8b, 0xa8, 0x07, 0x2a, 0xa0, 0x66, 0x92, 0x7c, 0xc4, 0xf8, 0x0b, 0x16, 0x51, 0x65, 0x85,
-	0x47, 0x99, 0x95, 0xd3, 0x56, 0x78, 0xa4, 0xac, 0x07, 0xa9, 0x25, 0x3b, 0x7d, 0x9a, 0x08, 0xcf,
-	0xac, 0x98, 0xb5, 0xe2, 0xce, 0x46, 0x3d, 0xeb, 0xbc, 0xfe, 0x34, 0x6c, 0xd3, 0xc1, 0xb3, 0xcc,
-	0x6c, 0xc0, 0x93, 0x6f, 0x5b, 0x06, 0x99, 0xe7, 0xba, 0x5b, 0xb8, 0x28, 0x0e, 0xd9, 0xb0, 0xd5,
-	0xe9, 0x8f, 0xf8, 0xa1, 0xf0, 0x50, 0x05, 0xd4, 0x10, 0xc1, 0xa9, 0xf4, 0x58, 0x29, 0xee, 0x1d,
-	0x6c, 0xf5, 0x19, 0x97, 0xc2, 0x2b, 0x54, 0x80, 0xaa, 0x9a, 0xdd, 0xa5, 0x3e, 0xbb, 0x4b, 0x7d,
-	0x97, 0x8f, 0x49, 0x96, 0xe2, 0x3e, 0xc2, 0xff, 0x0a, 0x99, 0xd0, 0x30, 0x62, 0xbc, 0xa7, 0x2b,
-	0xb6, 0xda, 0xe9, 0x49, 0x2d, 0xc1, 0x8e, 0xa9, 0xd7, 0xad, 0x80, 0x1a, 0x24, 0xde, 0x3c, 0x25,
-	0x3b, 0xa1, 0x91, 0x26, 0xec, 0xb3, 0x63, 0x1a, 0x40, 0x04, 0x1d, 0x2b, 0x80, 0xc8, 0x72, 0xec,
-	0x00, 0x22, 0xdb, 0xc9, 0x07, 0x10, 0xe5, 0x1d, 0x14, 0x40, 0x84, 0x9d, 0x62, 0x00, 0x51, 0xd1,
-	0x59, 0x09, 0x20, 0x5a, 0x71, 0x4a, 0x01, 0x44, 0x25, 0x67, 0xb5, 0xfa, 0x10, 0x5b, 0xfb, 0x32,
-	0x94, 0xc2, 0xad, 0xe3, 0xf5, 0x03, 0x9a, 0x5e, 0xa8, 0xdb, 0x62, 0xbc, 0x4b, 0x8f, 0x5a, 0xed,
-	0xb1, 0xa4, 0x42, 0xd1, 0x83, 0x64, 0x4d, 0x5b, 0x7b, 0xa9, 0xd3, 0x48, 0x8d, 0xea, 0x27, 0x13,
-	0xaf, 0xce, 0xa0, 0x8b, 0x61, 0xcc, 0x05, 0x75, 0x6b, 0xd8, 0x16, 0x4a, 0x51, 0xbb, 0x8a, 0x3b,
-	0xab, 0x33, 0x7a, 0x59, 0x5e, 0xd3, 0x20, 0xda, 0x77, 0xcb, 0x38, 0xff, 0x2e, 0x4c, 0x38, 0xe3,
-	0x3d, 0xf5, 0x06, 0x85, 0xa6, 0x41, 0x66, 0x82, 0x7b, 0x77, 0x06, 0xcb, 0xbc, 0x1e, 0x56, 0xd3,
-	0x98, 0xe1, 0xfa, 0x1f, 0x5b, 0x22, 0xed, 0xdf, 0x83, 0x2a, 0xbb, 0x34, 0x3f, 0x32, 0x15, 0xd3,
-	0x34, 0xe5, 0xba, 0x7b, 0xd8, 0x59, 0x50, 0xd5, 0x4d, 0x5a, 0x6a, 0xc7, 0x7f, 0x8b, 0x1d, 0xda,
-	0xcf, 0xba, 0x55, 0x48, 0x9b, 0x06, 0xf9, 0x4b, 0x2c, 0xeb, 0xcb, 0xa5, 0xf4, 0x93, 0xdb, 0xd7,
-	0x94, 0xba, 0xf4, 0x3a, 0x4b, 0xa5, 0xf4, 0x5c, 0xbc, 0xc6, 0x9b, 0xbf, 0xbc, 0x35, 0x15, 0x92,
-	0x45, 0xa1, 0xa4, 0x5e, 0x5e, 0xd5, 0xdc, 0xba, 0xa6, 0xe6, 0x13, 0x9d, 0xd6, 0x34, 0xc8, 0x3f,
-	0xe2, 0x6a, 0xab, 0x81, 0xb0, 0x9d, 0x50, 0x31, 0x1a, 0xc8, 0xea, 0x67, 0x80, 0xd7, 0xd4, 0x08,
-	0x3f, 0x0f, 0xa3, 0xc5, 0x2f, 0xd9, 0x50, 0xec, 0x12, 0xa9, 0x48, 0x9b, 0x24, 0x0b, 0x5c, 0x07,
-	0x9b, 0x94, 0x77, 0x15, 0x4f, 0x93, 0xa4, 0xcb, 0xc5, 0xf8, 0x5a, 0x37, 0x8f, 0xef, 0xe5, 0x3f,
-	0x64, 0xff, 0xfe, 0x1f, 0x0a, 0x20, 0x02, 0x4e, 0x2e, 0x80, 0x28, 0xe7, 0x98, 0xd5, 0x04, 0xbb,
-	0x97, 0x9b, 0xd5, 0xd3, 0xb5, 0x81, 0x2d, 0x9e, 0x0a, 0x1e, 0xa8, 0x98, 0xb5, 0x02, 0xc9, 0x02,
-	0xb7, 0x8c, 0x91, 0x1e, 0x1c, 0xe1, 0xe5, 0x94, 0x31, 0x8f, 0x17, 0x7d, 0x9b, 0x37, 0xf6, 0x5d,
-	0xfd, 0x0a, 0xf4, 0xa1, 0x2f, 0xc3, 0xc1, 0x68, 0x09, 0xd1, 0x20, 0x55, 0xd5, 0x44, 0x17, 0x48,
-	0x16, 0x2c, 0xc0, 0xc1, 0x2b, 0xc0, 0x59, 0x57, 0x80, 0xb3, 0x6f, 0x07, 0x2e, 0x7f, 0x2b, 0x70,
-	0x39, 0xc7, 0x0c, 0x20, 0x32, 0x1d, 0x58, 0x1d, 0xe1, 0xf5, 0xa5, 0x3b, 0x68, 0x72, 0x7f, 0x63,
-	0xfb, 0xad, 0x52, 0x34, 0x3a, 0x1d, 0xfd, 0x29, 0x76, 0x8d, 0xdd, 0x93, 0x89, 0x6f, 0x9c, 0x4e,
-	0x7c, 0xe3, 0x6c, 0xe2, 0x1b, 0x17, 0x13, 0x1f, 0xbc, 0x9f, 0xfa, 0xe0, 0xe3, 0xd4, 0x07, 0x27,
-	0x53, 0x1f, 0x9c, 0x4e, 0x7d, 0xf0, 0x7d, 0xea, 0x83, 0x1f, 0x53, 0xdf, 0xb8, 0x98, 0xfa, 0xe0,
-	0xc3, 0xb9, 0x6f, 0x9c, 0x9e, 0xfb, 0xc6, 0xd9, 0xb9, 0x6f, 0xbc, 0xca, 0x0b, 0x19, 0x27, 0x74,
-	0xd8, 0x6e, 0xdb, 0xaa, 0xee, 0xfd, 0x9f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xfa, 0xb6, 0x7d, 0x6f,
-	0x35, 0x06, 0x00, 0x00,
+	// 767 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x53, 0x4f, 0x6b, 0x13, 0x4f,
+	0x18, 0xde, 0xc9, 0xce, 0x6e, 0x26, 0x93, 0xa6, 0xbf, 0xed, 0xb6, 0xfc, 0xdc, 0x46, 0xd9, 0x84,
+	0x80, 0x10, 0x44, 0x53, 0xa8, 0xa0, 0x20, 0x78, 0x68, 0x8a, 0x10, 0x17, 0xf5, 0xb0, 0x15, 0x0f,
+	0x82, 0x84, 0x4d, 0x32, 0x4d, 0x86, 0x66, 0x77, 0xe3, 0xce, 0xac, 0x36, 0x3d, 0xf9, 0x11, 0xfc,
+	0x18, 0x82, 0x9f, 0xc0, 0xab, 0xa7, 0x1e, 0x7b, 0x2c, 0x3d, 0x14, 0x93, 0x5e, 0x3c, 0xf6, 0x03,
+	0x78, 0x90, 0x9d, 0x9d, 0xfc, 0xc3, 0x94, 0x5a, 0xf0, 0x36, 0xef, 0xf3, 0x3c, 0x33, 0xef, 0xfb,
+	0x3e, 0xf3, 0xbe, 0x38, 0x17, 0x0d, 0xda, 0xb5, 0x41, 0x14, 0xf2, 0xd0, 0xd4, 0x79, 0xcf, 0x0b,
+	0x42, 0x56, 0xcc, 0xf3, 0xe1, 0x80, 0xb0, 0x14, 0x2c, 0x3e, 0xe8, 0x52, 0xde, 0x8b, 0x5b, 0xb5,
+	0x76, 0xe8, 0x6f, 0x75, 0xc3, 0x6e, 0xb8, 0x25, 0xe0, 0x56, 0xbc, 0x2f, 0x22, 0x11, 0x88, 0x93,
+	0x94, 0x6f, 0x76, 0xc3, 0xb0, 0xdb, 0x27, 0x33, 0x95, 0x17, 0x0c, 0x53, 0xaa, 0xf2, 0x2d, 0x83,
+	0x0b, 0x7b, 0x24, 0xa2, 0x84, 0xb9, 0xe4, 0x7d, 0x4c, 0x18, 0x37, 0x37, 0x31, 0xf2, 0x69, 0xd0,
+	0xe4, 0xd4, 0x27, 0x16, 0x28, 0x83, 0xaa, 0xea, 0x66, 0x7d, 0x1a, 0xbc, 0xa6, 0x3e, 0x11, 0x94,
+	0x77, 0x98, 0x52, 0x19, 0x49, 0x79, 0x87, 0x82, 0x7a, 0x94, 0x50, 0xbc, 0xdd, 0x23, 0x11, 0xb3,
+	0xd4, 0xb2, 0x5a, 0xcd, 0x6f, 0x6f, 0xd4, 0xd2, 0xca, 0x6b, 0x2f, 0xbc, 0x16, 0xe9, 0xbf, 0x4c,
+	0xc9, 0x3a, 0x3c, 0x3e, 0x2f, 0x29, 0xee, 0x54, 0x6b, 0x96, 0x70, 0x9e, 0x1d, 0xd0, 0x41, 0xb3,
+	0xdd, 0x8b, 0x83, 0x03, 0x66, 0xa1, 0x32, 0xa8, 0x22, 0x17, 0x27, 0xd0, 0xae, 0x40, 0xcc, 0x7b,
+	0x58, 0xeb, 0xd1, 0x80, 0x33, 0x2b, 0x57, 0x06, 0xe2, 0xd5, 0xb4, 0x97, 0xda, 0xa4, 0x97, 0xda,
+	0x4e, 0x30, 0x74, 0x53, 0x89, 0xf9, 0x14, 0xdf, 0x66, 0x3c, 0x22, 0x9e, 0x4f, 0x83, 0xae, 0x7c,
+	0xb1, 0xd9, 0x4a, 0x32, 0x35, 0x19, 0x3d, 0x22, 0x56, 0xa7, 0x0c, 0xaa, 0xd0, 0xb5, 0xa6, 0x92,
+	0x34, 0x43, 0x3d, 0x11, 0xec, 0xd1, 0x23, 0xe2, 0x40, 0x04, 0x0d, 0xcd, 0x81, 0x48, 0x33, 0x74,
+	0x07, 0x22, 0xdd, 0xc8, 0x3a, 0x10, 0x65, 0x0d, 0xe4, 0x40, 0x84, 0x8d, 0xbc, 0x03, 0x51, 0xde,
+	0x58, 0x71, 0x20, 0x5a, 0x31, 0x0a, 0x0e, 0x44, 0x05, 0x63, 0xb5, 0xf2, 0x18, 0x6b, 0x7b, 0xdc,
+	0xe3, 0xcc, 0xac, 0xe1, 0xf5, 0x7d, 0x92, 0x34, 0xd4, 0x69, 0xd2, 0xa0, 0x43, 0x0e, 0x9b, 0xad,
+	0x21, 0x27, 0x4c, 0xb8, 0x07, 0xdd, 0x35, 0x49, 0x3d, 0x4f, 0x98, 0x7a, 0x42, 0x54, 0x7e, 0xa9,
+	0x78, 0x75, 0x62, 0x3a, 0x1b, 0x84, 0x01, 0x23, 0xe6, 0x13, 0xac, 0x33, 0x81, 0x88, 0x5b, 0xf9,
+	0xed, 0xd5, 0x89, 0x7b, 0xa9, 0xae, 0x6e, 0x9c, 0x9d, 0x97, 0x56, 0x76, 0x63, 0xc6, 0x43, 0x3f,
+	0x45, 0x1a, 0x8a, 0x2b, 0x6f, 0x98, 0x45, 0x9c, 0xfd, 0xe8, 0x45, 0x01, 0x0d, 0xba, 0xe2, 0x57,
+	0x72, 0x0d, 0xc5, 0x9d, 0x00, 0xe6, 0xfd, 0x89, 0x7d, 0xea, 0xd5, 0xf6, 0x35, 0x94, 0x89, 0x81,
+	0x77, 0xb1, 0xc6, 0x92, 0x8e, 0x2c, 0x28, 0xd4, 0x85, 0x69, 0x11, 0x09, 0x98, 0xc8, 0x04, 0x6b,
+	0x52, 0x6c, 0xcc, 0x7c, 0x96, 0x65, 0x6b, 0xe2, 0xc6, 0x9d, 0xd9, 0x0d, 0xc9, 0xcb, 0xfa, 0x13,
+	0x93, 0xeb, 0xf6, 0xd9, 0x79, 0xa9, 0x28, 0x9b, 0x58, 0xc2, 0x37, 0x14, 0xf7, 0x3f, 0xb6, 0x88,
+	0x2f, 0xa6, 0x92, 0x43, 0xa2, 0x5f, 0x91, 0x6a, 0xee, 0x3f, 0x97, 0xa6, 0x9a, 0xe3, 0x17, 0x52,
+	0xc9, 0x49, 0x7b, 0x87, 0x37, 0xff, 0x98, 0x1e, 0xc2, 0x38, 0xf5, 0x3d, 0x4e, 0xac, 0xac, 0xc8,
+	0x59, 0xba, 0x22, 0xe7, 0x33, 0x29, 0x6b, 0x28, 0xee, 0x2d, 0xb6, 0x9c, 0xaa, 0x23, 0xac, 0x47,
+	0x84, 0xc5, 0x7d, 0x5e, 0xf9, 0x0a, 0xf0, 0x9a, 0x58, 0x8a, 0x57, 0x9e, 0x3f, 0xdb, 0xbb, 0x0d,
+	0xe1, 0x7d, 0xc4, 0xc5, 0x4f, 0xa9, 0x6e, 0x1a, 0x98, 0x06, 0x56, 0x49, 0xd0, 0x11, 0xff, 0xa1,
+	0xba, 0xc9, 0x71, 0xb6, 0x10, 0xda, 0xf5, 0x0b, 0x31, 0xbf, 0x95, 0xfa, 0xdf, 0x6f, 0xa5, 0x03,
+	0x11, 0x30, 0x32, 0x0e, 0x44, 0x19, 0x43, 0xad, 0x44, 0xd8, 0x9c, 0x2f, 0x56, 0xce, 0xeb, 0x06,
+	0xd6, 0x82, 0x04, 0xb0, 0x40, 0x59, 0xad, 0xe6, 0xdc, 0x34, 0x30, 0x8b, 0x18, 0xc9, 0xc1, 0x63,
+	0x56, 0x46, 0x10, 0xd3, 0x78, 0x56, 0xb7, 0x7a, 0x6d, 0xdd, 0x95, 0xef, 0x40, 0x26, 0x7d, 0xe3,
+	0xf5, 0xe3, 0x05, 0x8b, 0xfa, 0x09, 0x2a, 0x76, 0x24, 0xe7, 0xa6, 0xc1, 0xcc, 0x38, 0xb8, 0xc4,
+	0x38, 0x6d, 0x89, 0x71, 0xfa, 0xcd, 0x8c, 0xcb, 0xde, 0xc8, 0xb8, 0x8c, 0xa1, 0x3a, 0x10, 0xa9,
+	0x06, 0xac, 0xc4, 0x78, 0x7d, 0xa1, 0x07, 0xe9, 0xdc, 0xff, 0x58, 0xff, 0x20, 0x10, 0x69, 0x9d,
+	0x8c, 0xfe, 0x95, 0x77, 0xf5, 0x9d, 0xe3, 0x91, 0xad, 0x9c, 0x8c, 0x6c, 0xe5, 0x74, 0x64, 0x2b,
+	0x97, 0x23, 0x1b, 0x7c, 0x1a, 0xdb, 0xe0, 0xcb, 0xd8, 0x06, 0xc7, 0x63, 0x1b, 0x9c, 0x8c, 0x6d,
+	0xf0, 0x63, 0x6c, 0x83, 0x9f, 0x63, 0x5b, 0xb9, 0x1c, 0xdb, 0xe0, 0xf3, 0x85, 0xad, 0x9c, 0x5c,
+	0xd8, 0xca, 0xe9, 0x85, 0xad, 0xbc, 0xcd, 0x32, 0x1e, 0x46, 0x64, 0xd0, 0x6a, 0xe9, 0xe2, 0xdd,
+	0x87, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xcd, 0x3a, 0x38, 0x35, 0x87, 0x06, 0x00, 0x00,
 }
 
 func (this *SeriesRequest) Equal(that interface{}) bool {
@@ -620,7 +596,11 @@ func (this *SeriesResponse_Series) Equal(that interface{}) bool {
 	} else if this == nil {
 		return false
 	}
-	if !this.Series.Equal(that1.Series) {
+	if that1.Series == nil {
+		if this.Series != nil {
+			return false
+		}
+	} else if !this.Series.Equal(*that1.Series) {
 		return false
 	}
 	return true
@@ -716,7 +696,11 @@ func (this *SeriesResponse_StreamingSeries) Equal(that interface{}) bool {
 	} else if this == nil {
 		return false
 	}
-	if !this.StreamingSeries.Equal(that1.StreamingSeries) {
+	if that1.StreamingSeries == nil {
+		if this.StreamingSeries != nil {
+			return false
+		}
+	} else if !this.StreamingSeries.Equal(*that1.StreamingSeries) {
 		return false
 	}
 	return true
@@ -740,7 +724,11 @@ func (this *SeriesResponse_StreamingChunks) Equal(that interface{}) bool {
 	} else if this == nil {
 		return false
 	}
-	if !this.StreamingChunks.Equal(that1.StreamingChunks) {
+	if that1.StreamingChunks == nil {
+		if this.StreamingChunks != nil {
+			return false
+		}
+	} else if !this.StreamingChunks.Equal(*that1.StreamingChunks) {
 		return false
 	}
 	return true
@@ -1252,11 +1240,11 @@ func (m *SeriesResponse_Series) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 	i := len(dAtA)
 	if m.Series != nil {
 		{
-			size, err := m.Series.MarshalToSizedBuffer(dAtA[:i])
-			if err != nil {
+			size := m.Series.Size()
+			i -= size
+			if _, err := m.Series.MarshalTo(dAtA[i:]); err != nil {
 				return 0, err
 			}
-			i -= size
 			i = encodeVarintRpc(dAtA, i, uint64(size))
 		}
 		i--
@@ -1325,11 +1313,11 @@ func (m *SeriesResponse_StreamingSeries) MarshalToSizedBuffer(dAtA []byte) (int,
 	i := len(dAtA)
 	if m.StreamingSeries != nil {
 		{
-			size, err := m.StreamingSeries.MarshalToSizedBuffer(dAtA[:i])
-			if err != nil {
+			size := m.StreamingSeries.Size()
+			i -= size
+			if _, err := m.StreamingSeries.MarshalTo(dAtA[i:]); err != nil {
 				return 0, err
 			}
-			i -= size
 			i = encodeVarintRpc(dAtA, i, uint64(size))
 		}
 		i--
@@ -1345,11 +1333,11 @@ func (m *SeriesResponse_StreamingChunks) MarshalToSizedBuffer(dAtA []byte) (int,
 	i := len(dAtA)
 	if m.StreamingChunks != nil {
 		{
-			size, err := m.StreamingChunks.MarshalToSizedBuffer(dAtA[:i])
-			if err != nil {
+			size := m.StreamingChunks.Size()
+			i -= size
+			if _, err := m.StreamingChunks.MarshalTo(dAtA[i:]); err != nil {
 				return 0, err
 			}
-			i -= size
 			i = encodeVarintRpc(dAtA, i, uint64(size))
 		}
 		i--
@@ -2375,7 +2363,7 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error {
 			if postIndex > l {
 				return io.ErrUnexpectedEOF
 			}
-			v := &Series{}
+			v := &CustomSeries{}
 			if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
 				return err
 			}
@@ -2512,7 +2500,7 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error {
 			if postIndex > l {
 				return io.ErrUnexpectedEOF
 			}
-			v := &StreamingSeriesBatch{}
+			v := &CustomStreamingSeriesBatch{}
 			if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
 				return err
 			}
@@ -2547,7 +2535,7 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error {
 			if postIndex > l {
 				return io.ErrUnexpectedEOF
 			}
-			v := &StreamingChunksBatch{}
+			v := &CustomStreamingChunksBatch{}
 			if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
 				return err
 			}
diff --git a/pkg/storegateway/storepb/rpc.pb.go.expdiff b/pkg/storegateway/storepb/rpc.pb.go.expdiff
index 85d6e4a13d6..e36f86b529e 100644
--- a/pkg/storegateway/storepb/rpc.pb.go.expdiff
+++ b/pkg/storegateway/storepb/rpc.pb.go.expdiff
@@ -1,22 +1,31 @@
 diff --git a/pkg/storegateway/storepb/rpc.pb.go b/pkg/storegateway/storepb/rpc.pb.go
-index 23cdf0292..763f925f0 100644
+index 79b2d8863c..afc2785d4a 100644
 --- a/pkg/storegateway/storepb/rpc.pb.go
 +++ b/pkg/storegateway/storepb/rpc.pb.go
-@@ -8,7 +8,6 @@ import (
- 	_ "github.com/gogo/protobuf/gogoproto"
- 	proto "github.com/gogo/protobuf/proto"
- 	types "github.com/gogo/protobuf/types"
--	"github.com/grafana/mimir/pkg/mimirpb"
- 	io "io"
- 	math "math"
- 	math_bits "math/bits"
-@@ -121,9 +120,6 @@ func (m *Stats) XXX_DiscardUnknown() {
- 	var xxx_messageInfo_Stats proto.InternalMessageInfo
- 
- type SeriesResponse struct {
--	// Keep reference to buffer for unsafe references.
--	mimirpb.BufferHolder
--
- 	// Types that are valid to be assigned to Result:
- 	// *SeriesResponse_Series
- 	// *SeriesResponse_Warning
+@@ -2364,7 +2364,7 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error {
+ 			if postIndex > l {
+ 				return io.ErrUnexpectedEOF
+ 			}
+-			v := &CustomSeries{}
++			v := &Series{}
+ 			if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ 				return err
+ 			}
+@@ -2501,7 +2501,7 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error {
+ 			if postIndex > l {
+ 				return io.ErrUnexpectedEOF
+ 			}
+-			v := &CustomStreamingSeriesBatch{}
++			v := &StreamingSeriesBatch{}
+ 			if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ 				return err
+ 			}
+@@ -2536,7 +2536,7 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error {
+ 			if postIndex > l {
+ 				return io.ErrUnexpectedEOF
+ 			}
+-			v := &CustomStreamingChunksBatch{}
++			v := &StreamingChunksBatch{}
+ 			if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ 				return err
+ 			}
diff --git a/pkg/storegateway/storepb/rpc.proto b/pkg/storegateway/storepb/rpc.proto
index e8a4404fc7b..195a4116de9 100644
--- a/pkg/storegateway/storepb/rpc.proto
+++ b/pkg/storegateway/storepb/rpc.proto
@@ -82,7 +82,7 @@ message Stats {
 message SeriesResponse {
   oneof result {
     /// series contains 1 response series. The series labels are sorted by name.
-    Series series = 1;
+    Series series = 1 [(gogoproto.customtype) = "CustomSeries"];
 
     /// warning is considered an information piece in place of series for warning purposes.
     /// It is used to warn store API user about suspicious cases or partial response (if enabled).
@@ -102,14 +102,14 @@ message SeriesResponse {
     /// streaming_series is a list of series labels sent as part of a streaming Series call.
     /// These are populated only when streaming_chunks_batch_size > 0 in the series request.
    /// Series are sent in batches because sending one at a time has additional CPU overhead for not much memory gains.
-    StreamingSeriesBatch streaming_series = 5;
+    StreamingSeriesBatch streaming_series = 5 [(gogoproto.customtype) = "CustomStreamingSeriesBatch"];
 
     /// streaming_chunks is a list of chunks sent as part of a streaming Series request.
     /// They are associated with series labels sent as streaming_series earlier in the same Series request.
     /// These are populated only when streaming_chunks_batch_size > 0 in the series request.
     /// Chunks are sent in batches because sending one series' chunks at a time has additional
     /// CPU overhead for not much memory gains.
-    StreamingChunksBatch streaming_chunks = 6;
+    StreamingChunksBatch streaming_chunks = 6 [(gogoproto.customtype) = "CustomStreamingChunksBatch"];
 
     /// streaming_chunks_estimate contains an estimate of the number of chunks expected to be sent as part a streaming
     /// Series call.
diff --git a/pkg/storegateway/storepb/types.proto b/pkg/storegateway/storepb/types.proto
index 0baeaf78d8a..2d7c94313c2 100644
--- a/pkg/storegateway/storepb/types.proto
+++ b/pkg/storegateway/storepb/types.proto
@@ -59,7 +59,6 @@ message StreamingChunksEstimate {
   uint64 estimated_chunk_count = 1;
 }
 
-
 message AggrChunk {
   int64 min_time = 1;
   int64 max_time = 2;