
Commit 72cbd83

gotjosh and pracucci authored
fix MimirIngesterStuckProcessingRecordsFromKafka (#9855)
* fix `MimirIngesterStuckProcessingRecordsFromKafka`

  The alert `MimirIngesterStuckProcessingRecordsFromKafka` relied on the metric `cortex_ingest_storage_reader_buffered_fetch_records_total`, provided by the Kafka client, to identify whether we had stuck buffers or not. Now that we've implemented concurrent fetching from Kafka and bypass the client's polling function, we needed an equivalent metric when using concurrent fetching. This PR adds one; in addition, the metric also takes the client's buffered records into account, in case we use a mixture of non-concurrent and concurrent fetching.

* Add changelog
* Restructure metric assignment
* Remove the registry
* Fix helm
* Protect the fetchers
* Change log level to debug
* Make `BufferedRecords` int64 and remove debug logs
* Move buffered records increment location
* Use atomic functions for locking / unlocking the client and fetcher
* Assert on buffered records
* Reset the records buffer when `stop()` is called
* Fix test
* Change how buffered records are tracked, improve unit tests, and use atomic instead of a mutex to protect client/fetcher access
* Fix
* Fix comment
* Fix comment
* Get back to Josh's implementation of buffered records tracking, which has better coverage of all buffered records
* Use atomic for fetcher too

Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>
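The diff below shows the ingester-side counter; the gauge registration itself sits in one of the changed files not included in this excerpt. As a rough orientation, here is a minimal sketch of how the combined value could be exposed, assuming prometheus/client_golang's promauto and franz-go's `(*kgo.Client).BufferedFetchRecords()`. The helper and interface names are hypothetical; only the metric name is taken from the alert changes below.

```go
package ingestmetrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/twmb/franz-go/pkg/kgo"
)

// bufferedRecordsCounter is the subset of the fetcher interface used here;
// BufferedRecords() is the method this commit adds.
type bufferedRecordsCounter interface {
	BufferedRecords() int64
}

// registerBufferedFetchedRecords is a hypothetical helper (the real wiring lives in a
// file not shown in this excerpt). It exposes the gauge queried by the reworked alert.
func registerBufferedFetchedRecords(reg prometheus.Registerer, client *kgo.Client, fetchers bufferedRecordsCounter) prometheus.GaugeFunc {
	return promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Name: "cortex_ingest_storage_reader_buffered_fetched_records",
		Help: "Number of records fetched from Kafka but not yet consumed.",
	}, func() float64 {
		// Records buffered inside the kgo client (non-concurrent fetch path).
		buffered := client.BufferedFetchRecords()

		// Records buffered by the concurrent fetchers, when concurrent fetching is enabled.
		if fetchers != nil {
			buffered += fetchers.BufferedRecords()
		}

		return float64(buffered)
	})
}
```

With this shape, when concurrent fetching is disabled the fetcher counter is simply absent (nil), so the gauge falls back to the records buffered by the client alone.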
1 parent 5fd2424 commit 72cbd83

File tree

9 files changed, +477 -156 lines changed


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -76,6 +76,7 @@
 ### Mixin
 
 * [CHANGE] Remove backwards compatibility for `thanos_memcached_` prefixed metrics in dashboards and alerts removed in 2.12. #9674 #9758
+* [CHANGE] Reworked the alert `MimirIngesterStuckProcessingRecordsFromKafka` to also work when concurrent fetching is enabled. #9855
 * [ENHANCEMENT] Unify ingester autoscaling panels on 'Mimir / Writes' dashboard to work for both ingest-storage and non-ingest-storage autoscaling. #9617
 * [ENHANCEMENT] Alerts: Enable configuring job prefix for alerts to prevent clashes with metrics from Loki/Tempo. #9659
 * [ENHANCEMENT] Dashboards: visualize the age of source blocks in the "Mimir / Compactor" dashboard. #9697

operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml

Lines changed: 1 addition & 2 deletions
@@ -1129,8 +1129,7 @@ spec:
             rate(cortex_ingest_storage_reader_requests_total[5m])
           ) == 0)
           and
-          # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
-          (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
+          (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
         for: 5m
         labels:
           severity: critical

operations/mimir-mixin-compiled-baremetal/alerts.yaml

Lines changed: 1 addition & 2 deletions
@@ -1103,8 +1103,7 @@ groups:
           rate(cortex_ingest_storage_reader_requests_total[5m])
         ) == 0)
         and
-        # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
-        (sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
+        (sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
       for: 5m
       labels:
         severity: critical

operations/mimir-mixin-compiled/alerts.yaml

Lines changed: 1 addition & 2 deletions
@@ -1117,8 +1117,7 @@ groups:
           rate(cortex_ingest_storage_reader_requests_total[5m])
         ) == 0)
         and
-        # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
-        (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
+        (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
       for: 5m
       labels:
         severity: critical

operations/mimir-mixin/alerts/ingest-storage.libsonnet

Lines changed: 1 addition & 2 deletions
@@ -151,8 +151,7 @@
             rate(cortex_ingest_storage_reader_requests_total[5m])
           ) == 0)
           and
-          # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
-          (sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
+          (sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
         ||| % $._config,
         labels: {
           severity: 'critical',

pkg/storage/ingest/fetcher.go

Lines changed: 90 additions & 26 deletions
@@ -48,6 +48,9 @@ type fetcher interface {
 
 	// Stop stops the fetcher.
 	Stop()
+
+	// BufferedRecords returns the number of records that have been fetched but not yet consumed.
+	BufferedRecords() int64
 }
 
 // fetchWant represents a range of offsets to fetch.
@@ -220,20 +223,18 @@ type concurrentFetchers struct {
 
 	minBytesWaitTime time.Duration
 
-	orderedFetches chan fetchResult
+	// orderedFetches is a channel where we write fetches that are ready to be polled by PollFetches().
+	// Since all records must be polled in order, the fetches written to this channel are after
+	// ordering.
+	orderedFetches chan fetchResult
+
 	lastReturnedRecord int64
 	startOffsets       *genericOffsetReader[int64]
 
 	// trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes.
 	trackCompressedBytes bool
-}
 
-// Stop implements fetcher
-func (r *concurrentFetchers) Stop() {
-	close(r.done)
-
-	r.wg.Wait()
-	level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
+	bufferedFetchedRecords *atomic.Int64
 }
 
 // newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset.
@@ -267,18 +268,19 @@ func newConcurrentFetchers(
 		return nil, fmt.Errorf("resolving offset to start consuming from: %w", err)
 	}
 	f := &concurrentFetchers{
-		client:               client,
-		logger:               logger,
-		topicName:            topic,
-		partitionID:          partition,
-		metrics:              metrics,
-		minBytesWaitTime:     minBytesWaitTime,
-		lastReturnedRecord:   startOffset - 1,
-		startOffsets:         startOffsetsReader,
-		trackCompressedBytes: trackCompressedBytes,
-		tracer:               recordsTracer(),
-		orderedFetches:       make(chan fetchResult),
-		done:                 make(chan struct{}),
+		bufferedFetchedRecords: atomic.NewInt64(0),
+		client:                 client,
+		logger:                 logger,
+		topicName:              topic,
+		partitionID:            partition,
+		metrics:                metrics,
+		minBytesWaitTime:       minBytesWaitTime,
+		lastReturnedRecord:     startOffset - 1,
+		startOffsets:           startOffsetsReader,
+		trackCompressedBytes:   trackCompressedBytes,
+		tracer:                 recordsTracer(),
+		orderedFetches:         make(chan fetchResult),
+		done:                   make(chan struct{}),
 	}
 
 	topics, err := kadm.NewClient(client).ListTopics(ctx, topic)
@@ -299,6 +301,32 @@
 	return f, nil
 }
 
+// BufferedRecords implements fetcher.
+func (r *concurrentFetchers) BufferedRecords() int64 {
+	return r.bufferedFetchedRecords.Load()
+}
+
+// Stop implements fetcher.
+func (r *concurrentFetchers) Stop() {
+	// Ensure it's not already stopped.
+	select {
+	case _, ok := <-r.done:
+		if !ok {
+			return
+		}
+	default:
+	}
+
+	close(r.done)
+	r.wg.Wait()
+
+	// When the fetcher is stopped, buffered records are intentionally dropped. For this reason,
+	// we do reset the counter of buffered records here.
+	r.bufferedFetchedRecords.Store(0)
+
+	level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
+}
+
 // Update implements fetcher
 func (r *concurrentFetchers) Update(ctx context.Context, concurrency, records int) {
 	r.Stop()
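The guard at the top of the new `Stop()` relies on two channel properties: receiving from a closed channel returns immediately with `ok == false`, while receiving from an open channel with no pending sends falls through to the `default` case. That makes `Stop()` safe to call more than once (for example, `Update()` above calls `Stop()` before restarting the fetchers). A small standalone sketch of the same guard; the `stopOnce` type is illustrative, not Mimir code:

```go
package main

import "fmt"

// stopOnce demonstrates an idempotent Stop() built on a "done" channel that is
// only ever closed, never sent to.
type stopOnce struct {
	done chan struct{}
}

func (s *stopOnce) Stop() {
	select {
	case _, ok := <-s.done:
		if !ok {
			// done is already closed: Stop() ran before, nothing to do.
			return
		}
	default:
		// done is still open: proceed with the shutdown.
	}

	close(s.done)
	fmt.Println("stopped")
}

func main() {
	s := &stopOnce{done: make(chan struct{})}
	s.Stop() // prints "stopped"
	s.Stop() // no-op: done is already closed
}
```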
@@ -315,6 +343,13 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont
 	case <-ctx.Done():
 		return kgo.Fetches{}, ctx
 	case f := <-r.orderedFetches:
+		// The records have been polled from the buffer, so we can now decrease the number of
+		// buffered records. It's important to note that we decrease it by the number of actually
+		// buffered records and not by the number of records returned by PollFetchers(), which
+		// could be lower if some records are discarded because "old" (already returned by previous
+		// PollFetches() calls).
+		r.bufferedFetchedRecords.Sub(int64(len(f.Records)))
+
 		firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
 		r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)
 
@@ -499,6 +534,10 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg
 		attemptSpan.SetTag("attempt", attempt)
 
 		f := r.fetchSingle(ctx, w)
+
+		// We increase the count of buffered records as soon as we fetch them.
+		r.bufferedFetchedRecords.Add(int64(len(f.Records)))
+
 		f = f.Merge(previousResult)
 		previousResult = f
 		if f.Err != nil {
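Taken together, the hunks above keep `bufferedFetchedRecords` in sync with the fetch lifecycle: increment as soon as `fetchSingle()` returns records, decrement when a buffered fetch is handed to `PollFetches()`, and reset to zero on `Stop()`. A distilled sketch of that pattern, using go.uber.org/atomic as fetcher.go does; the `recordBuffer` type and its method names are illustrative, not Mimir code:

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// recordBuffer mirrors the counting pattern this commit adds to concurrentFetchers.
type recordBuffer struct {
	buffered *atomic.Int64
}

func newRecordBuffer() *recordBuffer {
	return &recordBuffer{buffered: atomic.NewInt64(0)}
}

// onFetched is called right after a fetch returns, with the number of records fetched.
func (b *recordBuffer) onFetched(numRecords int) { b.buffered.Add(int64(numRecords)) }

// onPolled is called when a buffered fetch is handed over to the consumer.
func (b *recordBuffer) onPolled(numRecords int) { b.buffered.Sub(int64(numRecords)) }

// onStop drops whatever is still buffered when the fetchers stop.
func (b *recordBuffer) onStop() { b.buffered.Store(0) }

// BufferedRecords returns the current number of fetched-but-not-consumed records.
func (b *recordBuffer) BufferedRecords() int64 { return b.buffered.Load() }

func main() {
	b := newRecordBuffer()
	b.onFetched(100)                 // a fetch returned 100 records
	b.onPolled(40)                   // 40 of them were handed to the consumer
	fmt.Println(b.BufferedRecords()) // 60
	b.onStop()
	fmt.Println(b.BufferedRecords()) // 0
}
```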
@@ -586,18 +625,39 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
 		go r.run(ctx, wants, logger, highWatermark)
 	}
 
+	// We need to make sure we don't leak any goroutine given that start is called within a goroutine.
+	defer r.wg.Done()
+
 	var (
-		nextFetch      = fetchWantFrom(startOffset, recordsPerFetch)
-		nextResult     chan fetchResult
+		// nextFetch is the next records fetch operation we want to issue to one of the running workers.
+		// It contains the offset range to fetch and a channel where the result should be written to.
+		nextFetch = fetchWantFrom(startOffset, recordsPerFetch)
+
+		// nextResult is the channel where we expect a worker will write the result of the next fetch
+		// operation. This result is the next result that will be returned to PollFetches(), guaranteeing
+		// records ordering.
+		nextResult chan fetchResult
+
+		// pendingResults is the list of all fetchResult of all inflight fetch operations. Pending results
+		// are ordered in the same order these results should be returned to PollFetches(), so the first one
+		// in the list is the next one that should be returned, unless nextResult is valued (if nextResult
+		// is valued, then nextResult is the next and the first item in the pendingResults list is the
+		// 2nd next one).
 		pendingResults = list.New()
 
-		bufferedResult       fetchResult
-		readyBufferedResults chan fetchResult // this is non-nil when bufferedResult is non-empty
+		// bufferedResult is the next fetch that should be polled by PollFetches().
+		bufferedResult fetchResult
+
+		// readyBufferedResults channel gets continuously flipped between nil and the actual channel
+		// where PollFetches() reads from. This channel is nil when there are no ordered buffered
+		// records ready to be written to the channel where PollFetches(), and is non-nil when there
+		// are some ordered buffered records ready.
+		//
+		// It is guaranteed that this channel is non-nil when bufferedResult is non-empty.
+		readyBufferedResults chan fetchResult
 	)
 	nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume
 
-	// We need to make sure we don't leak any goroutine given that start is called within a goroutine.
-	defer r.wg.Done()
 	for {
 		refillBufferedResult := nextResult
 		if readyBufferedResults != nil {
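The `readyBufferedResults` flipping described in the new comments leans on a standard Go idiom: a send on a nil channel blocks forever, so a `select` case whose channel is nil is effectively disabled until the variable is pointed at a real channel. A minimal standalone sketch of the idiom (the variable names and the buffered channel are illustrative, not Mimir code, and the `default` case here only exists to keep the example non-blocking):

```go
package main

import "fmt"

func main() {
	results := make(chan string, 1)

	var ready chan string // nil: the send case in the select below can never fire
	buffered := ""

	for i := 0; i < 3; i++ {
		if i == 1 {
			// An in-order result became available: enable the send case.
			buffered = "fetch#1"
			ready = results
		}

		select {
		case ready <- buffered:
			fmt.Println("handed over:", buffered)
			// Nothing buffered anymore: disable the send case again.
			ready = nil
		default:
			fmt.Println("nothing ready to hand over")
		}
	}

	fmt.Println("received:", <-results)
}
```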
@@ -641,6 +701,10 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
 			continue
 		}
 		nextFetch = nextFetch.UpdateBytesPerRecord(result.fetchedBytes, len(result.Records))
+
+		// We have some ordered records ready to be sent to PollFetches(). We store the fetch
+		// result in bufferedResult, and we flip readyBufferedResults to the channel used by
+		// PollFetches().
 		bufferedResult = result
 		readyBufferedResults = r.orderedFetches
 