Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix MimirIngesterStuckProcessingRecordsFromKafka #9855

Merged
merged 19 commits into from
Nov 13, 2024
Merged

Conversation

gotjosh
Copy link
Contributor

@gotjosh gotjosh commented Nov 8, 2024

What this PR does

The alert MimirIngesterStuckProcessingRecordsFromKafka relied on the metric cortex_ingest_storage_reader_buffered_fetch_records_total provided by the Kafka client to identify wether we had stuck buffers or not.

Now that we've implemented concurrent fetching from Kafka and bypass the client's polling function we needed an equivalent metric when using concurrent fetching. This PR does that; In addition to that - the metric also takes the client's buffered records In case we do use a mixture of non-concurrent fetching and concurrent fetching.

Which issue(s) this PR fixes or relates to

N/A

Checklist

  • Tests updated.
  • Documentation added.
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
  • about-versioning.md updated with experimental features.

@@ -226,14 +229,8 @@ type concurrentFetchers struct {

// trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes.
trackCompressedBytes bool
}

// Stop implements fetcher
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a move as is - these methods above the constructor seem to be oddly placed.

@gotjosh gotjosh marked this pull request as ready for review November 8, 2024 19:01
@gotjosh gotjosh requested a review from a team as a code owner November 8, 2024 19:01
Copy link
Collaborator

@pracucci pracucci left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You improved an unit test to check that the value is 0 when done, but there's no unit test asserting that this value is actually tracked. Can we add a test running for the 4 concurrency variants that test the value is tracked to the buffered number of records? I think we can implement a mocked "pusher" that doesn't process records (it just hangs) so next records are buffered in the client, then we assert on the metric, and finally we release the pusher to end the test.

pkg/storage/ingest/fetcher.go Outdated Show resolved Hide resolved
pkg/storage/ingest/fetcher.go Outdated Show resolved Hide resolved
pkg/storage/ingest/fetcher.go Outdated Show resolved Hide resolved
pkg/storage/ingest/reader.go Outdated Show resolved Hide resolved
pkg/storage/ingest/reader.go Outdated Show resolved Hide resolved

r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.logger, r.reg)
r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.getClient()), r.partitionID, r.consumerGroup, r.logger, r.reg)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I don't need to rework these arguments as closures here (e.g. func() *kgo.Client return { r.getClient() }) as this only happens within start and from my understanding, start, stop and run are guaranteed to never concurrently - but please let me know if I'm wrong.

@pracucci pracucci self-requested a review November 12, 2024 15:40
Copy link
Collaborator

@pracucci pracucci left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! LGTM

@@ -496,6 +510,7 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg
attemptSpan.SetTag("attempt", attempt)

f := r.fetchSingle(ctx, w)
r.bufferedFetchedRecords.Add(int64(len(f.FetchPartition.Records)))
Copy link
Contributor

@dimitarvdimitrov dimitarvdimitrov Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if something calls Stop() before these records are pushed to orderedFetches then the atomic counter will always be positive. Should we decrement the counter if we give up on sending these records?

I'm thinking about the cases in this select which end up returning

			select {
			case <-r.done:
				wantSpan.Finish()
				attemptSpan.Finish()
				close(w.result)
				return
			case w.result <- f:
				previousResult = fetchResult{}
			case <-ctx.Done():
			default:
				if w.startOffset >= w.endOffset {
					// We've fetched all we were asked for the whole batch is ready, and we definitely have to wait to send on the channel now.
					f.startWaitingForConsumption()
					select {
					case <-r.done:
						wantSpan.Finish()
						attemptSpan.Finish()
						close(w.result)
						return
					case w.result <- f:
						previousResult = fetchResult{}
					case <-ctx.Done():
					}
				}
			}

at this point you can also try to unify all the places which do cleanup actions (like finishing spans, closing channels, now also decrementing the atomic counter)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if something calls Stop() before these records are pushed to orderedFetches then the atomic counter will always be positive

I thought about it too, but do we really care?

  • If the stop is called at shutdown, we don't care.
  • If the stop is called because we're moving from concurrent fetcher at startup to sequential fetchwe when ongoing, we don't care because the fetcher reference is trashed
  • If the stop is called because we're updating the concurrent fetcher config (Update()) then we care... so what if we simply reset the buffer in Stop() after the r.wg.Wait() to keep it simple?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • If the stop is called because we're updating the concurrent fetcher config (Update()) then we care... so what if we simply reset the buffer in Stop() after the r.wg.Wait() to keep it simple?

