diff --git a/pulsar/internal/backoff.go b/pulsar/backoff/backoff.go similarity index 77% rename from pulsar/internal/backoff.go rename to pulsar/backoff/backoff.go index 3284fb7e33..453da57865 100644 --- a/pulsar/internal/backoff.go +++ b/pulsar/backoff/backoff.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package internal +package backoff import ( "math/rand" @@ -26,10 +26,17 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// BackoffPolicy parameterize the following options in the reconnection logic to +// Policy parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) -type BackoffPolicy interface { +type Policy interface { + // Next returns the delay to wait before next retry Next() time.Duration + + // IsMaxBackoffReached evaluates if the max number of retries is reached + IsMaxBackoffReached() bool + + // Reset the backoff to the initial state + Reset() } // DefaultBackoff computes the delay before retrying an action. @@ -38,6 +45,13 @@ type DefaultBackoff struct { backoff time.Duration } +func NewDefaultBackoff() Policy { + return &DefaultBackoff{} +} +func NewDefaultBackoffWithInitialBackOff(backoff time.Duration) Policy { + return &DefaultBackoff{backoff: backoff / 2} +} + const maxBackoff = 60 * time.Second // Next returns the delay to wait before next retry @@ -61,3 +75,7 @@ func (b *DefaultBackoff) Next() time.Duration { func (b *DefaultBackoff) IsMaxBackoffReached() bool { return b.backoff >= maxBackoff } + +func (b *DefaultBackoff) Reset() { + b.backoff = 0 +} diff --git a/pulsar/internal/backoff_test.go b/pulsar/backoff/backoff_test.go similarity index 96% rename from pulsar/internal/backoff_test.go rename to pulsar/backoff/backoff_test.go index e05ea29276..fc0a49232b 100644 --- a/pulsar/internal/backoff_test.go +++ b/pulsar/backoff/backoff_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package internal +package backoff import ( "testing" @@ -58,4 +58,6 @@ func TestBackoff_NextMaxValue(t *testing.T) { assert.Equal(t, true, backoff.IsMaxBackoffReached()) // max value is 60 seconds + 20% jitter = 72 seconds assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second)) + backoff.Reset() + assert.Equal(t, false, backoff.IsMaxBackoffReached()) } diff --git a/pulsar/blue_green_migration_test.go b/pulsar/blue_green_migration_test.go index 91667e8d80..672ef343f8 100644 --- a/pulsar/blue_green_migration_test.go +++ b/pulsar/blue_green_migration_test.go @@ -54,8 +54,8 @@ func (suite *BlueGreenMigrationTestSuite) TestTopicMigration() { for _, scenario := range []topicUnloadTestCase{ { - testCaseName: "proxyConnection", - blueAdminURL: "http://localhost:8080", + testCaseName: "proxyConnection", + blueAdminURL: "http://localhost:8080", blueClientUrl: "pulsar://localhost:6650", greenAdminURL: "http://localhost:8081", migrationBody: ` @@ -83,17 +83,17 @@ func testTopicMigrate( migrationBody string) { runtime.GOMAXPROCS(1) const ( - cluster = "cluster-a" + cluster = "cluster-a" tenant = utils.PUBLICTENANT namespace = utils.DEFAULTNAMESPACE - blueBroker1URL = "pulsar://broker-1:6650" - blueBroker2URL = "pulsar://broker-2:6650" + blueBroker1URL = "pulsar://broker-1:6650" + blueBroker2URL = "pulsar://broker-2:6650" greenBroker1URL = "pulsar://green-broker-1:6650" greenBroker2URL = "pulsar://green-broker-2:6650" - blueBroker1LookupURL = "broker-1:8080" - blueBroker2LookupURL = "broker-2:8080" + blueBroker1LookupURL = "broker-1:8080" + blueBroker2LookupURL = "broker-2:8080" greenBroker1LookupURL = "green-broker-1:8080" greenBroker2LookupURL = "green-broker-2:8080" ) @@ -234,7 +234,6 @@ func testTopicMigrate( req.NoError(err) req.NotEmpty(bundleRange) - unloadURL := fmt.Sprintf( "/admin/v2/namespaces/%s/%s/%s/unload?destinationBroker=%s", tenant, namespace, bundleRange, dstTopicBrokerLookupURL) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index bf2eafbf71..880cad563e 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -21,7 +21,7 @@ import ( "context" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" ) // ConsumerMessage represents a pair of a Consumer and Message. @@ -207,9 +207,9 @@ type ConsumerOptions struct { // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate) MaxReconnectToBroker *uint - // BackoffPolicy parameterize the following options in the reconnection logic to + // BackOffPolicyFunc parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) - BackoffPolicy internal.BackoffPolicy + BackOffPolicyFunc func() backoff.Policy // Decryption represents the encryption related fields required by the consumer to decrypt a message. Decryption *MessageDecryptionInfo diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 679054a044..a3d3e3ff80 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -174,11 +174,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } } - dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, client.log) + dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, + options.BackOffPolicyFunc, client.log) if err != nil { return nil, err } - rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, client.log) + rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, options.BackOffPolicyFunc, client.log) if err != nil { return nil, err } @@ -453,7 +454,7 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu readCompacted: options.ReadCompacted, interceptors: options.Interceptors, maxReconnectToBroker: options.MaxReconnectToBroker, - backoffPolicy: options.BackoffPolicy, + backOffPolicyFunc: options.BackOffPolicyFunc, keySharedPolicy: options.KeySharedPolicy, schema: options.Schema, decryption: options.Decryption, diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index f307972c3c..16aef5d45b 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -26,6 +26,8 @@ import ( "sync" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "google.golang.org/protobuf/proto" "github.com/apache/pulsar-client-go/pulsar/crypto" @@ -110,7 +112,7 @@ type partitionConsumerOpts struct { disableForceTopicCreation bool interceptors ConsumerInterceptors maxReconnectToBroker *uint - backoffPolicy internal.BackoffPolicy + backOffPolicyFunc func() backoff.Policy keySharedPolicy *KeySharedPolicy schema Schema decryption *MessageDecryptionInfo @@ -182,6 +184,7 @@ type partitionConsumer struct { lastMessageInBroker *trackingMessageID redirectedClusterURI string + backoffPolicyFunc func() backoff.Policy } func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) { @@ -318,6 +321,13 @@ func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) { func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts, messageCh chan ConsumerMessage, dlq *dlqRouter, metrics *internal.LeveledMetrics) (*partitionConsumer, error) { + var boFunc func() backoff.Policy + if options.backOffPolicyFunc != nil { + boFunc = options.backOffPolicyFunc + } else { + boFunc = backoff.NewDefaultBackoff + } + pc := &partitionConsumer{ parentConsumer: parent, client: client, @@ -339,6 +349,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon dlq: dlq, metrics: metrics, schemaInfoCache: newSchemaInfoCache(client, options.topic), + backoffPolicyFunc: boFunc, } if pc.options.autoReceiverQueueSize { pc.currentQueueSize.Store(initialReceiverQueueSize) @@ -581,12 +592,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { return nil, errors.New("failed to getLastMessageID for the closing or closed consumer") } remainTime := pc.client.operationTimeout - var backoff internal.BackoffPolicy - if pc.options.backoffPolicy != nil { - backoff = pc.options.backoffPolicy - } else { - backoff = &internal.DefaultBackoff{} - } + bo := pc.backoffPolicyFunc() request := func() (*trackingMessageID, error) { req := &getLastMsgIDRequest{doneCh: make(chan struct{})} pc.eventsCh <- req @@ -604,7 +610,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { pc.log.WithError(err).Error("Failed to getLastMessageID") return nil, fmt.Errorf("failed to getLastMessageID due to %w", err) } - nextDelay := backoff.Next() + nextDelay := bo.Next() if nextDelay > remainTime { nextDelay = remainTime } @@ -1684,18 +1690,17 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { } func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) { - var maxRetry int + var ( + maxRetry int + delayReconnectTime, totalDelayReconnectTime time.Duration + ) if pc.options.maxReconnectToBroker == nil { maxRetry = -1 } else { maxRetry = int(*pc.options.maxReconnectToBroker) } - - var ( - delayReconnectTime time.Duration - defaultBackoff = internal.DefaultBackoff{} - ) + bo := pc.backoffPolicyFunc() for maxRetry != 0 { if pc.getConsumerState() != consumerReady { @@ -1710,11 +1715,10 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose delayReconnectTime = 0 assignedBrokerURL = connectionClosed.assignedBrokerURL connectionClosed = nil // Attempt connecting to the assigned broker just once - } else if pc.options.backoffPolicy == nil { - delayReconnectTime = defaultBackoff.Next() } else { - delayReconnectTime = pc.options.backoffPolicy.Next() + delayReconnectTime = bo.Next() } + totalDelayReconnectTime += delayReconnectTime pc.log.WithFields(log.Fields{ "assignedBrokerURL": assignedBrokerURL, @@ -1733,6 +1737,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose if err == nil { // Successfully reconnected pc.log.Info("Reconnected consumer to broker") + bo.Reset() return } pc.log.WithError(err).Error("Failed to create consumer at reconnect") @@ -1747,7 +1752,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose maxRetry-- } pc.metrics.ConsumersReconnectFailure.Inc() - if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() { + if maxRetry == 0 || bo.IsMaxBackoffReached() { pc.metrics.ConsumersReconnectMaxRetry.Inc() } } diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 28ba4f72b0..e1e2ca29e0 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -159,8 +159,9 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string Name: "regex-consumer", } - dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) - rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", + nil, log.DefaultNopLogger()) + rlq, _ := newRetryRouter(c.(*client), nil, false, nil, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { t.Fatal(err) @@ -198,8 +199,9 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string Name: "regex-consumer", } - dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) - rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", + nil, log.DefaultNopLogger()) + rlq, _ := newRetryRouter(c.(*client), nil, false, nil, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { t.Fatal(err) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 04439cbcfe..a4a8d99558 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -30,6 +30,8 @@ import ( "testing" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" @@ -3874,12 +3876,14 @@ func TestConsumerWithBackoffPolicy(t *testing.T) { topicName := newTopicName() - backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + bo := newTestBackoffPolicy(1*time.Second, 4*time.Second) _consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, SubscriptionName: "sub-1", Type: Shared, - BackoffPolicy: backoff, + BackOffPolicyFunc: func() backoff.Policy { + return bo + }, }) assert.Nil(t, err) defer _consumer.Close() @@ -3888,22 +3892,22 @@ func TestConsumerWithBackoffPolicy(t *testing.T) { // 1 s startTime := time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 2 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) } func TestAckWithMessageID(t *testing.T) { diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 647c022d78..6b13b3298b 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -22,31 +22,40 @@ import ( "fmt" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/log" ) type dlqRouter struct { - client Client - producer Producer - policy *DLQPolicy - messageCh chan ConsumerMessage - closeCh chan interface{} - topicName string - subscriptionName string - consumerName string - log log.Logger + client Client + producer Producer + policy *DLQPolicy + messageCh chan ConsumerMessage + closeCh chan interface{} + topicName string + subscriptionName string + consumerName string + backOffPolicyFunc func() backoff.Policy + log log.Logger } func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName, consumerName string, - logger log.Logger) (*dlqRouter, error) { + backOffPolicyFunc func() backoff.Policy, logger log.Logger) (*dlqRouter, error) { + var boFunc func() backoff.Policy + if backOffPolicyFunc != nil { + boFunc = backOffPolicyFunc + } else { + boFunc = backoff.NewDefaultBackoff + } r := &dlqRouter{ - client: client, - policy: policy, - topicName: topicName, - subscriptionName: subscriptionName, - consumerName: consumerName, - log: logger, + client: client, + policy: policy, + topicName: topicName, + subscriptionName: subscriptionName, + consumerName: consumerName, + backOffPolicyFunc: boFunc, + log: logger, } if policy != nil { @@ -155,7 +164,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { } // Retry to create producer indefinitely - backoff := &internal.DefaultBackoff{} + bo := r.backOffPolicyFunc() for { opt := r.policy.ProducerOptions opt.Topic = r.policy.DeadLetterTopic @@ -174,7 +183,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { if err != nil { r.log.WithError(err).Error("Failed to create DLQ producer") - time.Sleep(backoff.Next()) + time.Sleep(bo.Next()) continue } else { r.producer = producer diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go index e68bd17c39..eea0101a1a 100644 --- a/pulsar/internal/http_client.go +++ b/pulsar/internal/http_client.go @@ -29,6 +29,8 @@ import ( "path" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/apache/pulsar-client-go/pulsar/log" @@ -148,12 +150,12 @@ func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]str if _, ok := err.(*url.Error); ok { // We can retry this kind of requests over a connection error because they're // not specific to a particular broker. - backoff := DefaultBackoff{100 * time.Millisecond} + bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond) startTime := time.Now() var retryTime time.Duration for time.Since(startTime) < c.requestTimeout { - retryTime = backoff.Next() + retryTime = bo.Next() c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout) time.Sleep(retryTime) _, err = c.GetWithQueryParams(endpoint, obj, params, true) diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index d2e3895ec5..0f99311655 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -25,6 +25,8 @@ import ( "sync/atomic" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/apache/pulsar-client-go/pulsar/log" @@ -115,7 +117,7 @@ func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver, var host *url.URL var rpcResult *RPCResult startTime := time.Now() - backoff := DefaultBackoff{100 * time.Millisecond} + bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond) // we can retry these requests because this kind of request is // not specific to any particular broker for time.Since(startTime) < c.requestTimeout { @@ -130,7 +132,7 @@ func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver, break } - retryTime := backoff.Next() + retryTime := bo.Next() c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout) time.Sleep(retryTime) } diff --git a/pulsar/producer.go b/pulsar/producer.go index 0ae51bd426..997f9c0d02 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -21,7 +21,7 @@ import ( "context" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" ) type HashingScheme int @@ -171,9 +171,9 @@ type ProducerOptions struct { // MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. (default: ultimate) MaxReconnectToBroker *uint - // BackoffPolicy parameterize the following options in the reconnection logic to + // BackOffPolicyFunc parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) - BackoffPolicy internal.BackoffPolicy + BackOffPolicyFunc func() backoff.Policy // BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder) // This will be used to create batch container when batching is enabled. diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index af5fb38a75..f578ee4b96 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/internal/compression" internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto" @@ -121,6 +123,7 @@ type partitionProducer struct { redirectedClusterURI string ctx context.Context cancelFunc context.CancelFunc + backOffPolicyFunc func() backoff.Policy } type schemaCache struct { @@ -147,6 +150,14 @@ func (s *schemaCache) Get(schema *SchemaInfo) (schemaVersion []byte) { func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int, metrics *internal.LeveledMetrics) ( *partitionProducer, error) { + + var boFunc func() backoff.Policy + if options.BackOffPolicyFunc != nil { + boFunc = options.BackOffPolicyFunc + } else { + boFunc = backoff.NewDefaultBackoff + } + var batchingMaxPublishDelay time.Duration if options.BatchingMaxPublishDelay != 0 { batchingMaxPublishDelay = options.BatchingMaxPublishDelay @@ -176,15 +187,16 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType), compression.Level(options.CompressionLevel)), - publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), - pendingQueue: internal.NewBlockingQueue(maxPendingMessages), - lastSequenceID: -1, - partitionIdx: int32(partitionIdx), - metrics: metrics, - epoch: 0, - schemaCache: newSchemaCache(), - ctx: ctx, - cancelFunc: cancelFunc, + publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), + pendingQueue: internal.NewBlockingQueue(maxPendingMessages), + lastSequenceID: -1, + partitionIdx: int32(partitionIdx), + metrics: metrics, + epoch: 0, + schemaCache: newSchemaCache(), + ctx: ctx, + cancelFunc: cancelFunc, + backOffPolicyFunc: boFunc, } if p.options.DisableBatching { p.batchFlushTicker.Stop() @@ -458,17 +470,17 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer } func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed) { - var maxRetry int + var ( + maxRetry int + delayReconnectTime time.Duration + ) if p.options.MaxReconnectToBroker == nil { maxRetry = -1 } else { maxRetry = int(*p.options.MaxReconnectToBroker) } - var ( - delayReconnectTime time.Duration - defaultBackoff = internal.DefaultBackoff{} - ) + bo := p.backOffPolicyFunc() for maxRetry != 0 { select { @@ -489,10 +501,8 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed delayReconnectTime = 0 assignedBrokerURL = connectionClosed.assignedBrokerURL connectionClosed = nil // Only attempt once - } else if p.options.BackoffPolicy == nil { - delayReconnectTime = defaultBackoff.Next() } else { - delayReconnectTime = p.options.BackoffPolicy.Next() + delayReconnectTime = bo.Next() } p.log.WithFields(log.Fields{ @@ -513,6 +523,7 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed if err == nil { // Successfully reconnected p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker") + bo.Reset() return } p.log.WithError(err).Error("Failed to create producer at reconnect") @@ -546,7 +557,7 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed maxRetry-- } p.metrics.ProducersReconnectFailure.Inc() - if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() { + if maxRetry == 0 || bo.IsMaxBackoffReached() { p.metrics.ProducersReconnectMaxRetry.Inc() } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 5b23182d5d..afd1f09af7 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -34,6 +34,8 @@ import ( "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" @@ -1293,11 +1295,13 @@ func TestProducerWithBackoffPolicy(t *testing.T) { topicName := newTopicName() - backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + bo := newTestBackoffPolicy(1*time.Second, 4*time.Second) _producer, err := client.CreateProducer(ProducerOptions{ - Topic: topicName, - SendTimeout: 2 * time.Second, - BackoffPolicy: backoff, + Topic: topicName, + SendTimeout: 2 * time.Second, + BackOffPolicyFunc: func() backoff.Policy { + return bo + }, }) assert.Nil(t, err) defer _producer.Close() @@ -1306,22 +1310,22 @@ func TestProducerWithBackoffPolicy(t *testing.T) { // 1 s startTime := time.Now() partitionProducerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 2 s startTime = time.Now() partitionProducerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionProducerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionProducerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) } func TestSendContextExpired(t *testing.T) { diff --git a/pulsar/reader.go b/pulsar/reader.go index 4daa889062..98bde4e395 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -21,7 +21,7 @@ import ( "context" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" ) // ReaderMessage packages Reader and Message as a struct to use. @@ -89,9 +89,9 @@ type ReaderOptions struct { // Schema represents the schema implementation. Schema Schema - // BackoffPolicy parameterize the following options in the reconnection logic to + // BackoffPolicyFunc parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) - BackoffPolicy internal.BackoffPolicy + BackoffPolicyFunc func() backoff.Policy // MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100) MaxPendingChunkedMessage int diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 7cbae05c6d..f76255e2e8 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -112,7 +112,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { ReplicateSubscriptionState: false, Decryption: options.Decryption, Schema: options.Schema, - BackoffPolicy: options.BackoffPolicy, + BackOffPolicyFunc: options.BackoffPolicyFunc, MaxPendingChunkedMessage: options.MaxPendingChunkedMessage, ExpireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, AutoAckIncompleteChunk: options.AutoAckIncompleteChunk, @@ -128,12 +128,13 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } // Provide dummy dlq router with not dlq policy - dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, client.log) + dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, + options.BackoffPolicyFunc, client.log) if err != nil { return nil, err } // Provide dummy rlq router with not dlq policy - rlq, err := newRetryRouter(client, nil, false, client.log) + rlq, err := newRetryRouter(client, nil, false, options.BackoffPolicyFunc, client.log) if err != nil { return nil, err } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index d00346fc7c..3c928c1db7 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" @@ -847,6 +849,13 @@ func (b *testBackoffPolicy) Next() time.Duration { return b.curBackoff } +func (b *testBackoffPolicy) IsMaxBackoffReached() bool { + return false +} + +func (b *testBackoffPolicy) Reset() { + +} func (b *testBackoffPolicy) IsExpectedIntervalFrom(startTime time.Time) bool { // Approximately equal to expected interval @@ -866,11 +875,13 @@ func TestReaderWithBackoffPolicy(t *testing.T) { assert.Nil(t, err) defer client.Close() - backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + bo := newTestBackoffPolicy(1*time.Second, 4*time.Second) _reader, err := client.CreateReader(ReaderOptions{ Topic: "my-topic", StartMessageID: LatestMessageID(), - BackoffPolicy: backoff, + BackoffPolicyFunc: func() backoff.Policy { + return bo + }, }) assert.NotNil(t, _reader) assert.Nil(t, err) @@ -879,22 +890,22 @@ func TestReaderWithBackoffPolicy(t *testing.T) { // 1 s startTime := time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 2 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) // 4 s startTime = time.Now() partitionConsumerImp.reconnectToBroker(nil) - assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + assert.True(t, bo.IsExpectedIntervalFrom(startTime)) } func TestReaderGetLastMessageID(t *testing.T) { diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go index 75792adc14..c8aa0b9464 100644 --- a/pulsar/retry_router.go +++ b/pulsar/retry_router.go @@ -21,7 +21,8 @@ import ( "context" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/log" ) @@ -44,19 +45,28 @@ type RetryMessage struct { } type retryRouter struct { - client Client - producer Producer - policy *DLQPolicy - messageCh chan RetryMessage - closeCh chan interface{} - log log.Logger + client Client + producer Producer + policy *DLQPolicy + messageCh chan RetryMessage + closeCh chan interface{} + backOffPolicyFunc func() backoff.Policy + log log.Logger } -func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool, logger log.Logger) (*retryRouter, error) { +func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool, backOffPolicyFunc func() backoff.Policy, + logger log.Logger) (*retryRouter, error) { + var boFunc func() backoff.Policy + if backOffPolicyFunc != nil { + boFunc = backOffPolicyFunc + } else { + boFunc = backoff.NewDefaultBackoff + } r := &retryRouter{ - client: client, - policy: policy, - log: logger, + client: client, + policy: policy, + backOffPolicyFunc: boFunc, + log: logger, } if policy != nil && retryEnabled { @@ -124,7 +134,7 @@ func (r *retryRouter) getProducer() Producer { } // Retry to create producer indefinitely - backoff := &internal.DefaultBackoff{} + bo := r.backOffPolicyFunc() for { opt := r.policy.ProducerOptions opt.Topic = r.policy.RetryLetterTopic @@ -138,7 +148,7 @@ func (r *retryRouter) getProducer() Producer { if err != nil { r.log.WithError(err).Error("Failed to create RLQ producer") - time.Sleep(backoff.Next()) + time.Sleep(bo.Next()) continue } else { r.producer = producer diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go index 1449d698e5..afde54278b 100644 --- a/pulsar/transaction_coordinator_client.go +++ b/pulsar/transaction_coordinator_client.go @@ -24,6 +24,8 @@ import ( "sync/atomic" "time" + "github.com/apache/pulsar-client-go/pulsar/backoff" + "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" @@ -143,7 +145,7 @@ func (t *transactionHandler) runEventsLoop() { func (t *transactionHandler) reconnectToBroker() { var delayReconnectTime time.Duration - var defaultBackoff = internal.DefaultBackoff{} + var defaultBackoff = backoff.DefaultBackoff{} for { if t.getState() == txnHandlerClosed {