Skip to content

Commit

Permalink
[fix] Reconnection logic and Backoff policy doesn't work correctly (#…
Browse files Browse the repository at this point in the history
…1197)

Fixes #1187

### Modifications

- Move `backoff.go` to the `backoff` directory (because there are circular dependencies, they are not moved to the pulsar directory.)
- Create a new method for `BackOffPolicy` interface `IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool`

This is a **breaking change** that modifies the package name and interface name.

Package: `internal`->`backoff`
Interface name: `BackoffPolicy`-> `Policy`


---------

Co-authored-by: Zixuan Liu <nodeces@gmail.com>
Co-authored-by: Zike Yang <zike@apache.org>
  • Loading branch information
3 people committed Sep 23, 2024
1 parent 188dba9 commit 630d5f8
Show file tree
Hide file tree
Showing 19 changed files with 207 additions and 124 deletions.
24 changes: 21 additions & 3 deletions pulsar/internal/backoff.go → pulsar/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package internal
package backoff

import (
"math/rand"
Expand All @@ -26,10 +26,17 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// BackoffPolicy parameterize the following options in the reconnection logic to
// Policy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
type BackoffPolicy interface {
type Policy interface {
// Next returns the delay to wait before next retry
Next() time.Duration

// IsMaxBackoffReached evaluates if the max number of retries is reached
IsMaxBackoffReached() bool

// Reset the backoff to the initial state
Reset()
}

// DefaultBackoff computes the delay before retrying an action.
Expand All @@ -38,6 +45,13 @@ type DefaultBackoff struct {
backoff time.Duration
}

func NewDefaultBackoff() Policy {
return &DefaultBackoff{}
}
func NewDefaultBackoffWithInitialBackOff(backoff time.Duration) Policy {
return &DefaultBackoff{backoff: backoff / 2}
}

const maxBackoff = 60 * time.Second

// Next returns the delay to wait before next retry
Expand All @@ -61,3 +75,7 @@ func (b *DefaultBackoff) Next() time.Duration {
func (b *DefaultBackoff) IsMaxBackoffReached() bool {
return b.backoff >= maxBackoff
}

func (b *DefaultBackoff) Reset() {
b.backoff = 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package internal
package backoff

import (
"testing"
Expand Down Expand Up @@ -58,4 +58,6 @@ func TestBackoff_NextMaxValue(t *testing.T) {
assert.Equal(t, true, backoff.IsMaxBackoffReached())
// max value is 60 seconds + 20% jitter = 72 seconds
assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second))
backoff.Reset()
assert.Equal(t, false, backoff.IsMaxBackoffReached())
}
15 changes: 7 additions & 8 deletions pulsar/blue_green_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (suite *BlueGreenMigrationTestSuite) TestTopicMigration() {
for _, scenario := range []topicUnloadTestCase{

{
testCaseName: "proxyConnection",
blueAdminURL: "http://localhost:8080",
testCaseName: "proxyConnection",
blueAdminURL: "http://localhost:8080",
blueClientUrl: "pulsar://localhost:6650",
greenAdminURL: "http://localhost:8081",
migrationBody: `
Expand Down Expand Up @@ -83,17 +83,17 @@ func testTopicMigrate(
migrationBody string) {
runtime.GOMAXPROCS(1)
const (
cluster = "cluster-a"
cluster = "cluster-a"
tenant = utils.PUBLICTENANT
namespace = utils.DEFAULTNAMESPACE

blueBroker1URL = "pulsar://broker-1:6650"
blueBroker2URL = "pulsar://broker-2:6650"
blueBroker1URL = "pulsar://broker-1:6650"
blueBroker2URL = "pulsar://broker-2:6650"
greenBroker1URL = "pulsar://green-broker-1:6650"
greenBroker2URL = "pulsar://green-broker-2:6650"

blueBroker1LookupURL = "broker-1:8080"
blueBroker2LookupURL = "broker-2:8080"
blueBroker1LookupURL = "broker-1:8080"
blueBroker2LookupURL = "broker-2:8080"
greenBroker1LookupURL = "green-broker-1:8080"
greenBroker2LookupURL = "green-broker-2:8080"
)
Expand Down Expand Up @@ -234,7 +234,6 @@ func testTopicMigrate(
req.NoError(err)
req.NotEmpty(bundleRange)


unloadURL := fmt.Sprintf(
"/admin/v2/namespaces/%s/%s/%s/unload?destinationBroker=%s",
tenant, namespace, bundleRange, dstTopicBrokerLookupURL)
Expand Down
6 changes: 3 additions & 3 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"
)

// ConsumerMessage represents a pair of a Consumer and Message.
Expand Down Expand Up @@ -207,9 +207,9 @@ type ConsumerOptions struct {
// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint

// BackoffPolicy parameterize the following options in the reconnection logic to
// BackOffPolicyFunc parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy
BackOffPolicyFunc func() backoff.Policy

// Decryption represents the encryption related fields required by the consumer to decrypt a message.
Decryption *MessageDecryptionInfo
Expand Down
7 changes: 4 additions & 3 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}
}

dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, client.log)
dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name,
options.BackOffPolicyFunc, client.log)
if err != nil {
return nil, err
}
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, client.log)
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, options.BackOffPolicyFunc, client.log)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -453,7 +454,7 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu
readCompacted: options.ReadCompacted,
interceptors: options.Interceptors,
maxReconnectToBroker: options.MaxReconnectToBroker,
backoffPolicy: options.BackoffPolicy,
backOffPolicyFunc: options.BackOffPolicyFunc,
keySharedPolicy: options.KeySharedPolicy,
schema: options.Schema,
decryption: options.Decryption,
Expand Down
41 changes: 23 additions & 18 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"google.golang.org/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/crypto"
Expand Down Expand Up @@ -110,7 +112,7 @@ type partitionConsumerOpts struct {
disableForceTopicCreation bool
interceptors ConsumerInterceptors
maxReconnectToBroker *uint
backoffPolicy internal.BackoffPolicy
backOffPolicyFunc func() backoff.Policy
keySharedPolicy *KeySharedPolicy
schema Schema
decryption *MessageDecryptionInfo
Expand Down Expand Up @@ -182,6 +184,7 @@ type partitionConsumer struct {
lastMessageInBroker *trackingMessageID

redirectedClusterURI string
backoffPolicyFunc func() backoff.Policy
}

func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
Expand Down Expand Up @@ -318,6 +321,13 @@ func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
messageCh chan ConsumerMessage, dlq *dlqRouter,
metrics *internal.LeveledMetrics) (*partitionConsumer, error) {
var boFunc func() backoff.Policy
if options.backOffPolicyFunc != nil {
boFunc = options.backOffPolicyFunc
} else {
boFunc = backoff.NewDefaultBackoff
}

pc := &partitionConsumer{
parentConsumer: parent,
client: client,
Expand All @@ -339,6 +349,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
backoffPolicyFunc: boFunc,
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
Expand Down Expand Up @@ -581,12 +592,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
}
remainTime := pc.client.operationTimeout
var backoff internal.BackoffPolicy
if pc.options.backoffPolicy != nil {
backoff = pc.options.backoffPolicy
} else {
backoff = &internal.DefaultBackoff{}
}
bo := pc.backoffPolicyFunc()
request := func() (*trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
Expand All @@ -604,7 +610,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
pc.log.WithError(err).Error("Failed to getLastMessageID")
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
}
nextDelay := backoff.Next()
nextDelay := bo.Next()
if nextDelay > remainTime {
nextDelay = remainTime
}
Expand Down Expand Up @@ -1684,18 +1690,17 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}

func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
var maxRetry int
var (
maxRetry int
delayReconnectTime, totalDelayReconnectTime time.Duration
)

if pc.options.maxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*pc.options.maxReconnectToBroker)
}

var (
delayReconnectTime time.Duration
defaultBackoff = internal.DefaultBackoff{}
)
bo := pc.backoffPolicyFunc()

for maxRetry != 0 {
if pc.getConsumerState() != consumerReady {
Expand All @@ -1710,11 +1715,10 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
delayReconnectTime = 0
assignedBrokerURL = connectionClosed.assignedBrokerURL
connectionClosed = nil // Attempt connecting to the assigned broker just once
} else if pc.options.backoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = pc.options.backoffPolicy.Next()
delayReconnectTime = bo.Next()
}
totalDelayReconnectTime += delayReconnectTime

pc.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
Expand All @@ -1733,6 +1737,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
if err == nil {
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
bo.Reset()
return
}
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
Expand All @@ -1747,7 +1752,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
maxRetry--
}
pc.metrics.ConsumersReconnectFailure.Inc()
if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
if maxRetry == 0 || bo.IsMaxBackoffReached() {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
}
}
Expand Down
10 changes: 6 additions & 4 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
Name: "regex-consumer",
}

dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer",
nil, log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, nil, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -198,8 +199,9 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
Name: "regex-consumer",
}

dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer",
nil, log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, nil, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
Expand Down
16 changes: 10 additions & 6 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
Expand Down Expand Up @@ -3874,12 +3876,14 @@ func TestConsumerWithBackoffPolicy(t *testing.T) {

topicName := newTopicName()

backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
bo := newTestBackoffPolicy(1*time.Second, 4*time.Second)
_consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
BackoffPolicy: backoff,
BackOffPolicyFunc: func() backoff.Policy {
return bo
},
})
assert.Nil(t, err)
defer _consumer.Close()
Expand All @@ -3888,22 +3892,22 @@ func TestConsumerWithBackoffPolicy(t *testing.T) {
// 1 s
startTime := time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
assert.True(t, bo.IsExpectedIntervalFrom(startTime))

// 2 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
assert.True(t, bo.IsExpectedIntervalFrom(startTime))

// 4 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
assert.True(t, bo.IsExpectedIntervalFrom(startTime))

// 4 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
assert.True(t, bo.IsExpectedIntervalFrom(startTime))
}

func TestAckWithMessageID(t *testing.T) {
Expand Down
Loading

0 comments on commit 630d5f8

Please sign in to comment.