SeriesResponse: Use memory pooling
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
aknuds1 committed Jan 5, 2025
1 parent 87b249a commit 4af26b5
Showing 18 changed files with 1,544 additions and 327 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
 module github.com/grafana/mimir
 
-go 1.22.7
+go 1.23
 
 // Please note that this directive is ignored when building with the Mimir build image,
 // that will always use its bundled toolchain.
10 changes: 8 additions & 2 deletions pkg/distributor/distributor.go
@@ -1528,8 +1528,14 @@ func NextOrCleanup(next PushFunc, pushReq *Request) (_ PushFunc, maybeCleanup fu
             return next(ctx, req)
         },
         func() {
-            if cleanupInDefer {
-                pushReq.CleanUp()
+            if !cleanupInDefer {
+                return
             }
+
+            req, _ := pushReq.WriteRequest()
+            pushReq.CleanUp()
+            if req != nil {
+                req.FreeBuffer()
+            }
         }
     }
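
The reworked cleanup above frees the request's pooled write buffer only after CleanUp has run, and only when cleanup was not already handed off downstream. Below is a minimal, self-contained sketch of that contract; the request type, pool, and FreeBuffer here are hypothetical stand-ins, not Mimir's actual Request/WriteRequest API.

package main

import (
    "bytes"
    "fmt"
    "sync"
)

// bufPool stands in for the pool backing a request's wire buffer.
var bufPool = sync.Pool{New: func() any { return new(bytes.Buffer) }}

// request is a hypothetical stand-in for the distributor's push request.
type request struct{ buf *bytes.Buffer }

// FreeBuffer returns the buffer to the pool; nil-ing it makes a second call harmless.
func (r *request) FreeBuffer() {
    if r.buf != nil {
        r.buf.Reset()
        bufPool.Put(r.buf)
        r.buf = nil
    }
}

func main() {
    cleanupInDefer := true // flipped to false once cleanup is handed off downstream

    req := &request{buf: bufPool.Get().(*bytes.Buffer)}
    defer func() {
        if !cleanupInDefer {
            return // a downstream callback owns cleanup now
        }
        // As in the hunk above: run cleanup first, then free the pooled buffer.
        if req != nil {
            req.FreeBuffer()
        }
    }()

    req.buf.WriteString("payload")
    fmt.Println("buffered bytes:", req.buf.Len())
}

Making FreeBuffer idempotent matters here because both a defer and an explicit cleanup path can plausibly run.
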
10 changes: 8 additions & 2 deletions pkg/querier/block.go
@@ -48,7 +48,7 @@ func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMa
 
 // Implementation of storage.SeriesSet, based on individual responses from store client.
 type blockQuerierSeriesSet struct {
-    series []*storepb.Series
+    series []*storepb.CustomSeries
 
     // next response to process
     next int
@@ -60,6 +60,10 @@ func (bqss *blockQuerierSeriesSet) Next() bool {
     bqss.currSeries = nil
 
     if bqss.next >= len(bqss.series) {
+        for _, s := range bqss.series {
+            s.Release()
+        }
+        bqss.series = nil
         return false
     }
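
The Next method above releases every pooled series exactly once, at the point the set is exhausted, and drops the slice so released objects cannot be touched afterwards. A compact sketch of the same discipline with a hypothetical pooled type (the real CustomSeries and its Release live in Mimir's storepb package):

package main

import (
    "fmt"
    "sync"
)

type pooledSeries struct{ name string }

var seriesPool = sync.Pool{New: func() any { return new(pooledSeries) }}

// Release returns the object to its pool so later responses can reuse it.
func (s *pooledSeries) Release() {
    s.name = ""
    seriesPool.Put(s)
}

type seriesSet struct {
    series []*pooledSeries
    next   int
}

func (ss *seriesSet) Next() bool {
    if ss.next >= len(ss.series) {
        // Exhausted: hand every pooled element back exactly once, and drop
        // the slice so no released object can be reached again.
        for _, s := range ss.series {
            s.Release()
        }
        ss.series = nil
        return false
    }
    ss.next++
    return true
}

func main() {
    ss := &seriesSet{series: []*pooledSeries{{name: "a"}, {name: "b"}}}
    for ss.Next() {
    }
    fmt.Println("series slice dropped after exhaustion:", ss.series == nil)
}
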

@@ -134,7 +138,9 @@ func newBlockQuerierSeriesIterator(reuse chunkenc.Iterator, lbls labels.Labels,
         return chunk.ErrorIterator(fmt.Sprintf("cannot create new chunk for series %s: %s", lbls.String(), err.Error()))
     }
 
-    if err := ch.UnmarshalFromBuf(c.Raw.Data); err != nil {
+    err = ch.UnmarshalFromBuf(c.Raw.Data)
+    storepb.ReleaseChunk(&c.Raw)
+    if err != nil {
         return chunk.ErrorIterator(fmt.Sprintf("cannot unmarshal chunk for series %s: %s", lbls.String(), err.Error()))
     }
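
The reordering above is what pooling demands: the chunk must be decoded before its raw buffer goes back to the pool, and the buffer must be released on the error path as well as the success path. A sketch of that ordering, with a stand-in parse function in place of UnmarshalFromBuf and a callback in place of storepb.ReleaseChunk:

package main

import (
    "errors"
    "fmt"
)

var errEmpty = errors.New("empty buffer")

// parse stands in for chunk decoding; it must copy anything it keeps out of raw.
func parse(raw []byte) (string, error) {
    if len(raw) == 0 {
        return "", errEmpty
    }
    return string(raw), nil // string() copies, so raw can be pooled afterwards
}

// decodeChunk decodes first, then releases the pooled buffer on every path,
// mirroring the UnmarshalFromBuf / ReleaseChunk ordering in the hunk above.
func decodeChunk(raw []byte, release func()) (string, error) {
    v, err := parse(raw)
    release() // always executed: success and error paths both return the buffer
    if err != nil {
        return "", fmt.Errorf("cannot unmarshal chunk: %w", err)
    }
    return v, nil
}

func main() {
    released := false
    v, err := decodeChunk([]byte("0x1f"), func() { released = true })
    fmt.Println(v, err, "released:", released)
}
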

52 changes: 23 additions & 29 deletions pkg/querier/block_streaming.go
@@ -6,7 +6,6 @@ import (
     "context"
     "fmt"
     "io"
-    "slices"
     "sort"
 
     "github.com/go-kit/log"
@@ -120,6 +119,9 @@ func (bqs *blockStreamingQuerierSeries) Iterator(reuse chunkenc.Iterator) chunke
     for i := bqs.seriesIdxStart; i <= bqs.seriesIdxEnd; i++ {
         chks, err := bqs.streamReader.GetChunks(uint64(i))
         if err != nil {
+            for _, chk := range allChunks {
+                storepb.ReleaseChunk(&chk.Raw)
+            }
             return series.NewErrIterator(err)
         }
         allChunks = append(allChunks, chks...)
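
This error path illustrates a rule the commit applies throughout: once chunks come from a pool, anything accumulated before a mid-stream failure must be handed back before returning, or it leaks from the pool. In sketch form, with hypothetical stand-ins for GetChunks and ReleaseChunk:

package main

import (
    "errors"
    "fmt"
)

type chunk struct{ data []byte }

func release(c *chunk) { c.data = nil } // stand-in for storepb.ReleaseChunk

// gather collects chunks across n pooled responses; on failure it releases
// everything collected so far before returning, as in the hunk above.
func gather(get func(i int) ([]*chunk, error), n int) ([]*chunk, error) {
    var all []*chunk
    for i := 0; i < n; i++ {
        chks, err := get(i)
        if err != nil {
            for _, c := range all {
                release(c)
            }
            return nil, err
        }
        all = append(all, chks...)
    }
    return all, nil
}

func main() {
    boom := errors.New("stream failed")
    _, err := gather(func(i int) ([]*chunk, error) {
        if i == 1 {
            return nil, boom
        }
        return []*chunk{{data: []byte{1}}}, nil
    }, 3)
    fmt.Println(err)
}
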
@@ -155,8 +157,8 @@ type storeGatewayStreamReader struct {
     log log.Logger
 
     chunkCountEstimateChan chan int
-    seriesChunksChan       chan *storepb.StreamingChunksBatch
-    chunksBatch            []*storepb.StreamingChunks
+    seriesChunksChan       chan *storepb.CustomStreamingChunksBatch
+    chunksBatch            *storepb.CustomStreamingChunksBatch
     errorChan              chan error
     err                    error
 }
@@ -189,7 +191,7 @@ func (s *storeGatewayStreamReader) Close() {
 func (s *storeGatewayStreamReader) StartBuffering() {
     // Important: to ensure that the goroutine does not become blocked and leak, the goroutine must only ever write to errorChan at most once.
     s.errorChan = make(chan error, 1)
-    s.seriesChunksChan = make(chan *storepb.StreamingChunksBatch, 1)
+    s.seriesChunksChan = make(chan *storepb.CustomStreamingChunksBatch, 1)
     s.chunkCountEstimateChan = make(chan int, 1)
 
     go func() {
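
The comment in this hunk states the key invariant of StartBuffering: the background goroutine writes to errorChan at most once, and the channel has capacity 1, so that single write can never block and the goroutine can always exit. A stripped-down sketch of the shape; note the real code additionally selects on ctx.Done() before each batch send, which this sketch omits:

package main

import (
    "fmt"
    "io"
)

type batch struct{ n int }

// startBuffering launches a reader goroutine that forwards batches on a
// buffered channel and reports at most one terminal error.
func startBuffering(recv func() (*batch, error)) (<-chan *batch, <-chan error) {
    batches := make(chan *batch, 1)
    errs := make(chan error, 1) // buffered: the single write below never blocks

    go func() {
        defer close(batches)
        for {
            b, err := recv()
            if err != nil {
                if err != io.EOF {
                    errs <- err // the one and only write to errs
                }
                return
            }
            batches <- b
        }
    }()
    return batches, errs
}

func main() {
    i := 0
    recv := func() (*batch, error) {
        if i == 3 {
            return nil, io.EOF
        }
        i++
        return &batch{n: i}, nil
    }
    batches, errs := startBuffering(recv)
    for b := range batches {
        fmt.Println("got batch", b.n)
    }
    fmt.Println("pending errors:", len(errs))
}
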
@@ -244,7 +246,6 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
         }
 
         estimate := msg.GetStreamingChunksEstimate()
-        msg.FreeBuffer()
         if estimate == nil {
             return fmt.Errorf("expected to receive chunks estimate, but got message of type %T", msg.Result)
         }
@@ -262,18 +263,16 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
 
         batch := msg.GetStreamingChunks()
         if batch == nil {
-            msg.FreeBuffer()
             return fmt.Errorf("expected to receive streaming chunks, but got message of type %T", msg.Result)
         }
 
         if len(batch.Series) == 0 {
-            msg.FreeBuffer()
+            batch.Release()
             continue
         }
 
         totalSeries += len(batch.Series)
         if totalSeries > s.expectedSeriesCount {
-            msg.FreeBuffer()
             return fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries)
         }
 
@@ -287,37 +286,23 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
         }
         totalChunks += numChunks
         if err := s.queryLimiter.AddChunks(numChunks); err != nil {
-            msg.FreeBuffer()
             return err
         }
         if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil {
-            msg.FreeBuffer()
             return err
         }
 
         s.stats.AddFetchedChunks(uint64(numChunks))
         s.stats.AddFetchedChunkBytes(uint64(chunkBytes))
 
-        // Memory safe copy.
-        safeSeries := make([]*storepb.StreamingChunks, 0, len(batch.Series))
-        for _, s := range batch.Series {
-            safe := *s
-            safe.Chunks = slices.Clone(s.Chunks)
-            for i, c := range safe.Chunks {
-                safe.Chunks[i].Raw.Data = slices.Clone(c.Raw.Data)
-            }
-            safeSeries = append(safeSeries, &safe)
-        }
-        batch.Series = safeSeries
-        msg.FreeBuffer()
-
         if err := s.sendBatch(batch); err != nil {
+            batch.Release()
             return err
         }
     }
 }

-func (s *storeGatewayStreamReader) sendBatch(c *storepb.StreamingChunksBatch) error {
+func (s *storeGatewayStreamReader) sendBatch(c *storepb.CustomStreamingChunksBatch) error {
     if err := s.ctx.Err(); err != nil {
         // If the context is already cancelled, stop now for the same reasons as below.
         // We do this extra check here to ensure that we don't get unlucky and continue to send to seriesChunksChan even if
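
Taken together, the readStream and sendBatch changes replace the deleted "memory safe copy" with an ownership handoff: instead of cloning every chunk's bytes, the pooled batch itself crosses the channel, the receiver releases it after use, and whichever side still holds the batch on a failure path must release it. A sketch of that protocol, with a hypothetical pooled batch type:

package main

import (
    "context"
    "fmt"
    "sync"
)

type pooledBatch struct{ series []string }

var batchPool = sync.Pool{New: func() any { return new(pooledBatch) }}

func (b *pooledBatch) Release() {
    b.series = b.series[:0]
    batchPool.Put(b)
}

// sendBatch transfers ownership to the receiver on success; on failure the
// caller still owns the batch and remains responsible for releasing it.
func sendBatch(ctx context.Context, ch chan<- *pooledBatch, b *pooledBatch) error {
    select {
    case ch <- b:
        return nil // receiver now owns b and must call Release
    case <-ctx.Done():
        return context.Cause(ctx)
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    ch := make(chan *pooledBatch, 1)

    b := batchPool.Get().(*pooledBatch)
    b.series = append(b.series, "series-1")

    if err := sendBatch(ctx, ch, b); err != nil {
        b.Release() // send failed: we still own the batch
        return
    }

    got := <-ch
    fmt.Println("received", len(got.series), "series")
    got.Release() // receiver releases once it is done with the batch
}

The trade-off versus the old cloning approach: no per-chunk allocations, at the cost of an explicit ownership rule on every success and failure path.
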
@@ -371,20 +356,26 @@ func (s *storeGatewayStreamReader) GetChunks(seriesIndex uint64) (_ []storepb.Ag
         s.err = err
     }()
 
-    if len(s.chunksBatch) == 0 {
+    if s.chunksBatch == nil {
         if err := s.readNextBatch(seriesIndex); err != nil {
             return nil, err
         }
     }
 
-    chks := s.chunksBatch[0]
-    if len(s.chunksBatch) > 1 {
-        s.chunksBatch = s.chunksBatch[1:]
+    chks := s.chunksBatch.Series[0]
+    if len(s.chunksBatch.Series) > 1 {
+        s.chunksBatch.Series = s.chunksBatch.Series[1:]
     } else {
+        // Take ownership of chks before releasing the batch.
+        s.chunksBatch.Series = s.chunksBatch.Series[:0]
+        s.chunksBatch.Release()
         s.chunksBatch = nil
     }
 
     if chks.SeriesIndex != seriesIndex {
+        for _, chk := range chks.Chunks {
+            storepb.ReleaseChunk(&chk.Raw)
+        }
         return nil, fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has series with index %v", seriesIndex, chks.SeriesIndex)
     }
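
The "take ownership" comment above is the subtle step: when the last series is popped, the batch's slice is truncated before Release, so releasing the batch back to the pool cannot also recycle the element the caller is about to use. A self-contained sketch with hypothetical types:

package main

import "fmt"

type series struct{ id int }

// batch is a hypothetical pooled container; Release returns it and any series
// it still holds to their pools.
type batch struct{ series []*series }

var releasedSeries int

func (b *batch) Release() {
    releasedSeries += len(b.series) // stand-in for returning held series to a pool
    b.series = nil
}

// pop hands ownership of the first series to the caller. When it is the last
// one, the slice is truncated *before* Release, so the batch no longer holds
// the element and cannot recycle it out from under the caller.
func pop(b *batch) (first *series, remaining *batch) {
    first = b.series[0]
    if len(b.series) > 1 {
        b.series = b.series[1:]
        return first, b
    }
    b.series = b.series[:0] // take ownership of first before releasing
    b.Release()
    return first, nil
}

func main() {
    first, rest := pop(&batch{series: []*series{{id: 1}}})
    fmt.Println(first.id, rest == nil, releasedSeries) // 1 true 0
}
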

@@ -400,6 +391,9 @@ func (s *storeGatewayStreamReader) GetChunks(seriesIndex uint64) (_ []storepb.Ag
     // is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being
     // logged and included in metrics and traces, when in fact the call succeeded.
     if err := <-s.errorChan; err != nil {
+        for _, chk := range chks.Chunks {
+            storepb.ReleaseChunk(&chk.Raw)
+        }
         return nil, fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has failed: %w", seriesIndex, err)
     }
 }
@@ -426,7 +420,7 @@ func (s *storeGatewayStreamReader) readNextBatch(seriesIndex uint64) error {
         return fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount)
     }
 
-    s.chunksBatch = chks.Series
+    s.chunksBatch = chks
     return nil
 }

(Diff for the remaining 14 changed files was not loaded.)