Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Producer] simplify the flush logic #1049

Merged
merged 3 commits into from
Jul 18, 2023

Conversation

gunli
Copy link
Contributor

@gunli gunli commented Jul 4, 2023

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #1044

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #1044

Motivation

Simplify the producer flush logic

Modifications

  1. add a callback field to the pendingItem, default is nil;
  2. in partitionProducer.internalFlush() get the last pendingItem from pendingQueue;
  3. update the last pendingItem by setup a new callback;
  4. in partitionProducer.ReceivedSendReceipt, no need to identify the sendRequest by checking if the msg is nil;
  5. in pendingItem.Complete(), invoke its callback to notify the flush is done.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (yes)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@gunli
Copy link
Contributor Author

gunli commented Jul 11, 2023

@RobertIndie @shibd Would you please review this PR.

@RobertIndie RobertIndie merged commit 3812c07 into apache:master Jul 18, 2023
RobertIndie pushed a commit that referenced this pull request Sep 7, 2023
### Motivation

Simplify the producer flush logic

### Modifications
1. add a callback field to the pendingItem, default is nil;
2. in partitionProducer.internalFlush() get the last pendingItem from pendingQueue;
3. update the last pendingItem by setup a new callback;
4. in partitionProducer.ReceivedSendReceipt, no need to identify the sendRequest by checking if the msg is nil;
5. in pendingItem.Complete(), invoke its callback to notify the flush is done.

---------

Co-authored-by: gunli <gunli@tencent.com>
(cherry picked from commit 3812c07)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Improve][Producer]Simplify the flush logic
2 participants