yeah that's another option 👍

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm adding some more unit tests and I found an issue. The way we currently increase the number of records is wrong because the same record may be concurrently fetched by multiple routines. I'm working on a fix.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm adding some more unit tests and I found an issue. The way we currently increase the number of records is wrong because the same record may be concurrently fetched by multiple routines. I'm working on a fix.

Not true. It was my test that was not doing the assertion correctly. I'm getting back to the Josh implementation.

Copy link
Contributor

@dimitarvdimitrov dimitarvdimitrov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain why we need the mutex around the fetcher and the client?

@pracucci
Copy link
Collaborator

can you explain why we need the mutex around the fetcher and the client?

Client is set in the starting function but there's no guarantee metrics couldn't be scraped in the meanwhile, so race condition. Fetcher same, and also could be updated later if we change it between start and ongoing, so race condition.

@dimitarvdimitrov
Copy link
Contributor

can you explain why we need the mutex around the fetcher and the client?

Client is set in the starting function but there's no guarantee metrics couldn't be scraped in the meanwhile, so race condition. Fetcher same, and also could be updated later if we change it between start and ongoing, so race condition.

😿 go guarantees atomic pointer swaps, so technically we should be safe. Was the race detector complaining?

@pracucci
Copy link
Collaborator

😿 go guarantees atomic pointer swaps, so technically we should be safe. Was the race detector complaining?

Does it? So why atomic.Pointer exists if it's already atomic? 🤔

@seizethedave
Copy link
Contributor

Whole-word pointer operations are atomic, but their order w/ respect to surrounding code executing is not guaranteed without the atomic stuff. That's what the race detector complains about. Many of the whole-word atomic types boil down to using different assembly instructions that make sure dirty CPU cache entries are invalidated appropriately.

gotjosh and others added 13 commits November 13, 2024 09:52
The alert `MimirIngesterStuckProcessingRecordsFromKafka` relied on the metric `cortex_ingest_storage_reader_buffered_fetch_records_total ` provided by the Kafka client to identify wether we had stuck buffers or not.

Now that we've implemented concurrent fetching from Kafka and bypass the client's polling function we needed an equivalent metric when using concurrent fetching. This PR does that; In addition to that - the metric also takes the client's buffered records In case we do use a mixture of non-concurrent fetching and concurrent fetching.
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: gotjosh <josue.abreu@gmail.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
@pracucci
Copy link
Collaborator

People let's keep it simple and safe, following golang advice. I really don't want to reason whether unsynchronised concurrent access is safe or not across all architectures we support.

From golang memory model, first line:

Programs that modify data being simultaneously accessed by multiple goroutines must serialize such access.

I can use an atomic.pointer tho.

Copy link
Contributor

@dimitarvdimitrov dimitarvdimitrov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we can solve this with atomic.Pointer I'd prefer that to a mutex. An atomic forces you to use it in a synchronised way, whereas you have to remember to use the mutex. But I'm aware it might be on the bike-shedding side of the spectrum, so you can merge as-is too

@pracucci pracucci marked this pull request as draft November 13, 2024 09:36
…d atomic instead of a mutex to protect client/fetcher access

Signed-off-by: Marco Pracucci <marco@pracucci.com>
@pracucci pracucci marked this pull request as ready for review November 13, 2024 11:07
@pracucci
Copy link
Collaborator

pracucci commented Nov 13, 2024

@dimitarvdimitrov I reworked this PR, doing the following changes:

  1. Changed how buffered records are tracked. Now they track only the ordered records that are ready to be sent to PollFetches(). The counter is manipulated exclusively in the goroutine run by start(), which I think makes the logic easier to follow.
  2. I've added some comments to explain a bit the logic in start() based on my understanding of how it works. Please double check it.
  3. Improved unit tests.
  4. Used atomic instead of a mutex to protect client and fetcher

Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Comment on lines +613 to +614
// We need to make sure we don't leak any goroutine given that start is called within a goroutine.
defer r.wg.Done()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: moved here just to group it with r.wg.Add().

Signed-off-by: Marco Pracucci <marco@pracucci.com>
Comment on lines +386 to +387
// Slowly produce more records while processing is slow too. This increase the chances
// of progressive fetches done by the consumer.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: debugging this test I realised it wan't really simulating a slow processing of fetches because it was always fetching all 10 records in a single PollFetches() call.

Comment on lines -393 to -396
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: removed the PollFetches() call from the goroutine because it was superfluous here.

…s better coverage of all buffered records

Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Copy link
Contributor

@dimitarvdimitrov dimitarvdimitrov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!

@pracucci pracucci merged commit 72cbd83 into main Nov 13, 2024
31 checks passed
@pracucci pracucci deleted the fix-stuck-buffers branch November 13, 2024 13:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants