-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for fully async acknowledgments in source coordination #3384
Add support for fully async acknowledgments in source coordination #3384
Conversation
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1455b8c
to
38d3f6f
Compare
* @param ackowledgmentTimeout - the amount of time that this partition can be completed by the acknowledgment callback before another instance of Data Prepper | ||
* can pick it up for processing | ||
*/ | ||
void updatePartitionForAckWait(final String partitionKey, final Duration ackowledgmentTimeout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use full words in interface names. updatePartitionForAcknowledgementWait
.
validateIsInitialized(); | ||
|
||
final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "complete"); | ||
final SourcePartitionStoreItem itemToUpdate = fromAcknowledgmentsCallback ? getSourcePartitionStoreItem(partitionKey, "complete") : validateAndGetSourcePartitionStoreItem(partitionKey, "complete"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make "complete"
a constant now that you are using it twice.
* @since 2.2 | ||
*/ | ||
void completePartition(final String partitionKey); | ||
void completePartition(final String partitionKey, final Boolean fromAcknowledgmentsCallback); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an object we can use here instead? Is there any acknowledgements handle of any sort? This would be ideal so that if data is needed from there in the future, we don't have to make yet another change to the interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there's anything that makes sense to pass for acknowledgments other than the boolean. As far as objects in requests, it could make sense to keep the model class the same with
void completePartition(final CompletePartitionRequest)
and to do that with the rest of the SourceCoordinator methods as well. I'm not sure if that's worth a large refactor though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only suggested it if there is an existing model. I'm good keeping this as it is.
validateIsInitialized(); | ||
|
||
final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "close"); | ||
final SourcePartitionStoreItem itemToUpdate = fromAcknowledgmentsCallback ? getSourcePartitionStoreItem(partitionKey, "close") : validateAndGetSourcePartitionStoreItem(partitionKey, "close"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is very similar to what you have above in another method. It would be best to move this into a private method so that future maintainers/developers don't miss the other location when modifying the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment on why validation is not needed if this is called from acknowledgement callback?
Signed-off-by: Taylor Gray <tylgry@amazon.com>
The backport to
To backport manually, run these commands in your terminal: # Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-2.4 2.4
# Navigate to the new working tree
cd .worktrees/backport-2.4
# Create a new branch
git switch --create backport/backport-3384-to-2.4
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 6546901a8492500adda6d4fcfbc7d7f014734d47
# Push it to GitHub
git push --set-upstream origin backport/backport-3384-to-2.4
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-2.4 Then, create a pull request where the |
…pensearch-project#3384) Signed-off-by: Taylor Gray <tylgry@amazon.com>
…pensearch-project#3384) Signed-off-by: Taylor Gray <tylgry@amazon.com>
Description
This changes adds a new function to the source coordinator interface
This method will give ownership of
acknowledgmentTimeout
amount for the acknowledgment to be received. If the acknowledgment for the partition is not received in time, then another instance of Data Prepper will attempt to process it from the beginning. Calling theupdatePartitionForAckWait
is also necessary to allow the current node to pick up a new partition to process immediately.Also added a paramter
fromAcknowledgmentCallback
toclose/completePartition
. Only the callback functions should set this to true when the partition is closed or completed by the callback.Issues Resolved
Resolves #3381
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.