Skip to content

Commit

Permalink
Fix ordering of call to fetcher.Stop() and improve unit tests (#9850)
Browse files Browse the repository at this point in the history
* Fix ordering of call to fetcher.Stop() and improve unit tests

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Updated CHANGELOG

Signed-off-by: Marco Pracucci <marco@pracucci.com>

---------

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored Nov 7, 2024
1 parent 62b1564 commit f803fbb
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 13 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* [ENHANCEMENT] Ruler: Support `group_limit` and `group_next_token` parameters in the `<prometheus-http-prefix>/api/v1/rules` endpoint. #9563
* [ENHANCEMENT] Ingester: improved lock contention affecting read and write latencies during TSDB head compaction. #9822
* [ENHANCEMENT] Distributor: when a label value fails validation due to invalid UTF-8 characters, don't include the invalid characters in the returned error. #9828
* [ENHANCEMENT] Ingester: when experimental ingest storage is enabled, do not buffer records in the Kafka client when fetch concurrency is in use. #9838
* [ENHANCEMENT] Ingester: when experimental ingest storage is enabled, do not buffer records in the Kafka client when fetch concurrency is in use. #9838 #9850
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ func (r *PartitionReader) run(ctx context.Context) error {
return nil
}

// switchToOngoingFetcher switches to the configured ongoing fetcher. This function could be
// called multiple times.
func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
if r.kafkaCfg.StartupFetchConcurrency == r.kafkaCfg.OngoingFetchConcurrency && r.kafkaCfg.StartupRecordsPerFetch == r.kafkaCfg.OngoingRecordsPerFetch {
// we're already using the same settings, no need to switch
Expand All @@ -281,16 +283,16 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) {
}

if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency == 0 {
// Stop the current fetcher before replacing it.
r.fetcher.Stop()

if r.fetcher == r {
// This method has been called before, no need to switch the fetcher.
return
}

level.Info(r.logger).Log("msg", "partition reader is switching to non-concurrent fetcher")

// Stop the current fetcher before replacing it.
r.fetcher.Stop()

// We need to switch to franz-go for ongoing fetches.
// If we've already fetched some records, we should discard them from franz-go and start from the last consumed offset.
r.fetcher = r
Expand Down
21 changes: 12 additions & 9 deletions pkg/storage/ingest/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,10 @@ func TestPartitionReader_ConsumerError(t *testing.T) {

// We want to run this test with different concurrency config.
concurrencyVariants := map[string][]readerTestCfgOpt{
"without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)},
"with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)},
"with startup and ongoing concurrency": {withStartupConcurrency(2), withOngoingConcurrency(2)},
"without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)},
"with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)},
"with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)},
"with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)},
}

for concurrencyName, concurrencyVariant := range concurrencyVariants {
Expand Down Expand Up @@ -166,9 +167,10 @@ func TestPartitionReader_ConsumerStopping(t *testing.T) {

// We want to run this test with different concurrency config.
concurrencyVariants := map[string][]readerTestCfgOpt{
"without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)},
"with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)},
"with startup and ongoing concurrency": {withStartupConcurrency(2), withOngoingConcurrency(2)},
"without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)},
"with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)},
"with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)},
"with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)},
}

for concurrencyName, concurrencyVariant := range concurrencyVariants {
Expand Down Expand Up @@ -513,9 +515,10 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) {

// We want to run all these tests with different concurrency config.
concurrencyVariants := map[string][]readerTestCfgOpt{
"without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)},
"with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)},
"with startup and ongoing concurrency": {withStartupConcurrency(2), withOngoingConcurrency(2)},
"without concurrency": {withStartupConcurrency(0), withOngoingConcurrency(0)},
"with startup concurrency": {withStartupConcurrency(2), withOngoingConcurrency(0)},
"with startup and ongoing concurrency (same settings)": {withStartupConcurrency(2), withOngoingConcurrency(2)},
"with startup and ongoing concurrency (different settings)": {withStartupConcurrency(2), withOngoingConcurrency(4)},
}

t.Run("should immediately switch to Running state if partition is empty", func(t *testing.T) {
Expand Down

0 comments on commit f803fbb

Please sign in to comment.