Make RestrictionTrackers.getProgress unblocking #36750
base: master
Conversation
/gemini review
Tested with Dataflow job:
Code Review
This pull request aims to make RestrictionTrackers.getProgress non-blocking to prevent lease expiration issues. The approach of replacing synchronized blocks with ReentrantLock and using tryLock in getProgress is sound and correctly implemented. My review focuses on a couple of improvements: a typo correction and visibility change for a helper method, and fixing a potential data race in the test code to ensure its reliability. Overall, the changes are good and address the stated problem effectively.
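For readers skimming the thread, here is a minimal sketch of the approach the review describes (the class and field names are illustrative, not the merged RestrictionTrackers code): claims take a ReentrantLock, while getProgress uses tryLock and falls back to the last observed progress when the lock is contended.

```java
import java.util.concurrent.locks.ReentrantLock;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;

// Hypothetical wrapper illustrating the idea; not the actual RestrictionTrackers implementation.
class NonBlockingProgressTrackerSketch<RestrictionT, PositionT> {
  private final RestrictionTracker<RestrictionT, PositionT> delegate;
  private final ReentrantLock lock = new ReentrantLock();
  private volatile Progress lastObservedProgress = Progress.from(0, 1);

  NonBlockingProgressTrackerSketch(RestrictionTracker<RestrictionT, PositionT> delegate) {
    this.delegate = delegate;
  }

  public boolean tryClaim(PositionT position) {
    lock.lock();
    try {
      // tryClaim may block for a long time (e.g. BoundedSource readers doing I/O).
      boolean claimed = delegate.tryClaim(position);
      // Refresh the cached progress while the lock is already held.
      lastObservedProgress = ((HasProgress) delegate).getProgress();
      return claimed;
    } finally {
      lock.unlock();
    }
  }

  public Progress getProgress() {
    if (lock.tryLock()) {
      try {
        lastObservedProgress = ((HasProgress) delegate).getProgress();
      } finally {
        lock.unlock();
      }
    }
    // If a long-running tryClaim holds the lock, do not block the progress-reporting
    // thread; report the last progress that was observed instead.
    return lastObservedProgress;
  }
}
```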
Resolved review threads (outdated):
sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control.
RestrictionTrackers.java diff excerpt under discussion:
return ((HasProgress) delegate).getProgress();
public Progress getProgress() {
pendingProgress = true;
if (lock.tryLock()) {
What about using tryLock with a timeout to wait for up to X seconds? That seems responsive enough and will help make sure contention doesn't prevent observing the most recent progress.
We could also consider logging that we're going to return a stale progress.
Added a 1 min timeout. On the Dataflow side, the lease timeout is 10 min.
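A sketch of the timed variant discussed here (the class name, constant, and logging are assumptions; the merged code may differ): wait up to one minute for the lock, comfortably below a 10-minute lease timeout, and log when a potentially stale value is returned.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: names, the timeout constant, and the logging are assumptions,
// not necessarily what the merged change does.
class TimedProgressSketch {
  private static final Logger LOG = LoggerFactory.getLogger(TimedProgressSketch.class);
  private static final long LOCK_TIMEOUT_MINUTES = 1; // well under a 10 min lease timeout

  private final HasProgress delegate;
  private final ReentrantLock lock = new ReentrantLock();
  private volatile Progress lastObservedProgress = Progress.from(0, 1);

  TimedProgressSketch(HasProgress delegate) {
    this.delegate = delegate;
  }

  public Progress getProgress() {
    boolean acquired = false;
    try {
      acquired = lock.tryLock(LOCK_TIMEOUT_MINUTES, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    if (acquired) {
      try {
        lastObservedProgress = delegate.getProgress();
      } finally {
        lock.unlock();
      }
    } else {
      // Returning a cached value is preferable to blocking the progress report past
      // the runner's lease timeout; log so the staleness is observable.
      LOG.warn(
          "Progress lock contended for over {} minute(s); returning last observed progress.",
          LOCK_TIMEOUT_MINUTES);
    }
    return lastObservedProgress;
  }
}
```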
RestrictionTrackers.java diff excerpt under discussion:
@Override
public synchronized Progress getProgress() {
return ((HasProgress) delegate).getProgress();
public Progress getProgress() {
If the caller has ordering (like being on the same thread) that guarantees this is called after a trySplit or tryClaim succeeds, it may be unexpected that this returns stale progress, and that could lead to weird failures.
Another approach to fixing this would be to make RestrictionTracker.getProgress() quick. In general, restriction trackers are lightweight; the ones in Read.java are odd in that they are abusing the restriction tracker tryClaim return payloads. I think we could instead change them to not do that beneath RestrictionTracker.tryClaim, as another way to fix this.
Hacky example, needs more thought on trySplit:
#36758
Yeah, it would help the Read transform. The change here is a bit more generic and applies to all SDFs. We also have other IOs that are built on SDFs directly.
What about the first concern, that the caller may observe stale progress in cases where it is itself calling in order (via being on the same thread or some other happens-before relationship)?
For example, if the split processing thread calls:
- trySplit
- getProgress
then the getProgress response might be stale from before the split. This could possibly confuse the runner. I'm not sure whether this is the case, but I think we'd have to follow the code to make sure this doesn't cause some subtle corruption.
@kennknowles might know
The reason I was suggesting fixing Read itself is that the RestrictionTrackers used by other SDFs don't do expensive work in their RestrictionTracker.tryClaim implementations; it is generally just recording the offset, etc. Requiring that getProgress is responsive doesn't seem too onerous a requirement for a RestrictionTracker if we document it, and then we don't have to worry about these possibly stale responses mixing things up.
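To illustrate "generally just recording the offset": a simplified, hypothetical tracker whose tryClaim only does bookkeeping, so any lock shared with getProgress is only ever held briefly and progress reports stay responsive (loosely modeled on an offset-range tracker, not Beam's actual OffsetRangeTracker).

```java
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;

// Hypothetical, simplified tracker: tryClaim only records the claimed offset,
// so the lock it shares with getProgress is never held across expensive work.
class SimpleOffsetTrackerSketch {
  private final long from;
  private final long to;
  private long lastClaimedOffset;

  SimpleOffsetTrackerSketch(long from, long to) {
    this.from = from;
    this.to = to;
    this.lastClaimedOffset = from - 1;
  }

  public synchronized boolean tryClaim(long offset) {
    // No I/O and no blocking work here: just record the position.
    if (offset >= to) {
      return false;
    }
    lastClaimedOffset = offset;
    return true;
  }

  public synchronized Progress getProgress() {
    // Cheap arithmetic over recorded offsets; always answers promptly.
    double completed = Math.max(0, lastClaimedOffset - from + 1);
    double remaining = Math.max(0, to - Math.max(from, lastClaimedOffset + 1));
    return Progress.from(completed, remaining);
  }
}
```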
There were cases where inaccurate progress confused the runner: prior to #26352, BoundedSource as a restriction tracker (the scenario of Dataflow Runner v2 evaluating a BoundedSource) always reported zero progress, and the effect was slowness and the runner not splitting sources, per that PR's description. So if the stale progress is zero, it should be of minimal concern (it has happened before).
If the stale progress is a positive number, this is similar to the long-running DoFn.process we often observe (in the case of Dataflow, there is a straggler alert), but the pipeline can still work as intended. Let me write a pipeline and test this scenario: would a stale positive progress make the runner eagerly issue trySplit, or cause other unexpected behavior?
Following the existing code to check for breakage is good, but we also need a super clear spec here. The Progress object has to be associated with a restriction to have a meaning. So the caller needs to know what restriction it is relative to. How should the caller be sure which restriction the Progress is for?
> So the caller needs to know what restriction it is relative to. How should the caller be sure which restriction the Progress is for?
The caller stack trace, in the SDK harness, is:
org.apache.beam.fn.harness.control.ProcessBundleHandler#progress
org.apache.beam.fn.harness.control.ProcessBundleHandler#intermediateMonitoringData
org.apache.beam.fn.harness.control.BundleProgressReporter.InMemory#updateIntermediateMonitoringData
org.apache.beam.fn.harness.FnApiDoFnRunner.<anonymous BundleProgressReporter, under case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN>.updateIntermediateMonitoringData
org.apache.beam.fn.harness.FnApiDoFnRunner#getProgress
it has a member currentTracker that is a restriction tracker:
beam/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
Line 750 in c8d7ca0
((HasProgress) currentTracker).getProgress(), windowCurrentIndex, windowStopIndex);
The caller's restriction tracker is known (assigned in processElementForWindowObservingSizedElementAndRestriction). The restriction tracker's getProgress call simply calls delegate's getProgress:
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
Line 105 in c8d7ca0
return ((HasProgress) delegate).getProgress();
where delegate in this case is a BoundedSourceAsSDFRestrictionTracker.
Its getProgress call invokes currentReader.getFractionConsumed(). I understand that at this point the restriction tracker relies on the underlying reader to know the progress. After this change, when getProgress gets stuck, it reports the last claimed progress, which I consider to still be a valid value.
Is this information sufficient to answer the concern here?
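A condensed, hypothetical view of the reader-backed progress path described above (the real classes are FnApiDoFnRunner, the RestrictionTrackers wrapper, and the BoundedSource-backed tracker in Read.java): whether the value is computed freshly or replayed from the last observation, it describes the same reader and therefore the same restriction.

```java
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;

// Hypothetical condensation of the BoundedSource-backed tracker's progress path.
class BoundedSourceProgressSketch<T> {
  private final BoundedSource.BoundedReader<T> currentReader;

  BoundedSourceProgressSketch(BoundedSource.BoundedReader<T> currentReader) {
    this.currentReader = currentReader;
  }

  Progress getProgress() {
    // getFractionConsumed() may return null before the reader has produced a record.
    Double fraction = currentReader.getFractionConsumed();
    double consumed = fraction == null ? 0 : fraction;
    // Whether this value is returned immediately or cached and replayed later by a
    // non-blocking wrapper, it still describes this reader's current restriction.
    return Progress.from(consumed, 1 - consumed);
  }
}
```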
Fix long-running tryClaim() blocking a concurrent getProgress call, which leads to runner lease expiry.
This fix changes the synchronized {} blocks to a reentrant lock and, in particular, removes the synchronized {} around the getProgress call. Instead, getProgress is executed on a best-effort basis:
- if the lock is not currently held, evaluate getProgress;
- if the lock is currently held, report the progress from the last evaluation.
Internal tracker: b/440435833
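For context, a hypothetical sketch of the failure mode described above (names are illustrative): under synchronized methods, a tryClaim that blocks on I/O holds the monitor, so a concurrent getProgress from the progress-reporting thread stalls until the claim finishes, which can outlast the runner's lease.

```java
// Hypothetical illustration of the pre-fix behavior, not the actual Beam class.
class BlockingTrackerSketch {
  interface SlowReader {
    boolean advanceTo(long offset); // may block on I/O for a long time
    double fractionConsumed();
  }

  private final SlowReader reader;

  BlockingTrackerSketch(SlowReader reader) {
    this.reader = reader;
  }

  public synchronized boolean tryClaim(long offset) {
    // Holds the monitor for as long as the underlying read takes.
    return reader.advanceTo(offset);
  }

  public synchronized double getProgress() {
    // A progress-reporting thread calling this while tryClaim is running must wait
    // for the same monitor, so progress reports stall and the runner's lease can expire.
    return reader.fractionConsumed();
  }
}
```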