Skip to content

Commit

Permalink
Protected the fetchers
Browse files Browse the repository at this point in the history
Signed-off-by: gotjosh <josue.abreu@gmail.com>
  • Loading branch information
gotjosh committed Nov 8, 2024
1 parent 562a587 commit b845be6
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math"
"strconv"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -79,8 +80,10 @@ type PartitionReader struct {
consumerGroup string
concurrentFetchersMinBytesMaxWaitTime time.Duration

client *kgo.Client
fetcher fetcher
client *kgo.Client

fetcherMtx sync.Mutex
fetcher fetcher

newConsumer consumerFactory
metrics readerMetrics
Expand Down Expand Up @@ -133,6 +136,8 @@ func (r *PartitionReader) Update(_ context.Context, _, _ int) {
}

func (r *PartitionReader) BufferedRecords() float64 {
r.fetcherMtx.Lock()
defer r.fetcherMtx.Unlock()
var fcount, ccount float64
if r.fetcher != nil && r.fetcher != r {
fcount = r.fetcher.BufferedRecords()
Expand Down Expand Up @@ -219,15 +224,19 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
if err != nil {
return errors.Wrap(err, "creating concurrent fetchers during startup")
}
r.fetcherMtx.Lock()
r.fetcher = f
r.fetcherMtx.Unlock()
} else {
// When concurrent fetch is disabled we read records directly from the Kafka client, so we want it
// to consume the partition.
r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)},
})

r.fetcherMtx.Lock()
r.fetcher = r
r.fetcherMtx.Unlock()
}

// Enforce the max consumer lag (if enabled).
Expand Down Expand Up @@ -285,6 +294,9 @@ func (r *PartitionReader) run(ctx context.Context) error {
// switchToOngoingFetcher switches to the configured ongoing fetcher. This function could be
// called multiple times.
func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
r.fetcherMtx.Lock()
defer r.fetcherMtx.Unlock()

if r.kafkaCfg.StartupFetchConcurrency == r.kafkaCfg.OngoingFetchConcurrency && r.kafkaCfg.StartupRecordsPerFetch == r.kafkaCfg.OngoingRecordsPerFetch {
// we're already using the same settings, no need to switch
return
Expand Down

0 comments on commit b845be6

Please sign in to comment.