Skip to content

Commit

Permalink
[Issue 1027][producer] fix: split sendRequest and make reconnectToBro…
Browse files Browse the repository at this point in the history
…ker and other operate in the same coroutine (#1029)
  • Loading branch information
graysonzeng committed Jun 16, 2023
1 parent 6acecf0 commit 7f91b2b
Showing 1 changed file with 13 additions and 25 deletions.
38 changes: 13 additions & 25 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type partitionProducer struct {
compressionProvider compression.Provider

// Channel where app is posting messages to be published
eventsChan chan interface{}
closeCh chan struct{}
dataChan chan *sendRequest
cmdChan chan interface{}
connectClosedCh chan connectionClosed

publishSemaphore internal.Semaphore
Expand Down Expand Up @@ -150,9 +150,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
log: logger,
options: options,
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, maxPendingMessages),
dataChan: make(chan *sendRequest, maxPendingMessages),
cmdChan: make(chan interface{}, 10),
connectClosedCh: make(chan connectionClosed, 10),
closeCh: make(chan struct{}),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
compression.Level(options.CompressionLevel)),
Expand Down Expand Up @@ -438,31 +438,21 @@ func (p *partitionProducer) reconnectToBroker() {
}

func (p *partitionProducer) runEventsLoop() {
go func() {
for {
select {
case <-p.closeCh:
p.log.Info("close producer, exit reconnect")
return
case <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
p.reconnectToBroker()
}
}
}()

for {
select {
case i := <-p.eventsChan:
case data := <-p.dataChan:
p.internalSend(data)
case i := <-p.cmdChan:
switch v := i.(type) {
case *sendRequest:
p.internalSend(v)
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
p.internalClose(v)
return
}
case <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
p.reconnectToBroker()
case <-p.batchFlushTicker.C:
p.internalFlushCurrentBatch()
}
Expand Down Expand Up @@ -1165,7 +1155,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
}
p.options.Interceptors.BeforeSend(p, msg)

p.eventsChan <- sr
p.dataChan <- sr

if !p.options.DisableBlockIfQueueFull {
// block if queue full
Expand Down Expand Up @@ -1304,8 +1294,6 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.setProducerState(producerClosed)
p._getConn().UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()

close(p.closeCh)
}

func (p *partitionProducer) LastSequenceID() int64 {
Expand All @@ -1317,7 +1305,7 @@ func (p *partitionProducer) Flush() error {
doneCh: make(chan struct{}),
err: nil,
}
p.eventsChan <- flushReq
p.cmdChan <- flushReq

// wait for the flush request to complete
<-flushReq.doneCh
Expand Down Expand Up @@ -1345,7 +1333,7 @@ func (p *partitionProducer) Close() {
}

cp := &closeProducer{doneCh: make(chan struct{})}
p.eventsChan <- cp
p.cmdChan <- cp

// wait for close producer request to complete
<-cp.doneCh
Expand Down

0 comments on commit 7f91b2b

Please sign in to comment.