From 718a1e7eb13a1aa948a2f1c7f1ebdd44e5e48411 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Tue, 20 Jun 2023 17:47:13 +0800 Subject: [PATCH 1/3] change PartitionsRoutingMode default from UseSinglePartition to RoundRobinDistribution --- lib/PartitionedProducerImpl.cc | 12 ++++++------ lib/ProducerConfigurationImpl.h | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 4178096c..0f05c78b 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -65,17 +65,17 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() { switch (conf_.getPartitionsRoutingMode()) { + case ProducerConfiguration::UseSinglePartition: + return std::make_shared(getNumPartitions(), + conf_.getHashingScheme()); + case ProducerConfiguration::CustomPartition: + return conf_.getMessageRouterPtr(); case ProducerConfiguration::RoundRobinDistribution: + default: return std::make_shared( conf_.getHashingScheme(), conf_.getBatchingEnabled(), conf_.getBatchingMaxMessages(), conf_.getBatchingMaxAllowedSizeInBytes(), std::chrono::milliseconds(conf_.getBatchingMaxPublishDelayMs())); - case ProducerConfiguration::CustomPartition: - return conf_.getMessageRouterPtr(); - case ProducerConfiguration::UseSinglePartition: - default: - return std::make_shared(getNumPartitions(), - conf_.getHashingScheme()); } } diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h index c635c48f..c3240209 100644 --- a/lib/ProducerConfigurationImpl.h +++ b/lib/ProducerConfigurationImpl.h @@ -34,7 +34,7 @@ struct ProducerConfigurationImpl { CompressionType compressionType{CompressionNone}; int maxPendingMessages{1000}; int maxPendingMessagesAcrossPartitions{50000}; - ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition}; + ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::RoundRobinDistribution}; MessageRoutingPolicyPtr messageRouter; ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash}; bool useLazyStartPartitionedProducers{false}; From 6dd859d6dee73de4aa2e4a74288142525dfdb956 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Wed, 21 Jun 2023 11:35:20 +0800 Subject: [PATCH 2/3] modify the configuration test --- tests/ProducerConfigurationTest.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/ProducerConfigurationTest.cc b/tests/ProducerConfigurationTest.cc index df5867c1..a12911da 100644 --- a/tests/ProducerConfigurationTest.cc +++ b/tests/ProducerConfigurationTest.cc @@ -33,7 +33,7 @@ TEST(ProducerConfigurationTest, testDefaultConfig) { ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionNone); ASSERT_EQ(conf.getMaxPendingMessages(), 1000); ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 50000); - ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution); ASSERT_EQ(conf.getMessageRouterPtr(), MessageRoutingPolicyPtr{}); ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::BoostHash); ASSERT_EQ(conf.getBlockIfQueueFull(), false); @@ -88,6 +88,9 @@ TEST(ProducerConfigurationTest, testCustomConfig) { conf.setMaxPendingMessagesAcrossPartitions(100000); ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 100000); + conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition); + conf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution); From a910e8623f911cbbef9171124690824acff79ae4 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Tue, 20 Aug 2024 10:53:26 +0800 Subject: [PATCH 3/3] modify description in ProducerConfiguration.h --- include/pulsar/ProducerConfiguration.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h index 62a63807..ab6d54a5 100644 --- a/include/pulsar/ProducerConfiguration.h +++ b/include/pulsar/ProducerConfiguration.h @@ -236,7 +236,7 @@ class PULSAR_PUBLIC ProducerConfiguration { /** * Set the message routing modes for partitioned topics. * - * Default: UseSinglePartition + * Default: RoundRobinDistribution * * @param PartitionsRoutingMode partition routing mode. * @return