Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix buffer_drain_concurrency not doing anything. #14545

Closed
wants to merge 1 commit into from

Conversation

arthurschreiber
Copy link
Contributor

@arthurschreiber arthurschreiber commented Nov 18, 2023

Description

As described in #11684, the --buffer_drain_concurrency CLI argument to vtgate does not actually do anything.

This pull request implements the logic required to make this flag actually do something. 😬 When the buffer is drained, we now spawn as many goroutines as specified by --buffer_drain_concurrency to drain the buffer in parallel.

I don't think introducing a new flag as described in #11684 actually makes sense - instead I propose we mention this in the v19 changelog that this flag is now doing something, and don't backport the change to any earlier releases.

Related Issue(s)

#11684

Checklist

  • "Backport to:" labels have been added if this change should be back-ported
  • Tests were added or are not required
  • Did the new or modified tests pass consistently locally and on the CI
  • Documentation was added or is not required

Deployment Notes

Signed-off-by: Arthur Schreiber <arthurschreiber@github.com>
Copy link
Contributor

vitess-bot bot commented Nov 18, 2023

Review Checklist

Hello reviewers! 👋 Please follow this checklist when reviewing this Pull Request.

General

  • Ensure that the Pull Request has a descriptive title.
  • Ensure there is a link to an issue (except for internal cleanup and flaky test fixes), new features should have an RFC that documents use cases and test cases.

Tests

  • Bug fixes should have at least one unit or end-to-end test, enhancement and new features should have a sufficient number of tests.

Documentation

  • Apply the release notes (needs details) label if users need to know about this change.
  • New features should be documented.
  • There should be some code comments as to why things are implemented the way they are.
  • There should be a comment at the top of each new or modified test to explain what the test does.

New flags

  • Is this flag really necessary?
  • Flag names must be clear and intuitive, use dashes (-), and have a clear help text.

If a workflow is added or modified:

  • Each item in Jobs should be named in order to mark it as required.
  • If the workflow needs to be marked as required, the maintainer team must be notified.

Backward compatibility

  • Protobuf changes should be wire-compatible.
  • Changes to _vt tables and RPCs need to be backward compatible.
  • RPC changes should be compatible with vitess-operator
  • If a flag is removed, then it should also be removed from vitess-operator and arewefastyet, if used there.
  • vtctl command output order should be stable and awk-able.

@vitess-bot vitess-bot bot added NeedsDescriptionUpdate The description is not clear or comprehensive enough, and needs work NeedsIssue A linked issue is missing for this Pull Request NeedsWebsiteDocsUpdate What it says labels Nov 18, 2023
@github-actions github-actions bot added this to the v19.0.0 milestone Nov 18, 2023
@arthurschreiber arthurschreiber marked this pull request as ready for review November 18, 2023 13:56
@arthurschreiber arthurschreiber added Component: Query Serving Type: Bug and removed NeedsIssue A linked issue is missing for this Pull Request NeedsDescriptionUpdate The description is not clear or comprehensive enough, and needs work labels Nov 18, 2023
@arthurschreiber arthurschreiber self-assigned this Nov 18, 2023
Comment on lines +574 to +596
entryChan := make(chan *entry, len(q))

parallelism := min(bufferDrainConcurrency, len(q))

var wg sync.WaitGroup
wg.Add(parallelism)

for i := 0; i < parallelism; i++ {
go func() {
for _, e := range q {
sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */)
}

wg.Done()
}()
}

for _, e := range q {
sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */)
entryChan <- e
}

close(entryChan)
wg.Wait()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to only pump the entries through the channel if bufferDrainConcurrency is set to a value higher than 1? Or is the overhead negligible?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think this is an important optimization. We need a legacy codepath for when bufferDrainConcurrency <= 1.

// TODO(mberlin): Parallelize the drain by pumping the data through a channel.

// Parallelize the drain by pumping the data through a channel.
entryChan := make(chan *entry, len(q))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I need to make the channel buffered? 🤔 I think it should be fine to not have it buffered, but I'm not 100% sure.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not be buffered, definitely not buffered as len(q) because that's a lot of underlying memory being allocated!

@arthurschreiber arthurschreiber requested a review from vmg November 20, 2023 11:58
@arthurschreiber
Copy link
Contributor Author

@vmg Do you mind taking a look? 🙇‍♂️

Copy link
Collaborator

@vmg vmg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quite an oversight! I think this looks good overall. We should definitely add a serial path for the case where there's no concurrency.

// TODO(mberlin): Parallelize the drain by pumping the data through a channel.

// Parallelize the drain by pumping the data through a channel.
entryChan := make(chan *entry, len(q))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not be buffered, definitely not buffered as len(q) because that's a lot of underlying memory being allocated!

Comment on lines +574 to +596
entryChan := make(chan *entry, len(q))

parallelism := min(bufferDrainConcurrency, len(q))

var wg sync.WaitGroup
wg.Add(parallelism)

for i := 0; i < parallelism; i++ {
go func() {
for _, e := range q {
sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */)
}

wg.Done()
}()
}

for _, e := range q {
sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */)
entryChan <- e
}

close(entryChan)
wg.Wait()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think this is an important optimization. We need a legacy codepath for when bufferDrainConcurrency <= 1.

sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */)
}

wg.Done()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer wg.Done() at the start of the goroutine to prevent a deadlock on panic.

for _, e := range q {
sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */)
entryChan <- e
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To elaborate: entryChan doesn't need to be buffered because the sending goroutine (i.e. this one) doesn't have anything else to do besides sending the entries through the channel. If it's not blocked on the channel send, it'll block on wg.Wait() a couple lines below. So buffering is not an optimization here.

Copy link
Contributor

This PR is being marked as stale because it has been open for 30 days with no activity. To rectify, you may do any of the following:

  • Push additional commits to the associated branch.
  • Remove the stale label.
  • Add a comment indicating why it is not stale.

If no action is taken within 7 days, this PR will be closed.

@github-actions github-actions bot added the Stale Marks PRs as stale after a period of inactivity, which are then closed after a grace period. label Dec 28, 2023
Copy link
Contributor

github-actions bot commented Jan 4, 2024

This PR was closed because it has been stale for 7 days with no activity.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Component: Query Serving NeedsWebsiteDocsUpdate What it says Stale Marks PRs as stale after a period of inactivity, which are then closed after a grace period. Type: Bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants