From b92e64eb981d6c2361a271b12f12c34d98454b42 Mon Sep 17 00:00:00 2001 From: gotjosh Date: Fri, 8 Nov 2024 16:56:54 +0000 Subject: [PATCH 01/19] fix `MimirIngesterStuckProcessingRecordsFromKafka` The alert `MimirIngesterStuckProcessingRecordsFromKafka` relied on the metric `cortex_ingest_storage_reader_buffered_fetch_records_total ` provided by the Kafka client to identify wether we had stuck buffers or not. Now that we've implemented concurrent fetching from Kafka and bypass the client's polling function we needed an equivalent metric when using concurrent fetching. This PR does that; In addition to that - the metric also takes the client's buffered records In case we do use a mixture of non-concurrent fetching and concurrent fetching. --- .../alerts.yaml | 3 +- operations/mimir-mixin-compiled/alerts.yaml | 3 +- .../alerts/ingest-storage.libsonnet | 3 +- pkg/storage/ingest/fetcher.go | 57 ++++-- pkg/storage/ingest/reader.go | 26 +++ pkg/storage/ingest/reader_test.go | 173 ++++++++++-------- 6 files changed, 165 insertions(+), 100 deletions(-) diff --git a/operations/mimir-mixin-compiled-baremetal/alerts.yaml b/operations/mimir-mixin-compiled-baremetal/alerts.yaml index 4f87df5ba22..77846f2f2d6 100644 --- a/operations/mimir-mixin-compiled-baremetal/alerts.yaml +++ b/operations/mimir-mixin-compiled-baremetal/alerts.yaml @@ -1103,8 +1103,7 @@ groups: rate(cortex_ingest_storage_reader_requests_total[5m]) ) == 0) and - # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records. - (sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0) + (sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetched_records) > 0) for: 5m labels: severity: critical diff --git a/operations/mimir-mixin-compiled/alerts.yaml b/operations/mimir-mixin-compiled/alerts.yaml index dc7cdd4e8eb..cef80213c98 100644 --- a/operations/mimir-mixin-compiled/alerts.yaml +++ b/operations/mimir-mixin-compiled/alerts.yaml @@ -1117,8 +1117,7 @@ groups: rate(cortex_ingest_storage_reader_requests_total[5m]) ) == 0) and - # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records. - (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0) + (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0) for: 5m labels: severity: critical diff --git a/operations/mimir-mixin/alerts/ingest-storage.libsonnet b/operations/mimir-mixin/alerts/ingest-storage.libsonnet index 371865130b0..3cca0186d4b 100644 --- a/operations/mimir-mixin/alerts/ingest-storage.libsonnet +++ b/operations/mimir-mixin/alerts/ingest-storage.libsonnet @@ -151,8 +151,7 @@ rate(cortex_ingest_storage_reader_requests_total[5m]) ) == 0) and - # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records. - (sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0) + (sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetched_records) > 0) ||| % $._config, labels: { severity: 'critical', diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 05bffcd40c5..15a4546328f 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -48,6 +48,9 @@ type fetcher interface { // Stop stops the fetcher. Stop() + + // BufferedRecords returns the number of records that have been fetched but not yet consumed. + BufferedRecords() float64 } // fetchWant represents a range of offsets to fetch. @@ -226,14 +229,8 @@ type concurrentFetchers struct { // trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes. trackCompressedBytes bool -} - -// Stop implements fetcher -func (r *concurrentFetchers) Stop() { - close(r.done) - r.wg.Wait() - level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) + bufferedFetchedRecords *atomic.Int64 } // newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset. @@ -267,18 +264,19 @@ func newConcurrentFetchers( return nil, fmt.Errorf("resolving offset to start consuming from: %w", err) } f := &concurrentFetchers{ - client: client, - logger: logger, - topicName: topic, - partitionID: partition, - metrics: metrics, - minBytesWaitTime: minBytesWaitTime, - lastReturnedRecord: startOffset - 1, - startOffsets: startOffsetsReader, - trackCompressedBytes: trackCompressedBytes, - tracer: recordsTracer(), - orderedFetches: make(chan fetchResult), - done: make(chan struct{}), + bufferedFetchedRecords: atomic.NewInt64(0), + client: client, + logger: logger, + topicName: topic, + partitionID: partition, + metrics: metrics, + minBytesWaitTime: minBytesWaitTime, + lastReturnedRecord: startOffset - 1, + startOffsets: startOffsetsReader, + trackCompressedBytes: trackCompressedBytes, + tracer: recordsTracer(), + orderedFetches: make(chan fetchResult), + done: make(chan struct{}), } topics, err := kadm.NewClient(client).ListTopics(ctx, topic) @@ -299,6 +297,20 @@ func newConcurrentFetchers( return f, nil } +// BufferedRecords implements fetcher. +func (r *concurrentFetchers) BufferedRecords() float64 { + return float64(r.bufferedFetchedRecords.Load()) +} + +// Stop implements fetcher. +func (r *concurrentFetchers) Stop() { + close(r.done) + + r.wg.Wait() + + level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) +} + // Update implements fetcher func (r *concurrentFetchers) Update(ctx context.Context, concurrency, records int) { r.Stop() @@ -315,6 +327,10 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont case <-ctx.Done(): return kgo.Fetches{}, ctx case f := <-r.orderedFetches: + // At this point, we're guaranteed that the records are not going to be with us anymore. + tr := r.bufferedFetchedRecords.Sub(int64(len(f.FetchPartition.Records))) + level.Debug(r.logger).Log("msg", "remove buffered fetched records", "num_records", len(f.FetchPartition.Records), "total_records", tr) + firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord) r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime) @@ -446,6 +462,9 @@ func (r *concurrentFetchers) parseFetchResponse(ctx context.Context, startOffset fetchedBytes = sumRecordLengths(partition.Records) } + nv := r.bufferedFetchedRecords.Add(int64(len(partition.Records))) + level.Info(r.logger).Log("msg", "buffered fetched records", "num_records", len(partition.Records), "total_records", nv) + return fetchResult{ ctx: ctx, FetchPartition: partition, diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index f7000f5fbe1..5170c00239b 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -117,6 +117,8 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri reg: reg, } + r.metrics.RegisterBuffedRecordsMetric(func() float64 { return r.BufferedRecords() }) + r.Service = services.NewBasicService(r.start, r.run, r.stop) return r, nil } @@ -131,6 +133,19 @@ func (r *PartitionReader) Update(_ context.Context, _, _ int) { // Given the partition reader has no concurrency it doesn't support updates. } +func (r *PartitionReader) BufferedRecords() float64 { + var fcount, ccount float64 + if r.fetcher != nil && r.fetcher != r { + fcount = r.fetcher.BufferedRecords() + } + + if r.client != nil { + ccount = float64(r.client.BufferedFetchRecords()) + } + + return fcount + ccount +} + func (r *PartitionReader) start(ctx context.Context) (returnErr error) { if r.kafkaCfg.AutoCreateTopicEnabled { setDefaultNumberOfPartitionsForAutocreatedTopics(r.kafkaCfg, r.logger) @@ -937,6 +952,9 @@ func (r *partitionCommitter) stop(error) error { } type readerMetrics struct { + reg prometheus.Registerer + + bufferedFetchedRecords prometheus.GaugeFunc receiveDelayWhenStarting prometheus.Observer receiveDelayWhenRunning prometheus.Observer recordsPerFetch prometheus.Histogram @@ -973,6 +991,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric lastConsumedOffset.Set(-1) return readerMetrics{ + reg: reg, receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"), receiveDelayWhenRunning: receiveDelay.WithLabelValues("running"), recordsPerFetch: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ @@ -1008,6 +1027,13 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric } } +func (r readerMetrics) RegisterBuffedRecordsMetric(collector func() float64) { + r.bufferedFetchedRecords = promauto.With(r.reg).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_ingest_storage_reader_buffered_fetched_records", + Help: "The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed.", + }, collector) +} + type StrongReadConsistencyInstrumentation[T any] struct { requests *prometheus.CounterVec failures prometheus.Counter diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 29aad7b5b5c..4c8252b525a 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -1793,74 +1793,87 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) } -func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenConcurrentFetchIsEnabled(t *testing.T) { +func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testing.T) { const ( topicName = "test" partitionID = 1 ) - var ( - ctx = context.Background() - _, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName) - consumedRecordsMx sync.Mutex - consumedRecords []string - ) + concurrencyVariants := map[string][]readerTestCfgOpt{ + "without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)}, + "with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)}, + "with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)}, + "with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)}, + } - consumer := consumerFunc(func(_ context.Context, records []record) error { - consumedRecordsMx.Lock() - defer consumedRecordsMx.Unlock() + for concurrencyName, concurrencyVariant := range concurrencyVariants { + concurrencyVariant := concurrencyVariant - for _, r := range records { - consumedRecords = append(consumedRecords, string(r.content)) - } - return nil - }) + t.Run(concurrencyName, func(t *testing.T) { + t.Parallel() - // Produce some records. - writeClient := newKafkaProduceClient(t, clusterAddr) - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-2")) - t.Log("produced 2 records") - - // Create and start the reader. - reg := prometheus.NewPedanticRegistry() - logs := &concurrency.SyncBuffer{} - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, - withConsumeFromPositionAtStartup(consumeFromStart), - withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), - withRegistry(reg), - withLogger(log.NewLogfmtLogger(logs)), - // Enable both startup and ongoing fetch concurrency. - withStartupConcurrency(2), - withOngoingConcurrency(2)) - - require.NoError(t, reader.StartAsync(ctx)) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) - }) + var ( + ctx = context.Background() + _, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName) + consumedRecordsMx sync.Mutex + consumedRecords []string + ) - // We expect the reader to catch up, and then switch to Running state. - test.Poll(t, 5*time.Second, services.Running, func() interface{} { - return reader.State() - }) + consumer := consumerFunc(func(_ context.Context, records []record) error { + consumedRecordsMx.Lock() + defer consumedRecordsMx.Unlock() - // We expect the reader to have switched to running because target consumer lag has been honored. - assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + for _, r := range records { + consumedRecords = append(consumedRecords, string(r.content)) + } + return nil + }) - // We expect the reader to have consumed the partition from start. - test.Poll(t, time.Second, []string{"record-1", "record-2"}, func() interface{} { - consumedRecordsMx.Lock() - defer consumedRecordsMx.Unlock() - return slices.Clone(consumedRecords) - }) + // Produce some records. + writeClient := newKafkaProduceClient(t, clusterAddr) + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-2")) + t.Log("produced 2 records") - // Wait some time to give some time for the Kafka client to eventually read and buffer records. - // We don't expect it, but to make sure it's not happening we have to give it some time. - time.Sleep(time.Second) + // Create and start the reader. + reg := prometheus.NewPedanticRegistry() + logs := &concurrency.SyncBuffer{} - // We expect the last consumed offset to be tracked in a metric, and there are no buffered records reported. - test.Poll(t, time.Second, nil, func() interface{} { - return promtest.GatherAndCompare(reg, strings.NewReader(` + readerOpts := append([]readerTestCfgOpt{ + withConsumeFromPositionAtStartup(consumeFromStart), + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs)), + }, concurrencyVariant...) + + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, readerOpts...) + require.NoError(t, reader.StartAsync(ctx)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) + }) + + // We expect the reader to catch up, and then switch to Running state. + test.Poll(t, 5*time.Second, services.Running, func() interface{} { + return reader.State() + }) + + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + + // We expect the reader to have consumed the partition from start. + test.Poll(t, time.Second, []string{"record-1", "record-2"}, func() interface{} { + consumedRecordsMx.Lock() + defer consumedRecordsMx.Unlock() + return slices.Clone(consumedRecords) + }) + + // Wait some time to give some time for the Kafka client to eventually read and buffer records. + // We don't expect it, but to make sure it's not happening we have to give it some time. + time.Sleep(time.Second) + + // We expect the last consumed offset to be tracked in a metric, and there are no buffered records reported. + test.Poll(t, time.Second, nil, func() interface{} { + return promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet. # TYPE cortex_ingest_storage_reader_last_consumed_offset gauge cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 1 @@ -1868,28 +1881,32 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenConcurrentFet # HELP cortex_ingest_storage_reader_buffered_fetch_records_total Total number of records buffered within the client ready to be consumed # TYPE cortex_ingest_storage_reader_buffered_fetch_records_total gauge cortex_ingest_storage_reader_buffered_fetch_records_total{component="partition-reader"} 0 - `), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total") - }) - // Produce more records after the reader has started. - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-3")) - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-4")) - t.Log("produced 2 records") + # HELP cortex_ingest_storage_reader_buffered_fetched_records The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed. + # TYPE cortex_ingest_storage_reader_buffered_fetched_records gauge + cortex_ingest_storage_reader_buffered_fetched_records 0 + `), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total", "cortex_ingest_storage_reader_buffered_fetched_records") + }) - // We expect the reader to consume subsequent records too. - test.Poll(t, time.Second, []string{"record-1", "record-2", "record-3", "record-4"}, func() interface{} { - consumedRecordsMx.Lock() - defer consumedRecordsMx.Unlock() - return slices.Clone(consumedRecords) - }) + // Produce more records after the reader has started. + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-3")) + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-4")) + t.Log("produced 2 records") + + // We expect the reader to consume subsequent records too. + test.Poll(t, time.Second, []string{"record-1", "record-2", "record-3", "record-4"}, func() interface{} { + consumedRecordsMx.Lock() + defer consumedRecordsMx.Unlock() + return slices.Clone(consumedRecords) + }) - // Wait some time to give some time for the Kafka client to eventually read and buffer records. - // We don't expect it, but to make sure it's not happening we have to give it some time. - time.Sleep(time.Second) + // Wait some time to give some time for the Kafka client to eventually read and buffer records. + // We don't expect it, but to make sure it's not happening we have to give it some time. + time.Sleep(time.Second) - // We expect the last consumed offset to be tracked in a metric, and there are no buffered records reported. - test.Poll(t, time.Second, nil, func() interface{} { - return promtest.GatherAndCompare(reg, strings.NewReader(` + // We expect the last consumed offset to be tracked in a metric, and there are no buffered records reported. + test.Poll(t, time.Second, nil, func() interface{} { + return promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet. # TYPE cortex_ingest_storage_reader_last_consumed_offset gauge cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 3 @@ -1897,8 +1914,14 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenConcurrentFet # HELP cortex_ingest_storage_reader_buffered_fetch_records_total Total number of records buffered within the client ready to be consumed # TYPE cortex_ingest_storage_reader_buffered_fetch_records_total gauge cortex_ingest_storage_reader_buffered_fetch_records_total{component="partition-reader"} 0 - `), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total") - }) + + # HELP cortex_ingest_storage_reader_buffered_fetched_records The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed. + # TYPE cortex_ingest_storage_reader_buffered_fetched_records gauge + cortex_ingest_storage_reader_buffered_fetched_records 0 + `), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total", "cortex_ingest_storage_reader_buffered_fetched_records") + }) + }) + } } func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) { From e067cea74aff9c20f6258975da487e73fb2d97c9 Mon Sep 17 00:00:00 2001 From: gotjosh Date: Fri, 8 Nov 2024 16:58:37 +0000 Subject: [PATCH 02/19] Add changelog Signed-off-by: gotjosh --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d0cd4e57bca..e618bf055ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ ### Mixin * [CHANGE] Remove backwards compatibility for `thanos_memcached_` prefixed metrics in dashboards and alerts removed in 2.12. #9674 #9758 +* [CHANGE] Reworked the alert `MimirIngesterStuckProcessingRecordsFromKafka` to also work when concurrent fetching is enabled. #9855 * [ENHANCEMENT] Unify ingester autoscaling panels on 'Mimir / Writes' dashboard to work for both ingest-storage and non-ingest-storage autoscaling. #9617 * [ENHANCEMENT] Alerts: Enable configuring job prefix for alerts to prevent clashes with metrics from Loki/Tempo. #9659 * [ENHANCEMENT] Dashboards: visualize the age of source blocks in the "Mimir / Compactor" dashboard. #9697 From 006fb419c0255b3a93404fc1a0c1ecd92b523a41 Mon Sep 17 00:00:00 2001 From: gotjosh Date: Fri, 8 Nov 2024 17:13:36 +0000 Subject: [PATCH 03/19] reestrcture metric assignment Signed-off-by: gotjosh --- pkg/storage/ingest/fetcher_test.go | 2 +- pkg/storage/ingest/reader.go | 17 ++++++----------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index 89b85ffc68c..c20040a5265 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -960,7 +960,7 @@ func TestConcurrentFetchers(t *testing.T) { func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency, recordsPerFetch int) *concurrentFetchers { logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() - metrics := newReaderMetrics(partition, reg) + metrics := newReaderMetrics(partition, reg, func() float64 { return 1 }) // This instantiates the fields of kprom. // This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves. diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 5170c00239b..d64d738bd71 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -110,14 +110,13 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri partitionID: partitionID, newConsumer: consumer, consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID), - metrics: newReaderMetrics(partitionID, reg), consumedOffsetWatcher: newPartitionOffsetWatcher(), concurrentFetchersMinBytesMaxWaitTime: defaultMinBytesMaxWaitTime, logger: log.With(logger, "partition", partitionID), reg: reg, } - r.metrics.RegisterBuffedRecordsMetric(func() float64 { return r.BufferedRecords() }) + r.metrics = newReaderMetrics(partitionID, reg, func() float64 { return r.BufferedRecords() }) r.Service = services.NewBasicService(r.start, r.run, r.stop) return r, nil @@ -968,7 +967,7 @@ type readerMetrics struct { kprom *kprom.Metrics } -func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetrics { +func newReaderMetrics(partitionID int32, reg prometheus.Registerer, bufferedRecordsCollector func() float64) readerMetrics { const component = "partition-reader" receiveDelay := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -991,7 +990,10 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric lastConsumedOffset.Set(-1) return readerMetrics{ - reg: reg, + bufferedFetchedRecords: promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_ingest_storage_reader_buffered_fetched_records", + Help: "The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed.", + }, bufferedRecordsCollector), receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"), receiveDelayWhenRunning: receiveDelay.WithLabelValues("running"), recordsPerFetch: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ @@ -1027,13 +1029,6 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric } } -func (r readerMetrics) RegisterBuffedRecordsMetric(collector func() float64) { - r.bufferedFetchedRecords = promauto.With(r.reg).NewGaugeFunc(prometheus.GaugeOpts{ - Name: "cortex_ingest_storage_reader_buffered_fetched_records", - Help: "The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed.", - }, collector) -} - type StrongReadConsistencyInstrumentation[T any] struct { requests *prometheus.CounterVec failures prometheus.Counter From 0f10a5baee6b122e4eacdbb881152bacf1f37acc Mon Sep 17 00:00:00 2001 From: gotjosh Date: Fri, 8 Nov 2024 17:29:40 +0000 Subject: [PATCH 04/19] Remove the registry Signed-off-by: gotjosh --- pkg/storage/ingest/reader.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index d64d738bd71..d05edb4805b 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -951,8 +951,6 @@ func (r *partitionCommitter) stop(error) error { } type readerMetrics struct { - reg prometheus.Registerer - bufferedFetchedRecords prometheus.GaugeFunc receiveDelayWhenStarting prometheus.Observer receiveDelayWhenRunning prometheus.Observer From ca29785e09d3e2f0bd2c5dae500bf7eba3a7a2dd Mon Sep 17 00:00:00 2001 From: gotjosh Date: Fri, 8 Nov 2024 17:37:55 +0000 Subject: [PATCH 05/19] Fix helm Signed-off-by: gotjosh --- .../templates/metamonitoring/mixin-alerts.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml index b45db52d7ef..ebb3c5fd135 100644 --- a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml +++ b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml @@ -1129,8 +1129,7 @@ spec: rate(cortex_ingest_storage_reader_requests_total[5m]) ) == 0) and - # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records. - (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0) + (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0) for: 5m labels: severity: critical From 92efc704e52bb146b8eea4ee6c025e6bc8ac146b Mon Sep 17 00:00:00 2001 From: gotjosh Date: Fri, 8 Nov 2024 17:56:18 +0000 Subject: [PATCH 06/19] Protected the fetchers Signed-off-by: gotjosh --- pkg/storage/ingest/reader.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index d05edb4805b..2471bde257a 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "strconv" + "sync" "time" "github.com/go-kit/log" @@ -79,8 +80,10 @@ type PartitionReader struct { consumerGroup string concurrentFetchersMinBytesMaxWaitTime time.Duration - client *kgo.Client - fetcher fetcher + client *kgo.Client + + fetcherMtx sync.Mutex + fetcher fetcher newConsumer consumerFactory metrics readerMetrics @@ -133,6 +136,8 @@ func (r *PartitionReader) Update(_ context.Context, _, _ int) { } func (r *PartitionReader) BufferedRecords() float64 { + r.fetcherMtx.Lock() + defer r.fetcherMtx.Unlock() var fcount, ccount float64 if r.fetcher != nil && r.fetcher != r { fcount = r.fetcher.BufferedRecords() @@ -219,7 +224,9 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { if err != nil { return errors.Wrap(err, "creating concurrent fetchers during startup") } + r.fetcherMtx.Lock() r.fetcher = f + r.fetcherMtx.Unlock() } else { // When concurrent fetch is disabled we read records directly from the Kafka client, so we want it // to consume the partition. @@ -227,7 +234,9 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)}, }) + r.fetcherMtx.Lock() r.fetcher = r + r.fetcherMtx.Unlock() } // Enforce the max consumer lag (if enabled). @@ -285,6 +294,9 @@ func (r *PartitionReader) run(ctx context.Context) error { // switchToOngoingFetcher switches to the configured ongoing fetcher. This function could be // called multiple times. func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { + r.fetcherMtx.Lock() + defer r.fetcherMtx.Unlock() + if r.kafkaCfg.StartupFetchConcurrency == r.kafkaCfg.OngoingFetchConcurrency && r.kafkaCfg.StartupRecordsPerFetch == r.kafkaCfg.OngoingRecordsPerFetch { // we're already using the same settings, no need to switch return From 693e73df75bd9f26721acbccd39cc19d4cbcf30f Mon Sep 17 00:00:00 2001 From: gotjosh Date: Sun, 10 Nov 2024 22:23:42 +0000 Subject: [PATCH 07/19] Change log to debug Signed-off-by: gotjosh --- pkg/storage/ingest/fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 15a4546328f..097dc0b4069 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -463,7 +463,7 @@ func (r *concurrentFetchers) parseFetchResponse(ctx context.Context, startOffset } nv := r.bufferedFetchedRecords.Add(int64(len(partition.Records))) - level.Info(r.logger).Log("msg", "buffered fetched records", "num_records", len(partition.Records), "total_records", nv) + level.Debug(r.logger).Log("msg", "buffered fetched records", "num_records", len(partition.Records), "total_records", nv) return fetchResult{ ctx: ctx, From f05f02e5ff883746c4be97100a00377750c32bbb Mon Sep 17 00:00:00 2001 From: gotjosh Date: Mon, 11 Nov 2024 09:05:41 +0000 Subject: [PATCH 08/19] make `BufferedRecords` int64 and remove debug logs Signed-off-by: gotjosh --- pkg/storage/ingest/fetcher.go | 14 +++++--------- pkg/storage/ingest/reader.go | 8 ++++---- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 097dc0b4069..5289d1c2544 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -50,7 +50,7 @@ type fetcher interface { Stop() // BufferedRecords returns the number of records that have been fetched but not yet consumed. - BufferedRecords() float64 + BufferedRecords() int64 } // fetchWant represents a range of offsets to fetch. @@ -298,8 +298,8 @@ func newConcurrentFetchers( } // BufferedRecords implements fetcher. -func (r *concurrentFetchers) BufferedRecords() float64 { - return float64(r.bufferedFetchedRecords.Load()) +func (r *concurrentFetchers) BufferedRecords() int64 { + return r.bufferedFetchedRecords.Load() } // Stop implements fetcher. @@ -328,9 +328,7 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont return kgo.Fetches{}, ctx case f := <-r.orderedFetches: // At this point, we're guaranteed that the records are not going to be with us anymore. - tr := r.bufferedFetchedRecords.Sub(int64(len(f.FetchPartition.Records))) - level.Debug(r.logger).Log("msg", "remove buffered fetched records", "num_records", len(f.FetchPartition.Records), "total_records", tr) - + r.bufferedFetchedRecords.Sub(int64(len(f.FetchPartition.Records))) firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord) r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime) @@ -462,9 +460,7 @@ func (r *concurrentFetchers) parseFetchResponse(ctx context.Context, startOffset fetchedBytes = sumRecordLengths(partition.Records) } - nv := r.bufferedFetchedRecords.Add(int64(len(partition.Records))) - level.Debug(r.logger).Log("msg", "buffered fetched records", "num_records", len(partition.Records), "total_records", nv) - + r.bufferedFetchedRecords.Add(int64(len(partition.Records))) return fetchResult{ ctx: ctx, FetchPartition: partition, diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 2471bde257a..08941746203 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -119,7 +119,7 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri reg: reg, } - r.metrics = newReaderMetrics(partitionID, reg, func() float64 { return r.BufferedRecords() }) + r.metrics = newReaderMetrics(partitionID, reg, func() float64 { return float64(r.BufferedRecords()) }) r.Service = services.NewBasicService(r.start, r.run, r.stop) return r, nil @@ -135,16 +135,16 @@ func (r *PartitionReader) Update(_ context.Context, _, _ int) { // Given the partition reader has no concurrency it doesn't support updates. } -func (r *PartitionReader) BufferedRecords() float64 { +func (r *PartitionReader) BufferedRecords() int64 { r.fetcherMtx.Lock() defer r.fetcherMtx.Unlock() - var fcount, ccount float64 + var fcount, ccount int64 if r.fetcher != nil && r.fetcher != r { fcount = r.fetcher.BufferedRecords() } if r.client != nil { - ccount = float64(r.client.BufferedFetchRecords()) + ccount = r.client.BufferedFetchRecords() } return fcount + ccount From 41c83a11e7ac18b202487c52313eede2caf28c48 Mon Sep 17 00:00:00 2001 From: gotjosh Date: Mon, 11 Nov 2024 09:10:06 +0000 Subject: [PATCH 09/19] Move buffered records increment location Signed-off-by: gotjosh --- pkg/storage/ingest/fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 5289d1c2544..1aa9e4b8f06 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -460,7 +460,6 @@ func (r *concurrentFetchers) parseFetchResponse(ctx context.Context, startOffset fetchedBytes = sumRecordLengths(partition.Records) } - r.bufferedFetchedRecords.Add(int64(len(partition.Records))) return fetchResult{ ctx: ctx, FetchPartition: partition, @@ -514,6 +513,7 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg attemptSpan.SetTag("attempt", attempt) f := r.fetchSingle(ctx, w) + r.bufferedFetchedRecords.Add(int64(len(f.FetchPartition.Records))) f = f.Merge(previousResult) previousResult = f if f.Err != nil { From ef663005e4d46f2c7d493a4fd421d87e877fa993 Mon Sep 17 00:00:00 2001 From: gotjosh Date: Mon, 11 Nov 2024 09:46:34 +0000 Subject: [PATCH 10/19] Use atomic functions for locking / unlocking the client and fetcher. Signed-off-by: gotjosh --- pkg/storage/ingest/reader.go | 101 +++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 40 deletions(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 08941746203..ca8ef6149b9 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -80,10 +80,10 @@ type PartitionReader struct { consumerGroup string concurrentFetchersMinBytesMaxWaitTime time.Duration - client *kgo.Client - - fetcherMtx sync.Mutex - fetcher fetcher + // fetchingMtx protects both the client and the fetcher but never at the same time. + fetchingMtx sync.Mutex + client *kgo.Client + fetcher fetcher newConsumer consumerFactory metrics readerMetrics @@ -136,15 +136,15 @@ func (r *PartitionReader) Update(_ context.Context, _, _ int) { } func (r *PartitionReader) BufferedRecords() int64 { - r.fetcherMtx.Lock() - defer r.fetcherMtx.Unlock() + f := r.getFetcher() var fcount, ccount int64 - if r.fetcher != nil && r.fetcher != r { - fcount = r.fetcher.BufferedRecords() + if f != nil && f != r { + fcount = f.BufferedRecords() } - if r.client != nil { - ccount = r.client.BufferedFetchRecords() + c := r.getClient() + if c != nil { + ccount = c.BufferedFetchRecords() } return fcount + ccount @@ -173,14 +173,15 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { } // Create a Kafka client without configuring any partition to consume (it will be done later). - r.client, err = NewKafkaReaderClient(r.kafkaCfg, r.metrics.kprom, r.logger) + client, err := NewKafkaReaderClient(r.kafkaCfg, r.metrics.kprom, r.logger) if err != nil { return errors.Wrap(err, "creating kafka reader client") } + r.setClient(client) - r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.logger, r.reg) + r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.getClient()), r.partitionID, r.consumerGroup, r.logger, r.reg) - offsetsClient := newPartitionOffsetClient(r.client, r.kafkaCfg.Topic, r.reg, r.logger) + offsetsClient := newPartitionOffsetClient(r.getClient(), r.kafkaCfg.Topic, r.reg, r.logger) // It's ok to have the start offset slightly outdated. // We only need this offset accurate if we fall behind or if we start and the log gets truncated from beneath us. @@ -213,30 +214,26 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { // // To make it happen, we do pause the fetching first and then we configure consumption. The consumption // will be kept paused until the explicit ResumeFetchPartitions() is called. - r.client.PauseFetchPartitions(map[string][]int32{ + r.getClient().PauseFetchPartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) - r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{ + r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{ r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)}, }) - f, err := newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics) + f, err := newConcurrentFetchers(ctx, r.getClient(), r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics) if err != nil { return errors.Wrap(err, "creating concurrent fetchers during startup") } - r.fetcherMtx.Lock() - r.fetcher = f - r.fetcherMtx.Unlock() + r.setFetcher(f) } else { // When concurrent fetch is disabled we read records directly from the Kafka client, so we want it // to consume the partition. - r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{ + r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{ r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)}, }) - r.fetcherMtx.Lock() - r.fetcher = r - r.fetcherMtx.Unlock() + r.setFetcher(r) } // Enforce the max consumer lag (if enabled). @@ -266,12 +263,14 @@ func (r *PartitionReader) stopDependencies() error { } } - if r.fetcher != nil { - r.fetcher.Stop() + f := r.getFetcher() + if f != nil { + f.Stop() } - if r.client != nil { - r.client.Close() + c := r.getClient() + if c != nil { + c.Close() } return nil @@ -294,22 +293,20 @@ func (r *PartitionReader) run(ctx context.Context) error { // switchToOngoingFetcher switches to the configured ongoing fetcher. This function could be // called multiple times. func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { - r.fetcherMtx.Lock() - defer r.fetcherMtx.Unlock() - if r.kafkaCfg.StartupFetchConcurrency == r.kafkaCfg.OngoingFetchConcurrency && r.kafkaCfg.StartupRecordsPerFetch == r.kafkaCfg.OngoingRecordsPerFetch { // we're already using the same settings, no need to switch return } if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency > 0 { + // No need to switch the fetcher, just update the records per fetch. - r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch) + r.getFetcher().Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch) return } if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency == 0 { - if r.fetcher == r { + if r.getFetcher() == r { // This method has been called before, no need to switch the fetcher. return } @@ -317,11 +314,11 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { level.Info(r.logger).Log("msg", "partition reader is switching to non-concurrent fetcher") // Stop the current fetcher before replacing it. - r.fetcher.Stop() + r.getFetcher().Stop() // We need to switch to franz-go for ongoing fetches. // If we've already fetched some records, we should discard them from franz-go and start from the last consumed offset. - r.fetcher = r + r.setFetcher(r) lastConsumed := r.consumedOffsetWatcher.LastConsumedOffset() if lastConsumed == -1 { @@ -329,7 +326,7 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { // // The franz-go client is initialized to start consuming from the same place as the other fetcher. // We can just use the client, but we have to resume the fetching because it was previously paused. - r.client.ResumeFetchPartitions(map[string][]int32{ + r.getClient().ResumeFetchPartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) return @@ -339,13 +336,13 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { // from a clean setup and have the guarantee that we're not going to read any previously buffered record, // we do remove the partition consumption (this clears the buffer), then we resume the fetching and finally // we add the consumption back. - r.client.RemoveConsumePartitions(map[string][]int32{ + r.getClient().RemoveConsumePartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) - r.client.ResumeFetchPartitions(map[string][]int32{ + r.getClient().ResumeFetchPartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) - r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{ + r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{ // Resume from the next unconsumed offset. r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(lastConsumed + 1)}, }) @@ -353,7 +350,7 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { } func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver prometheus.Observer) error { - fetches, fetchCtx := r.fetcher.PollFetches(ctx) + fetches, fetchCtx := r.getFetcher().PollFetches(ctx) // Propagate the fetching span to consuming the records. ctx = opentracing.ContextWithSpan(ctx, opentracing.SpanFromContext(fetchCtx)) r.recordFetchesMetrics(fetches, delayObserver) @@ -825,11 +822,35 @@ func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bo return err } +func (r *PartitionReader) setClient(c *kgo.Client) { + r.fetchingMtx.Lock() + defer r.fetchingMtx.Unlock() + r.client = c +} + +func (r *PartitionReader) getClient() *kgo.Client { + r.fetchingMtx.Lock() + defer r.fetchingMtx.Unlock() + return r.client +} + +func (r *PartitionReader) setFetcher(f fetcher) { + r.fetchingMtx.Lock() + defer r.fetchingMtx.Unlock() + r.fetcher = f +} + +func (r *PartitionReader) getFetcher() fetcher { + r.fetchingMtx.Lock() + defer r.fetchingMtx.Unlock() + return r.fetcher +} + func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, fetchContext context.Context) { defer func(start time.Time) { r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds()) }(time.Now()) - return r.client.PollFetches(ctx), ctx + return r.getClient().PollFetches(ctx), ctx } type partitionCommitter struct { From 4780813ce8c9073bdb881cc992471dc65f2b9996 Mon Sep 17 00:00:00 2001 From: gotjosh Date: Mon, 11 Nov 2024 17:23:35 +0000 Subject: [PATCH 11/19] assert on buffered records Signed-off-by: gotjosh --- pkg/storage/ingest/reader_test.go | 80 +++++++++++++++++++++++++++---- 1 file changed, 71 insertions(+), 9 deletions(-) diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 4c8252b525a..587b47c736f 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -1799,15 +1799,35 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testi partitionID = 1 ) - concurrencyVariants := map[string][]readerTestCfgOpt{ - "without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)}, - "with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)}, - "with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)}, - "with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)}, + tc := map[string]struct { + concurrencyVariant []readerTestCfgOpt + expectedBufferedRecords int + expectedBufferedRecordsFromClient int + }{ + "without concurrency": { + concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(0), withOngoingConcurrency(0)}, + expectedBufferedRecords: 1, + expectedBufferedRecordsFromClient: 1, + }, + "with startup concurrency": { + concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(2), withOngoingConcurrency(0)}, + expectedBufferedRecords: 1, + expectedBufferedRecordsFromClient: 1, + }, + "with startup and ongoing concurrency": { + concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(2), withOngoingConcurrency(2)}, + expectedBufferedRecords: 1, + expectedBufferedRecordsFromClient: 0, + }, + "with startup and ongoing concurrency (different settings)": { + concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(2), withOngoingConcurrency(4)}, + expectedBufferedRecords: 1, + expectedBufferedRecordsFromClient: 0, + }, } - for concurrencyName, concurrencyVariant := range concurrencyVariants { - concurrencyVariant := concurrencyVariant + for concurrencyName, tt := range tc { + concurrencyVariant := tt.concurrencyVariant t.Run(concurrencyName, func(t *testing.T) { t.Parallel() @@ -1817,12 +1837,30 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testi _, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName) consumedRecordsMx sync.Mutex consumedRecords []string + blocked = atomic.NewBool(false) ) consumer := consumerFunc(func(_ context.Context, records []record) error { + if blocked.Load() { + blockedTicker := time.NewTicker(100 * time.Millisecond) + defer blockedTicker.Stop() + outer: + for { + select { + case <-blockedTicker.C: + if !blocked.Load() { + break outer + } + case <-time.After(3 * time.Second): + // This is basically a test failure as we never finish the test in time. + t.Log("failed to finish unblocking the consumer in time") + return nil + } + } + } + consumedRecordsMx.Lock() defer consumedRecordsMx.Unlock() - for _, r := range records { consumedRecords = append(consumedRecords, string(r.content)) } @@ -1888,11 +1926,35 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testi `), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total", "cortex_ingest_storage_reader_buffered_fetched_records") }) - // Produce more records after the reader has started. + // Now, we want to assert that when the reader does have records buffered the metrics correctly reflect the current state. + // First, make the consumer block on the next consumption. + blocked.Store(true) + + // Now, produce more records after the reader has started. produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-3")) produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-4")) t.Log("produced 2 records") + // Now, we expect to have some records buffered. + test.Poll(t, time.Second, nil, func() interface{} { + return promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` + # HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet. + # TYPE cortex_ingest_storage_reader_last_consumed_offset gauge + cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 1 + + # HELP cortex_ingest_storage_reader_buffered_fetch_records_total Total number of records buffered within the client ready to be consumed + # TYPE cortex_ingest_storage_reader_buffered_fetch_records_total gauge + cortex_ingest_storage_reader_buffered_fetch_records_total{component="partition-reader"} %d + + # HELP cortex_ingest_storage_reader_buffered_fetched_records The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed. + # TYPE cortex_ingest_storage_reader_buffered_fetched_records gauge + cortex_ingest_storage_reader_buffered_fetched_records %d + `, tt.expectedBufferedRecordsFromClient, tt.expectedBufferedRecords)), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total", "cortex_ingest_storage_reader_buffered_fetched_records") + }) + + // With that assertion done, we can unblock records consumption. + blocked.Store(false) + // We expect the reader to consume subsequent records too. test.Poll(t, time.Second, []string{"record-1", "record-2", "record-3", "record-4"}, func() interface{} { consumedRecordsMx.Lock() From 2cc97aa8f37b5a63085c7a115c5b123673b8c89d Mon Sep 17 00:00:00 2001 From: gotjosh Date: Tue, 12 Nov 2024 21:36:38 -0700 Subject: [PATCH 12/19] reset the records buffer when `stop()` is called. Signed-off-by: gotjosh --- pkg/storage/ingest/fetcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 1aa9e4b8f06..d9c4b12e22e 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -307,6 +307,7 @@ func (r *concurrentFetchers) Stop() { close(r.done) r.wg.Wait() + r.bufferedFetchedRecords.Store(0) level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) } From 6e166d7112b08977eeb5d4e6c4aed6a729d2d9d0 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2024 09:54:26 +0100 Subject: [PATCH 13/19] Fix test Signed-off-by: Marco Pracucci --- pkg/storage/ingest/fetcher_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index c20040a5265..5eda6a258e1 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -923,7 +923,9 @@ func TestConcurrentFetchers(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() - metrics := newReaderMetrics(partitionID, reg) + metrics := newReaderMetrics(partitionID, reg, func() float64 { + return 0 + }) client := newKafkaProduceClient(t, clusterAddr) From 97bb2a965f27e3eefa6e36a17bc0c19f785128c7 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2024 12:07:14 +0100 Subject: [PATCH 14/19] Changed how buffered records are tracked, improved unit tests and used atomic instead of a mutex to protect client/fetcher access Signed-off-by: Marco Pracucci --- pkg/storage/ingest/fetcher.go | 56 ++++++++-- pkg/storage/ingest/fetcher_test.go | 160 +++++++++++++++++++++++++---- pkg/storage/ingest/reader.go | 75 +++++--------- 3 files changed, 214 insertions(+), 77 deletions(-) diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index d9c4b12e22e..06d62f0a375 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -223,7 +223,12 @@ type concurrentFetchers struct { minBytesWaitTime time.Duration - orderedFetches chan fetchResult + // orderedFetches is a channel where we write fetches that are ready to be polled by PollFetches(). + // Since all records must be polled in order, the fetches written to this channel are after + // ordering and "deduplication" (in case the same record is fetched multiple times from different + // routines). + orderedFetches chan fetchResult + lastReturnedRecord int64 startOffsets *genericOffsetReader[int64] @@ -304,10 +309,17 @@ func (r *concurrentFetchers) BufferedRecords() int64 { // Stop implements fetcher. func (r *concurrentFetchers) Stop() { - close(r.done) + // Ensure it's not already stopped. + select { + case _, ok := <-r.done: + if !ok { + return + } + default: + } + close(r.done) r.wg.Wait() - r.bufferedFetchedRecords.Store(0) level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) } @@ -328,8 +340,6 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont case <-ctx.Done(): return kgo.Fetches{}, ctx case f := <-r.orderedFetches: - // At this point, we're guaranteed that the records are not going to be with us anymore. - r.bufferedFetchedRecords.Sub(int64(len(f.FetchPartition.Records))) firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord) r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime) @@ -514,7 +524,6 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg attemptSpan.SetTag("attempt", attempt) f := r.fetchSingle(ctx, w) - r.bufferedFetchedRecords.Add(int64(len(f.FetchPartition.Records))) f = f.Merge(previousResult) previousResult = f if f.Err != nil { @@ -602,18 +611,31 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu go r.run(ctx, wants, logger, highWatermark) } + // We need to make sure we don't leak any goroutine given that start is called within a goroutine. + defer r.wg.Done() + var ( nextFetch = fetchWantFrom(startOffset, recordsPerFetch) nextResult chan fetchResult pendingResults = list.New() - bufferedResult fetchResult - readyBufferedResults chan fetchResult // this is non-nil when bufferedResult is non-empty + // bufferedResult is the next fetch that should be polled by PollFetches(). + bufferedResult fetchResult + + // readyBufferedResults channel gets continuously flipped between nil and the actual channel + // where PollFetches() reads from. This channel is nil when there are no ordered buffered + // records ready to be written to the channel where PollFetches(), and is non-nil when there + // are some ordered buffered records ready. + // + // It is guaranteed that this channel is non-nil when bufferedResult is non-empty. + readyBufferedResults chan fetchResult ) nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume - // We need to make sure we don't leak any goroutine given that start is called within a goroutine. - defer r.wg.Done() + // When the fetcher is stopped, buffered records are intentionally dropped. For this reason, + // we do reset the counter of buffered records to zero when this goroutine ends. + defer r.bufferedFetchedRecords.Store(0) + for { refillBufferedResult := nextResult if readyBufferedResults != nil { @@ -657,10 +679,24 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu continue } nextFetch = nextFetch.UpdateBytesPerRecord(result.fetchedBytes, len(result.Records)) + + // We have some ordered records ready to be sent to PollFetches(). We store the fetch + // result in bufferedResult, and we flip readyBufferedResults to the channel used by + // PollFetches(). bufferedResult = result readyBufferedResults = r.orderedFetches + // We increase the count of buffered records only for ordered records that we're sure + // will not be discarded later, so that we can get an accurate tracking of the records + // ready to be polled by PollFetches() but not polled yet. + r.bufferedFetchedRecords.Add(int64(len(result.Records))) + case readyBufferedResults <- bufferedResult: + // We've written the records to the channel read by PollFetches(). Since the channel + // is not buffered, as soon as the write succeed it means PollFetches() has read it, + // so we can consider these records polled and decrease the buffered records counter. + r.bufferedFetchedRecords.Sub(int64(len(bufferedResult.Records))) + bufferedResult.finishWaitingForConsumption() readyBufferedResults = nil bufferedResult = fetchResult{} diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index 5eda6a258e1..f96f27d5b47 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/services" + "github.com/grafana/dskit/test" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -303,9 +304,12 @@ func TestConcurrentFetchers(t *testing.T) { assert.Zero(t, fetches.NumRecords()) assert.Error(t, fetchCtx.Err(), "Expected context to be cancelled") + assert.Zero(t, fetchers.BufferedRecords()) }) t.Run("cold replay", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -321,9 +325,14 @@ func TestConcurrentFetchers(t *testing.T) { fetches, _ := fetchers.PollFetches(ctx) assert.Equal(t, fetches.NumRecords(), 5) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("fetch records produced after startup", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -339,6 +348,9 @@ func TestConcurrentFetchers(t *testing.T) { fetches, _ := fetchers.PollFetches(ctx) assert.Equal(t, fetches.NumRecords(), 3) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("slow processing of fetches", func(t *testing.T) { @@ -357,26 +369,42 @@ func TestConcurrentFetchers(t *testing.T) { var wg sync.WaitGroup wg.Add(1) + go func() { defer wg.Done() consumedRecords := 0 for consumedRecords < 10 { fetches, _ := fetchers.PollFetches(ctx) - time.Sleep(1000 * time.Millisecond) // Simulate slow processing consumedRecords += fetches.NumRecords() + + // Simulate slow processing. + time.Sleep(200 * time.Millisecond) } assert.Equal(t, 10, consumedRecords) }() - // Produce more records while processing is slow - for i := 5; i < 10; i++ { - produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) - } + // Slowly produce more records while processing is slow too. This increase the chances + // of progressive fetches done by the consumer. + wg.Add(1) + + go func() { + defer wg.Done() + + for i := 5; i < 10; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + time.Sleep(200 * time.Millisecond) + } + }() wg.Wait() + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("fast processing of fetches", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -390,25 +418,27 @@ func TestConcurrentFetchers(t *testing.T) { produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - consumedRecords := 0 - for consumedRecords < 10 { - fetches, _ := fetchers.PollFetches(ctx) - consumedRecords += fetches.NumRecords() - // no processing delay - } - assert.Equal(t, 10, consumedRecords) - }() + // Consume all expected records. + consumedRecords := 0 + for consumedRecords < 10 { + fetches, _ := fetchers.PollFetches(ctx) + consumedRecords += fetches.NumRecords() + } + assert.Equal(t, 10, consumedRecords) - wg.Wait() + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("fetch with different concurrency levels", func(t *testing.T) { + t.Parallel() + for _, concurrency := range []int{1, 2, 4} { + concurrency := concurrency + t.Run(fmt.Sprintf("concurrency-%d", concurrency), func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -429,11 +459,16 @@ func TestConcurrentFetchers(t *testing.T) { } assert.Equal(t, 20, totalRecords) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) } }) t.Run("start from mid-stream offset", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -472,9 +507,14 @@ func TestConcurrentFetchers(t *testing.T) { "new-record-1", "new-record-2", }, fetchedRecordsContents) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("synchronous produce and fetch", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -507,10 +547,15 @@ func TestConcurrentFetchers(t *testing.T) { // Verify fetched records assert.Equal(t, expectedRecords, fetchedRecords, "Fetched records in round %d do not match expected", round) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) } }) t.Run("concurrency can be updated", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() rec1 := []byte("record-1") @@ -548,10 +593,13 @@ func TestConcurrentFetchers(t *testing.T) { fetchers.Update(ctx, 10, 10) produceRecordAndAssert(rec3) + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("update concurrency with continuous production", func(t *testing.T) { t.Parallel() + const ( testDuration = 10 * time.Second produceInterval = 10 * time.Millisecond @@ -633,6 +681,9 @@ func TestConcurrentFetchers(t *testing.T) { "Record %d has unexpected content: %s", i, string(record.Value)) } + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) + // Log some statistics t.Logf("Total produced: %d, Total fetched: %d", totalProduced, totalFetched) t.Logf("Fetched with initial concurrency: %d", initialFetched) @@ -642,6 +693,7 @@ func TestConcurrentFetchers(t *testing.T) { t.Run("consume from end and update immediately", func(t *testing.T) { t.Parallel() + const ( initialRecords = 100 additionalRecords = 50 @@ -694,6 +746,9 @@ func TestConcurrentFetchers(t *testing.T) { "Record %d has unexpected content: %s", i, string(record.Value)) } + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) + // Log some statistics t.Logf("Total records produced: %d", initialRecords+additionalRecords) t.Logf("Records produced after start: %d", additionalRecords) @@ -756,6 +811,9 @@ func TestConcurrentFetchers(t *testing.T) { } assert.Equal(t, producedRecordsBytes, fetchedRecordsBytes) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("staggered production with one less than multiple of concurrency and records per fetch", func(t *testing.T) { @@ -823,6 +881,9 @@ func TestConcurrentFetchers(t *testing.T) { return nil, errors.New("mocked error"), true }) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("fetchers do not request offset beyond high watermark", func(t *testing.T) { @@ -898,9 +959,14 @@ func TestConcurrentFetchers(t *testing.T) { // Verify the number and content of fetched records assert.Equal(t, producedRecordsBytes, fetchedRecordsBytes, "Should fetch all produced records") + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("starting to run against a broken broker fails creating the fetchers", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -957,6 +1023,37 @@ func TestConcurrentFetchers(t *testing.T) { assert.ErrorContains(t, err, "failed to find topic ID") assert.ErrorIs(t, err, mockErr) }) + + t.Run("should reset the buffered records count when stopping", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch) + + // Produce some records. + for i := 0; i < 10; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + } + + // We are not consuming the records, so we expect the count of buffered records to increase. + // The actual number of buffered records may change due to concurrency, so we just check + // that there are some buffered records. + test.Poll(t, time.Second, true, func() interface{} { + return fetchers.BufferedRecords() > 0 + }) + + // Stop the fetchers. + fetchers.Stop() + + // Even if there were some buffered records we expect the count to be reset to 0 when stopping + // because the Stop() intentionally discard any buffered record. + require.Zero(t, fetchers.BufferedRecords()) + }) } func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency, recordsPerFetch int) *concurrentFetchers { @@ -995,6 +1092,33 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli return f } +// pollFetchesAndAssertNoRecords ensures that PollFetches() returns 0 records and there are +// no buffered records in fetchers. Since some records are discarded in the PollFetches(), +// we may have to call it multiple times to process all buffered records that need to be +// discarded. +func pollFetchesAndAssertNoRecords(t *testing.T, fetchers *concurrentFetchers) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + for { + fetches, returnCtx := fetchers.PollFetches(ctx) + if errors.Is(returnCtx.Err(), context.DeadlineExceeded) { + break + } + + // We always expect that PollFetches() returns zero records. + require.Len(t, fetches.Records(), 0) + + // If there are no buffered records, we're good. We can end the assertion. + if fetchers.BufferedRecords() == 0 { + return + } + } + + // We stopped polling fetches. We have to make sure there are no buffered records. + require.Zero(t, fetchers.BufferedRecords()) +} + type waiterFunc func() func (w waiterFunc) Wait() { w() } diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index ca8ef6149b9..6ca124d2cda 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -7,7 +7,6 @@ import ( "fmt" "math" "strconv" - "sync" "time" "github.com/go-kit/log" @@ -80,10 +79,11 @@ type PartitionReader struct { consumerGroup string concurrentFetchersMinBytesMaxWaitTime time.Duration - // fetchingMtx protects both the client and the fetcher but never at the same time. - fetchingMtx sync.Mutex - client *kgo.Client - fetcher fetcher + // client and fetcher are both start after PartitionReader creation. Fetcher could also be + // replaced during PartitionReader lifetime. To avoid concurrency issues with functions + // getting their pointers (e.g. BufferedRecords()) we do store their pointers in an atomic. + client atomic.Pointer[kgo.Client] + fetcher atomic.Value newConsumer consumerFactory metrics readerMetrics @@ -136,14 +136,13 @@ func (r *PartitionReader) Update(_ context.Context, _, _ int) { } func (r *PartitionReader) BufferedRecords() int64 { - f := r.getFetcher() var fcount, ccount int64 - if f != nil && f != r { + + if f := r.getFetcher(); f != nil && f != r { fcount = f.BufferedRecords() } - c := r.getClient() - if c != nil { + if c := r.client.Load(); c != nil { ccount = c.BufferedFetchRecords() } @@ -177,11 +176,11 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { if err != nil { return errors.Wrap(err, "creating kafka reader client") } - r.setClient(client) + r.client.Store(client) - r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.getClient()), r.partitionID, r.consumerGroup, r.logger, r.reg) + r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client.Load()), r.partitionID, r.consumerGroup, r.logger, r.reg) - offsetsClient := newPartitionOffsetClient(r.getClient(), r.kafkaCfg.Topic, r.reg, r.logger) + offsetsClient := newPartitionOffsetClient(r.client.Load(), r.kafkaCfg.Topic, r.reg, r.logger) // It's ok to have the start offset slightly outdated. // We only need this offset accurate if we fall behind or if we start and the log gets truncated from beneath us. @@ -214,26 +213,26 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { // // To make it happen, we do pause the fetching first and then we configure consumption. The consumption // will be kept paused until the explicit ResumeFetchPartitions() is called. - r.getClient().PauseFetchPartitions(map[string][]int32{ + r.client.Load().PauseFetchPartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) - r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{ + r.client.Load().AddConsumePartitions(map[string]map[int32]kgo.Offset{ r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)}, }) - f, err := newConcurrentFetchers(ctx, r.getClient(), r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics) + f, err := newConcurrentFetchers(ctx, r.client.Load(), r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics) if err != nil { return errors.Wrap(err, "creating concurrent fetchers during startup") } - r.setFetcher(f) + r.fetcher.Store(f) } else { // When concurrent fetch is disabled we read records directly from the Kafka client, so we want it // to consume the partition. - r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{ + r.client.Load().AddConsumePartitions(map[string]map[int32]kgo.Offset{ r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)}, }) - r.setFetcher(r) + r.fetcher.Store(r) } // Enforce the max consumer lag (if enabled). @@ -263,13 +262,11 @@ func (r *PartitionReader) stopDependencies() error { } } - f := r.getFetcher() - if f != nil { + if f := r.getFetcher(); f != nil { f.Stop() } - c := r.getClient() - if c != nil { + if c := r.client.Load(); c != nil { c.Close() } @@ -318,7 +315,7 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { // We need to switch to franz-go for ongoing fetches. // If we've already fetched some records, we should discard them from franz-go and start from the last consumed offset. - r.setFetcher(r) + r.fetcher.Store(r) lastConsumed := r.consumedOffsetWatcher.LastConsumedOffset() if lastConsumed == -1 { @@ -326,7 +323,7 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { // // The franz-go client is initialized to start consuming from the same place as the other fetcher. // We can just use the client, but we have to resume the fetching because it was previously paused. - r.getClient().ResumeFetchPartitions(map[string][]int32{ + r.client.Load().ResumeFetchPartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) return @@ -336,13 +333,13 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { // from a clean setup and have the guarantee that we're not going to read any previously buffered record, // we do remove the partition consumption (this clears the buffer), then we resume the fetching and finally // we add the consumption back. - r.getClient().RemoveConsumePartitions(map[string][]int32{ + r.client.Load().RemoveConsumePartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) - r.getClient().ResumeFetchPartitions(map[string][]int32{ + r.client.Load().ResumeFetchPartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) - r.getClient().AddConsumePartitions(map[string]map[int32]kgo.Offset{ + r.client.Load().AddConsumePartitions(map[string]map[int32]kgo.Offset{ // Resume from the next unconsumed offset. r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(lastConsumed + 1)}, }) @@ -822,35 +819,15 @@ func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bo return err } -func (r *PartitionReader) setClient(c *kgo.Client) { - r.fetchingMtx.Lock() - defer r.fetchingMtx.Unlock() - r.client = c -} - -func (r *PartitionReader) getClient() *kgo.Client { - r.fetchingMtx.Lock() - defer r.fetchingMtx.Unlock() - return r.client -} - -func (r *PartitionReader) setFetcher(f fetcher) { - r.fetchingMtx.Lock() - defer r.fetchingMtx.Unlock() - r.fetcher = f -} - func (r *PartitionReader) getFetcher() fetcher { - r.fetchingMtx.Lock() - defer r.fetchingMtx.Unlock() - return r.fetcher + return r.fetcher.Load().(fetcher) } func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, fetchContext context.Context) { defer func(start time.Time) { r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds()) }(time.Now()) - return r.getClient().PollFetches(ctx), ctx + return r.client.Load().PollFetches(ctx), ctx } type partitionCommitter struct { From ee84f33b1eccbf9e32807a0e2e5017f75d0d281e Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2024 12:17:26 +0100 Subject: [PATCH 15/19] Fix Signed-off-by: Marco Pracucci --- pkg/storage/ingest/reader.go | 28 ++++++++++++++++++++-------- pkg/storage/ingest/reader_test.go | 12 ++++++++++++ 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 6ca124d2cda..eebf33cbc71 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "strconv" + "sync" "time" "github.com/go-kit/log" @@ -81,9 +82,13 @@ type PartitionReader struct { // client and fetcher are both start after PartitionReader creation. Fetcher could also be // replaced during PartitionReader lifetime. To avoid concurrency issues with functions - // getting their pointers (e.g. BufferedRecords()) we do store their pointers in an atomic. - client atomic.Pointer[kgo.Client] - fetcher atomic.Value + // getting their pointers (e.g. BufferedRecords()) we use atomic / mutex to protect. + client atomic.Pointer[kgo.Client] + + // We use a mutex for fetcher because it's an interface and we have to check against the + // nil case too. + fetcherMx sync.Mutex + fetcher fetcher newConsumer consumerFactory metrics readerMetrics @@ -224,7 +229,7 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { if err != nil { return errors.Wrap(err, "creating concurrent fetchers during startup") } - r.fetcher.Store(f) + r.setFetcher(f) } else { // When concurrent fetch is disabled we read records directly from the Kafka client, so we want it // to consume the partition. @@ -232,7 +237,7 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)}, }) - r.fetcher.Store(r) + r.setFetcher(r) } // Enforce the max consumer lag (if enabled). @@ -296,7 +301,6 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { } if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency > 0 { - // No need to switch the fetcher, just update the records per fetch. r.getFetcher().Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch) return @@ -315,7 +319,7 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { // We need to switch to franz-go for ongoing fetches. // If we've already fetched some records, we should discard them from franz-go and start from the last consumed offset. - r.fetcher.Store(r) + r.setFetcher(r) lastConsumed := r.consumedOffsetWatcher.LastConsumedOffset() if lastConsumed == -1 { @@ -819,8 +823,16 @@ func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bo return err } +func (r *PartitionReader) setFetcher(f fetcher) { + r.fetcherMx.Lock() + defer r.fetcherMx.Unlock() + r.fetcher = f +} + func (r *PartitionReader) getFetcher() fetcher { - return r.fetcher.Load().(fetcher) + r.fetcherMx.Lock() + defer r.fetcherMx.Unlock() + return r.fetcher } func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, fetchContext context.Context) { diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 587b47c736f..432d285c8b3 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -1986,6 +1986,18 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testi } } +func TestPartitionReader_ShouldNotPanicIfBufferedRecordsIsCalledBeforeStarting(t *testing.T) { + const ( + topicName = "test" + partitionID = 1 + ) + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + reader := createReader(t, clusterAddr, topicName, partitionID, nil) + + require.Zero(t, reader.BufferedRecords()) +} + func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) { const ( topicName = "test" From e544ac82adb49515687d21cd85882027a1798210 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2024 12:21:30 +0100 Subject: [PATCH 16/19] Fix comment Signed-off-by: Marco Pracucci --- pkg/storage/ingest/fetcher.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 06d62f0a375..d4d3ff5d3a6 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -225,8 +225,7 @@ type concurrentFetchers struct { // orderedFetches is a channel where we write fetches that are ready to be polled by PollFetches(). // Since all records must be polled in order, the fetches written to this channel are after - // ordering and "deduplication" (in case the same record is fetched multiple times from different - // routines). + // ordering. orderedFetches chan fetchResult lastReturnedRecord int64 From a011860ab9bfd03f98db6e1d8d31133f1a3e3f9b Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2024 12:24:00 +0100 Subject: [PATCH 17/19] Fix comment Signed-off-by: Marco Pracucci --- pkg/storage/ingest/fetcher.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index d4d3ff5d3a6..f54d631dc1e 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -685,9 +685,9 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu bufferedResult = result readyBufferedResults = r.orderedFetches - // We increase the count of buffered records only for ordered records that we're sure - // will not be discarded later, so that we can get an accurate tracking of the records - // ready to be polled by PollFetches() but not polled yet. + // We increase the count of buffered records only for ordered records, so that we can + // get an accurate tracking of the records ready to be polled by PollFetches() but not + // polled yet. This mimics the similar tracking done by the franz-go library. r.bufferedFetchedRecords.Add(int64(len(result.Records))) case readyBufferedResults <- bufferedResult: From e894fca23faee9a8ee9d4c39221479033be32f53 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2024 12:52:53 +0100 Subject: [PATCH 18/19] Get back to Josh implementation of buffered records tracking which has better coverage of all buffered records Signed-off-by: Marco Pracucci --- pkg/storage/ingest/fetcher.go | 45 ++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index f54d631dc1e..4ecbc131d27 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -320,6 +320,10 @@ func (r *concurrentFetchers) Stop() { close(r.done) r.wg.Wait() + // When the fetcher is stopped, buffered records are intentionally dropped. For this reason, + // we do reset the counter of buffered records here. + r.bufferedFetchedRecords.Store(0) + level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) } @@ -339,6 +343,13 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont case <-ctx.Done(): return kgo.Fetches{}, ctx case f := <-r.orderedFetches: + // The records have been polled from the buffer, so we can now decrease the number of + // buffered records. It's important to note that we decrease it by the number of actually + // buffered records and not by the number of records returned by PollFetchers(), which + // could be lower if some records are discarded because "old" (already returned by previous + // PollFetches() calls). + r.bufferedFetchedRecords.Sub(int64(len(f.Records))) + firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord) r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime) @@ -523,6 +534,10 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg attemptSpan.SetTag("attempt", attempt) f := r.fetchSingle(ctx, w) + + // We increase the count of buffered records as soon as we fetch them. + r.bufferedFetchedRecords.Add(int64(len(f.Records))) + f = f.Merge(previousResult) previousResult = f if f.Err != nil { @@ -614,8 +629,20 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu defer r.wg.Done() var ( - nextFetch = fetchWantFrom(startOffset, recordsPerFetch) - nextResult chan fetchResult + // nextFetch is the next records fetch operation we want to issue to one of the running workers. + // It contains the offset range to fetch and a channel where the result should be written to. + nextFetch = fetchWantFrom(startOffset, recordsPerFetch) + + // nextResult is the channel where we expect a worker will write the result of the next fetch + // operation. This result is the next result that will be returned to PollFetches(), guaranteeing + // records ordering. + nextResult chan fetchResult + + // pendingResults is the list of all fetchResult of all inflight fetch operations. Pending results + // are ordered in the same order these results should be returned to PollFetches(), so the first one + // in the list is the next one that should be returned, unless nextResult is valued (if nextResult + // is valued, then nextResult is the next and the first item in the pendingResults list is the + // 2nd next one). pendingResults = list.New() // bufferedResult is the next fetch that should be polled by PollFetches(). @@ -631,10 +658,6 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu ) nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume - // When the fetcher is stopped, buffered records are intentionally dropped. For this reason, - // we do reset the counter of buffered records to zero when this goroutine ends. - defer r.bufferedFetchedRecords.Store(0) - for { refillBufferedResult := nextResult if readyBufferedResults != nil { @@ -685,17 +708,7 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu bufferedResult = result readyBufferedResults = r.orderedFetches - // We increase the count of buffered records only for ordered records, so that we can - // get an accurate tracking of the records ready to be polled by PollFetches() but not - // polled yet. This mimics the similar tracking done by the franz-go library. - r.bufferedFetchedRecords.Add(int64(len(result.Records))) - case readyBufferedResults <- bufferedResult: - // We've written the records to the channel read by PollFetches(). Since the channel - // is not buffered, as soon as the write succeed it means PollFetches() has read it, - // so we can consider these records polled and decrease the buffered records counter. - r.bufferedFetchedRecords.Sub(int64(len(bufferedResult.Records))) - bufferedResult.finishWaitingForConsumption() readyBufferedResults = nil bufferedResult = fetchResult{} From c70ffe8c660f3026b6634eba21627dfe685c6fce Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 13 Nov 2024 13:12:15 +0100 Subject: [PATCH 19/19] Use atomic for fetcher too Signed-off-by: Marco Pracucci --- pkg/storage/ingest/reader.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index eebf33cbc71..43c4e664306 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -7,7 +7,6 @@ import ( "fmt" "math" "strconv" - "sync" "time" "github.com/go-kit/log" @@ -82,13 +81,9 @@ type PartitionReader struct { // client and fetcher are both start after PartitionReader creation. Fetcher could also be // replaced during PartitionReader lifetime. To avoid concurrency issues with functions - // getting their pointers (e.g. BufferedRecords()) we use atomic / mutex to protect. - client atomic.Pointer[kgo.Client] - - // We use a mutex for fetcher because it's an interface and we have to check against the - // nil case too. - fetcherMx sync.Mutex - fetcher fetcher + // getting their pointers (e.g. BufferedRecords()) we use atomic to protect. + client atomic.Pointer[kgo.Client] + fetcher atomic.Pointer[fetcher] newConsumer consumerFactory metrics readerMetrics @@ -824,15 +819,16 @@ func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bo } func (r *PartitionReader) setFetcher(f fetcher) { - r.fetcherMx.Lock() - defer r.fetcherMx.Unlock() - r.fetcher = f + r.fetcher.Store(&f) } func (r *PartitionReader) getFetcher() fetcher { - r.fetcherMx.Lock() - defer r.fetcherMx.Unlock() - return r.fetcher + pointer := r.fetcher.Load() + if pointer == nil { + return nil + } + + return *pointer } func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, fetchContext context.Context) {