From f803fbb52830138ab1b12234cdecde0275037542 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 7 Nov 2024 09:52:23 -0500 Subject: [PATCH] Fix ordering of call to fetcher.Stop() and improve unit tests (#9850) * Fix ordering of call to fetcher.Stop() and improve unit tests Signed-off-by: Marco Pracucci * Updated CHANGELOG Signed-off-by: Marco Pracucci --------- Signed-off-by: Marco Pracucci --- CHANGELOG.md | 2 +- pkg/storage/ingest/reader.go | 8 +++++--- pkg/storage/ingest/reader_test.go | 21 ++++++++++++--------- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bb2810ce205..17de36a3bde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,7 +54,7 @@ * [ENHANCEMENT] Ruler: Support `group_limit` and `group_next_token` parameters in the `/api/v1/rules` endpoint. #9563 * [ENHANCEMENT] Ingester: improved lock contention affecting read and write latencies during TSDB head compaction. #9822 * [ENHANCEMENT] Distributor: when a label value fails validation due to invalid UTF-8 characters, don't include the invalid characters in the returned error. #9828 -* [ENHANCEMENT] Ingester: when experimental ingest storage is enabled, do not buffer records in the Kafka client when fetch concurrency is in use. #9838 +* [ENHANCEMENT] Ingester: when experimental ingest storage is enabled, do not buffer records in the Kafka client when fetch concurrency is in use. #9838 #9850 * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508 * [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508 diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index b32aace3412..f7000f5fbe1 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -268,6 +268,8 @@ func (r *PartitionReader) run(ctx context.Context) error { return nil } +// switchToOngoingFetcher switches to the configured ongoing fetcher. This function could be +// called multiple times. func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { if r.kafkaCfg.StartupFetchConcurrency == r.kafkaCfg.OngoingFetchConcurrency && r.kafkaCfg.StartupRecordsPerFetch == r.kafkaCfg.OngoingRecordsPerFetch { // we're already using the same settings, no need to switch @@ -281,9 +283,6 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { } if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency == 0 { - // Stop the current fetcher before replacing it. - r.fetcher.Stop() - if r.fetcher == r { // This method has been called before, no need to switch the fetcher. return @@ -291,6 +290,9 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { level.Info(r.logger).Log("msg", "partition reader is switching to non-concurrent fetcher") + // Stop the current fetcher before replacing it. + r.fetcher.Stop() + // We need to switch to franz-go for ongoing fetches. // If we've already fetched some records, we should discard them from franz-go and start from the last consumed offset. r.fetcher = r diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 32a0cabc150..29aad7b5b5c 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -111,9 +111,10 @@ func TestPartitionReader_ConsumerError(t *testing.T) { // We want to run this test with different concurrency config. concurrencyVariants := map[string][]readerTestCfgOpt{ - "without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)}, - "with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)}, - "with startup and ongoing concurrency": {withStartupConcurrency(2), withOngoingConcurrency(2)}, + "without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)}, + "with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)}, + "with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)}, + "with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)}, } for concurrencyName, concurrencyVariant := range concurrencyVariants { @@ -166,9 +167,10 @@ func TestPartitionReader_ConsumerStopping(t *testing.T) { // We want to run this test with different concurrency config. concurrencyVariants := map[string][]readerTestCfgOpt{ - "without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)}, - "with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)}, - "with startup and ongoing concurrency": {withStartupConcurrency(2), withOngoingConcurrency(2)}, + "without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)}, + "with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)}, + "with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)}, + "with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)}, } for concurrencyName, concurrencyVariant := range concurrencyVariants { @@ -513,9 +515,10 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { // We want to run all these tests with different concurrency config. concurrencyVariants := map[string][]readerTestCfgOpt{ - "without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)}, - "with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)}, - "with startup and ongoing concurrency": {withStartupConcurrency(2), withOngoingConcurrency(2)}, + "without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)}, + "with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)}, + "with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)}, + "with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)}, } t.Run("should immediately switch to Running state if partition is empty", func(t *testing.T) {