Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
crossoverJie committed Sep 18, 2024
1 parent 6a985c2 commit f5c5cc0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
18 changes: 9 additions & 9 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,15 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
compression.Level(options.CompressionLevel)),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
lastSequenceID: -1,
partitionIdx: int32(partitionIdx),
metrics: metrics,
epoch: 0,
schemaCache: newSchemaCache(),
ctx: ctx,
cancelFunc: cancelFunc,
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
lastSequenceID: -1,
partitionIdx: int32(partitionIdx),
metrics: metrics,
epoch: 0,
schemaCache: newSchemaCache(),
ctx: ctx,
cancelFunc: cancelFunc,
backOffPolicyFunc: boFunc,
}
if p.options.DisableBatching {
Expand Down
4 changes: 3 additions & 1 deletion pulsar/transaction_coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
Expand Down Expand Up @@ -143,7 +145,7 @@ func (t *transactionHandler) runEventsLoop() {

func (t *transactionHandler) reconnectToBroker() {
var delayReconnectTime time.Duration
var defaultBackoff = internal.DefaultBackoff{}
var defaultBackoff = backoff.DefaultBackoff{}

for {
if t.getState() == txnHandlerClosed {
Expand Down

0 comments on commit f5c5cc0

Please sign in to comment.