Add single partition router #999
@crossoverJie Hi, thanks for your contribution. Can you fix the failing CI lint check?
PTAL
/pulsarbot run-failure-checks
Please rebase onto master to get the tests fixed.
Done
pulsar/single_partition_router.go
Outdated
    }
    mutex.Lock()
    defer mutex.Unlock()
    partition := r.R.Intn(int(numPartitions))
We need to check again here whether singlePartition is nil, and set its value only when it is nil.
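A minimal sketch of the re-check being asked for, assuming the partition is stored behind a mutex. The `partitionPicker` type and its `pick` method are hypothetical names used only for illustration and are not part of the client; the sketch also assumes `numPartitions` is greater than zero.

```go
package main

import (
	"math/rand"
	"sync"
)

// partitionPicker is a hypothetical helper, not a type from pulsar-client-go.
type partitionPicker struct {
	mu        sync.Mutex
	partition *int // nil until the first pick
}

// pick returns the stored partition and chooses one at random only when
// nothing has been stored yet. It assumes numPartitions > 0.
func (p *partitionPicker) pick(numPartitions int) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Re-check under the lock: another goroutine may have set the value
	// between an earlier unlocked nil check and acquiring the mutex.
	if p.partition == nil {
		chosen := rand.Intn(numPartitions)
		p.partition = &chosen
	}
	return *p.partition
}
```

Without the second nil check, two goroutines that both saw nil before locking would each overwrite the partition, so messages could land on different partitions.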
Would using sync.Once be better?
Agree.
Done
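For reference, the sync.Once suggestion from this thread could look roughly like the sketch below. `oncePicker` and `pick` are hypothetical names, not the code that was merged, and the sketch assumes `numPartitions` is greater than zero.

```go
package main

import (
	"math/rand"
	"sync"
)

// oncePicker is a hypothetical helper, not a type from pulsar-client-go.
type oncePicker struct {
	once      sync.Once
	partition int
}

// pick chooses a random partition on the first call and returns the same
// value on every later call. It assumes numPartitions > 0.
func (p *oncePicker) pick(numPartitions int) int {
	p.once.Do(func() {
		// Runs exactly once, even when pick is called concurrently.
		p.partition = rand.Intn(numPartitions)
	})
	return p.partition
}
```

sync.Once removes the need for both the explicit mutex and the nil check, because Do guarantees that the function runs only once and that its effect is visible to every caller that returns from Do.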
Thank you for your contribution! But the default router supports this feature: pulsar-client-go/pulsar/default_router.go Lines 57 to 65 in a119bab
But when there are no
pulsar/single_partition_router.go
Outdated
    import "sync"

    var (
        singlePartition *int
DO NOT use the global variable.
Please move these variables to line 28.
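A rough sketch of what moving the state out of package scope could look like, assuming the variables are declared inside the constructor and captured by the returned closure. `newStickyPicker` is a hypothetical name and its signature is simplified for illustration; it assumes `numPartitions` is greater than zero.

```go
package main

import (
	"math/rand"
	"sync"
)

// newStickyPicker is a hypothetical constructor. The state is declared inside
// the function and captured by the closure, so every picker gets its own copy.
func newStickyPicker() func(numPartitions uint32) int {
	var (
		once      sync.Once
		partition int
	)
	return func(numPartitions uint32) int {
		// Assumes numPartitions > 0; rand.Intn panics otherwise.
		once.Do(func() {
			partition = rand.Intn(int(numPartitions))
		})
		return partition
	}
}
```

With package-level variables, every producer in the process would share one partition choice; with closure-scoped state, two routers created separately pick independently.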
pulsar/single_partition_router.go
Outdated
    func NewSinglePartitionRouter() func(*ProducerMessage, TopicMetadata) int {
        return func(message *ProducerMessage, metadata TopicMetadata) int {
            numPartitions := metadata.NumPartitions()
            if len(message.OrderingKey) != 0 {
I notice that Java only checks message.key, not message.orderingKey, so we should keep the same implementation.
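A sketch of a router that checks only the message key, as this comment asks. Everything here is an assumption made for illustration: `ProducerMessage` and `TopicMetadata` are trimmed stand-ins for the client's types, `fixedMetadata` and `newSinglePartitionRouterSketch` are hypothetical names, and FNV-1a stands in for whatever hash function the producer is actually configured with.

```go
package main

import (
	"hash/fnv"
	"math/rand"
	"sync"
)

// ProducerMessage and TopicMetadata are trimmed stand-ins for the client's
// types, reduced to the fields and methods this sketch needs.
type ProducerMessage struct {
	Key         string
	OrderingKey string
}

type TopicMetadata interface {
	NumPartitions() uint32
}

// fixedMetadata is a hypothetical TopicMetadata with a constant partition count.
type fixedMetadata uint32

func (m fixedMetadata) NumPartitions() uint32 { return uint32(m) }

// hashKey uses FNV-1a as a placeholder hash; the real client may hash differently.
func hashKey(key string) uint32 {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key))
	return h.Sum32()
}

// newSinglePartitionRouterSketch routes keyed messages by hash and sends all
// unkeyed messages to one randomly chosen partition. OrderingKey is ignored.
func newSinglePartitionRouterSketch() func(*ProducerMessage, TopicMetadata) int {
	var (
		once      sync.Once
		partition int
	)
	return func(msg *ProducerMessage, md TopicMetadata) int {
		n := md.NumPartitions() // assumed to be > 0
		if len(msg.Key) != 0 {
			return int(hashKey(msg.Key) % n)
		}
		once.Do(func() { partition = rand.Intn(int(n)) })
		return partition
	}
}
```

In this sketch a message that carries only an OrderingKey is treated like an unkeyed message and goes to the sticky partition.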
Co-authored-by: Zixuan Liu <nodeces@gmail.com>
CI run failed, I created another PR to fix this issue.
@nodece Hi, can you take a look at it again?
Motivation
https://pulsar.apache.org/docs/2.11.x/concepts-messaging/#routing-modes
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
Based on the code of the Java client, implemented the single-partition-router mentioned in the document.

Modifications
Add single-partition-router

Verifying this change
This change added tests and can be verified as follows:

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
NewSinglePartitionRouter() method.

Documentation