Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix MimirIngesterStuckProcessingRecordsFromKafka #9855

Merged
merged 19 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
### Mixin

* [CHANGE] Remove backwards compatibility for `thanos_memcached_` prefixed metrics in dashboards and alerts removed in 2.12. #9674 #9758
* [CHANGE] Reworked the alert `MimirIngesterStuckProcessingRecordsFromKafka` to also work when concurrent fetching is enabled. #9855
* [ENHANCEMENT] Unify ingester autoscaling panels on 'Mimir / Writes' dashboard to work for both ingest-storage and non-ingest-storage autoscaling. #9617
* [ENHANCEMENT] Alerts: Enable configuring job prefix for alerts to prevent clashes with metrics from Loki/Tempo. #9659
* [ENHANCEMENT] Dashboards: visualize the age of source blocks in the "Mimir / Compactor" dashboard. #9697
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,7 @@ spec:
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
for: 5m
labels:
severity: critical
Expand Down
3 changes: 1 addition & 2 deletions operations/mimir-mixin-compiled-baremetal/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1103,8 +1103,7 @@ groups:
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
(sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
for: 5m
labels:
severity: critical
Expand Down
3 changes: 1 addition & 2 deletions operations/mimir-mixin-compiled/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1117,8 +1117,7 @@ groups:
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
for: 5m
labels:
severity: critical
Expand Down
3 changes: 1 addition & 2 deletions operations/mimir-mixin/alerts/ingest-storage.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
(sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
||| % $._config,
labels: {
severity: 'critical',
Expand Down
100 changes: 76 additions & 24 deletions pkg/storage/ingest/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type fetcher interface {

// Stop stops the fetcher.
Stop()

// BufferedRecords returns the number of records that have been fetched but not yet consumed.
BufferedRecords() int64
}

// fetchWant represents a range of offsets to fetch.
Expand Down Expand Up @@ -220,20 +223,19 @@ type concurrentFetchers struct {

minBytesWaitTime time.Duration

orderedFetches chan fetchResult
// orderedFetches is a channel where we write fetches that are ready to be polled by PollFetches().
// Since all records must be polled in order, the fetches written to this channel are after
// ordering and "deduplication" (in case the same record is fetched multiple times from different
// routines).
orderedFetches chan fetchResult

lastReturnedRecord int64
startOffsets *genericOffsetReader[int64]

// trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes.
trackCompressedBytes bool
}

// Stop implements fetcher
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a move as is - these methods above the constructor seem to be oddly placed.

func (r *concurrentFetchers) Stop() {
close(r.done)

r.wg.Wait()
level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
bufferedFetchedRecords *atomic.Int64
}

// newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset.
Expand Down Expand Up @@ -267,18 +269,19 @@ func newConcurrentFetchers(
return nil, fmt.Errorf("resolving offset to start consuming from: %w", err)
}
f := &concurrentFetchers{
client: client,
logger: logger,
topicName: topic,
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
tracer: recordsTracer(),
orderedFetches: make(chan fetchResult),
done: make(chan struct{}),
bufferedFetchedRecords: atomic.NewInt64(0),
client: client,
logger: logger,
topicName: topic,
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
tracer: recordsTracer(),
orderedFetches: make(chan fetchResult),
done: make(chan struct{}),
}

topics, err := kadm.NewClient(client).ListTopics(ctx, topic)
Expand All @@ -299,6 +302,28 @@ func newConcurrentFetchers(
return f, nil
}

// BufferedRecords implements fetcher. It returns the current number of records
// that have been fetched and ordered but not yet consumed via PollFetches().
func (r *concurrentFetchers) BufferedRecords() int64 {
	// Atomic snapshot; the counter is maintained by the ordering goroutine in start().
	buffered := r.bufferedFetchedRecords.Load()
	return buffered
}

// Stop implements fetcher. It signals all fetch goroutines to terminate via the
// done channel and blocks until they have exited.
//
// Stop is idempotent for sequential calls: once done has been closed, later calls
// return immediately. NOTE(review): it is still not safe to call Stop concurrently
// from multiple goroutines — two callers could both pass the "already stopped"
// check below and then both close(r.done), which panics.
func (r *concurrentFetchers) Stop() {
	// r.done is a close-only signal channel (nothing is ever sent on it), so a
	// non-blocking receive succeeds if and only if it has already been closed.
	select {
	case <-r.done:
		// Already stopped.
		return
	default:
	}

	close(r.done)
	r.wg.Wait()

	level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
}

// Update implements fetcher
func (r *concurrentFetchers) Update(ctx context.Context, concurrency, records int) {
r.Stop()
Expand Down Expand Up @@ -586,18 +611,31 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
go r.run(ctx, wants, logger, highWatermark)
}

// We need to make sure we don't leak any goroutine given that start is called within a goroutine.
defer r.wg.Done()
Comment on lines +628 to +629
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: moved here just to group it with r.wg.Add().


var (
nextFetch = fetchWantFrom(startOffset, recordsPerFetch)
nextResult chan fetchResult
pendingResults = list.New()

bufferedResult fetchResult
readyBufferedResults chan fetchResult // this is non-nil when bufferedResult is non-empty
// bufferedResult is the next fetch that should be polled by PollFetches().
bufferedResult fetchResult

// readyBufferedResults channel gets continuously flipped between nil and the actual channel
// where PollFetches() reads from. This channel is nil when there are no ordered buffered
// records ready to be written to the channel that PollFetches() reads from, and is non-nil when there
// are some ordered buffered records ready.
//
// It is guaranteed that this channel is non-nil when bufferedResult is non-empty.
readyBufferedResults chan fetchResult
)
nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume

// We need to make sure we don't leak any goroutine given that start is called within a goroutine.
defer r.wg.Done()
// When the fetcher is stopped, buffered records are intentionally dropped. For this reason,
// we do reset the counter of buffered records to zero when this goroutine ends.
defer r.bufferedFetchedRecords.Store(0)

for {
refillBufferedResult := nextResult
if readyBufferedResults != nil {
Expand Down Expand Up @@ -641,10 +679,24 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
continue
}
nextFetch = nextFetch.UpdateBytesPerRecord(result.fetchedBytes, len(result.Records))

// We have some ordered records ready to be sent to PollFetches(). We store the fetch
// result in bufferedResult, and we flip readyBufferedResults to the channel used by
// PollFetches().
bufferedResult = result
readyBufferedResults = r.orderedFetches

// We increase the count of buffered records only for ordered records that we're sure
// will not be discarded later, so that we can get an accurate tracking of the records
// ready to be polled by PollFetches() but not polled yet.
r.bufferedFetchedRecords.Add(int64(len(result.Records)))
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved

case readyBufferedResults <- bufferedResult:
// We've written the records to the channel read by PollFetches(). Since the channel
// is not buffered, as soon as the write succeeds it means PollFetches() has read it,
// so we can consider these records polled and decrease the buffered records counter.
r.bufferedFetchedRecords.Sub(int64(len(bufferedResult.Records)))

bufferedResult.finishWaitingForConsumption()
readyBufferedResults = nil
bufferedResult = fetchResult{}
Expand Down
Loading
Loading