Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix MimirIngesterStuckProcessingRecordsFromKafka #9855

Merged
merged 19 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
### Mixin

* [CHANGE] Remove backwards compatibility for `thanos_memcached_` prefixed metrics in dashboards and alerts removed in 2.12. #9674 #9758
* [CHANGE] Reworked the alert `MimirIngesterStuckProcessingRecordsFromKafka` to also work when concurrent fetching is enabled. #9855
* [ENHANCEMENT] Unify ingester autoscaling panels on 'Mimir / Writes' dashboard to work for both ingest-storage and non-ingest-storage autoscaling. #9617
* [ENHANCEMENT] Alerts: Enable configuring job prefix for alerts to prevent clashes with metrics from Loki/Tempo. #9659
* [ENHANCEMENT] Dashboards: visualize the age of source blocks in the "Mimir / Compactor" dashboard. #9697
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,7 @@ spec:
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
for: 5m
labels:
severity: critical
Expand Down
3 changes: 1 addition & 2 deletions operations/mimir-mixin-compiled-baremetal/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1103,8 +1103,7 @@ groups:
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
(sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
for: 5m
labels:
severity: critical
Expand Down
3 changes: 1 addition & 2 deletions operations/mimir-mixin-compiled/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1117,8 +1117,7 @@ groups:
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
for: 5m
labels:
severity: critical
Expand Down
3 changes: 1 addition & 2 deletions operations/mimir-mixin/alerts/ingest-storage.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
(sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
||| % $._config,
labels: {
severity: 'critical',
Expand Down
116 changes: 90 additions & 26 deletions pkg/storage/ingest/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type fetcher interface {

// Stop stops the fetcher.
Stop()

// BufferedRecords returns the number of records that have been fetched but not yet consumed.
BufferedRecords() int64
}

// fetchWant represents a range of offsets to fetch.
Expand Down Expand Up @@ -220,20 +223,18 @@ type concurrentFetchers struct {

minBytesWaitTime time.Duration

orderedFetches chan fetchResult
// orderedFetches is a channel where we write fetches that are ready to be polled by PollFetches().
// Since all records must be polled in order, the fetches written to this channel are after
// ordering.
orderedFetches chan fetchResult

lastReturnedRecord int64
startOffsets *genericOffsetReader[int64]

// trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes.
trackCompressedBytes bool
}

// Stop implements fetcher
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a move as is - these methods above the constructor seem to be oddly placed.

func (r *concurrentFetchers) Stop() {
close(r.done)

r.wg.Wait()
level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
bufferedFetchedRecords *atomic.Int64
}

// newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset.
Expand Down Expand Up @@ -267,18 +268,19 @@ func newConcurrentFetchers(
return nil, fmt.Errorf("resolving offset to start consuming from: %w", err)
}
f := &concurrentFetchers{
client: client,
logger: logger,
topicName: topic,
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
tracer: recordsTracer(),
orderedFetches: make(chan fetchResult),
done: make(chan struct{}),
bufferedFetchedRecords: atomic.NewInt64(0),
client: client,
logger: logger,
topicName: topic,
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
tracer: recordsTracer(),
orderedFetches: make(chan fetchResult),
done: make(chan struct{}),
}

topics, err := kadm.NewClient(client).ListTopics(ctx, topic)
Expand All @@ -299,6 +301,32 @@ func newConcurrentFetchers(
return f, nil
}

// BufferedRecords implements fetcher. It reports the current value of the
// buffered-records counter, i.e. the records that have been fetched but not
// yet handed out through PollFetches().
func (r *concurrentFetchers) BufferedRecords() int64 {
	buffered := r.bufferedFetchedRecords.Load()
	return buffered
}

// Stop implements fetcher. It closes the done channel, waits on the wait group,
// and zeroes the buffered-records counter. Calling it again after the done
// channel has been closed is a no-op.
func (r *concurrentFetchers) Stop() {
	// Non-blocking probe of the done channel: a receive that reports the channel
	// as closed means a previous Stop() already ran, so there is nothing to do.
	alreadyStopped := false
	select {
	case _, open := <-r.done:
		alreadyStopped = !open
	default:
	}
	if alreadyStopped {
		return
	}

	// Close done, then block until everything tracked by the wait group has finished.
	close(r.done)
	r.wg.Wait()

	// Records still sitting in the buffer are intentionally dropped when the fetcher
	// is stopped, so the buffered-records counter is reset to zero.
	r.bufferedFetchedRecords.Store(0)

	level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
}

// Update implements fetcher
func (r *concurrentFetchers) Update(ctx context.Context, concurrency, records int) {
r.Stop()
Expand All @@ -315,6 +343,13 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont
case <-ctx.Done():
return kgo.Fetches{}, ctx
case f := <-r.orderedFetches:
// The records have been polled from the buffer, so we can now decrease the number of
// buffered records. It's important to note that we decrease it by the number of actually
// buffered records and not by the number of records returned by PollFetches(), which
// could be lower if some records are discarded because they are "old" (already returned
// by previous PollFetches() calls).
r.bufferedFetchedRecords.Sub(int64(len(f.Records)))

firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)

Expand Down Expand Up @@ -499,6 +534,10 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg
attemptSpan.SetTag("attempt", attempt)

f := r.fetchSingle(ctx, w)

// We increase the count of buffered records as soon as we fetch them.
r.bufferedFetchedRecords.Add(int64(len(f.Records)))

f = f.Merge(previousResult)
previousResult = f
if f.Err != nil {
Expand Down Expand Up @@ -586,18 +625,39 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
go r.run(ctx, wants, logger, highWatermark)
}

// We need to make sure we don't leak any goroutine given that start is called within a goroutine.
defer r.wg.Done()
Comment on lines +628 to +629
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: moved here just to group it with r.wg.Add().


var (
nextFetch = fetchWantFrom(startOffset, recordsPerFetch)
nextResult chan fetchResult
// nextFetch is the next records fetch operation we want to issue to one of the running workers.
// It contains the offset range to fetch and a channel where the result should be written to.
nextFetch = fetchWantFrom(startOffset, recordsPerFetch)

// nextResult is the channel where we expect a worker will write the result of the next fetch
// operation. This result is the next result that will be returned to PollFetches(), guaranteeing
// records ordering.
nextResult chan fetchResult

// pendingResults is the list of all fetchResult of all inflight fetch operations. Pending results
// are ordered in the same order these results should be returned to PollFetches(), so the first one
// in the list is the next one that should be returned, unless nextResult is valued (if nextResult
// is valued, then nextResult is the next and the first item in the pendingResults list is the
// 2nd next one).
pendingResults = list.New()

bufferedResult fetchResult
readyBufferedResults chan fetchResult // this is non-nil when bufferedResult is non-empty
// bufferedResult is the next fetch that should be polled by PollFetches().
bufferedResult fetchResult

// readyBufferedResults channel gets continuously flipped between nil and the actual channel
// where PollFetches() reads from. This channel is nil when there are no ordered buffered
// records ready to be written to the channel PollFetches() reads from, and is non-nil when
// there are some ordered buffered records ready.
//
// It is guaranteed that this channel is non-nil when bufferedResult is non-empty.
readyBufferedResults chan fetchResult
)
nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume

// We need to make sure we don't leak any goroutine given that start is called within a goroutine.
defer r.wg.Done()
for {
refillBufferedResult := nextResult
if readyBufferedResults != nil {
Expand Down Expand Up @@ -641,6 +701,10 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
continue
}
nextFetch = nextFetch.UpdateBytesPerRecord(result.fetchedBytes, len(result.Records))

// We have some ordered records ready to be sent to PollFetches(). We store the fetch
// result in bufferedResult, and we flip readyBufferedResults to the channel used by
// PollFetches().
bufferedResult = result
readyBufferedResults = r.orderedFetches

Expand Down
Loading
Loading