diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index c4a460eed..6bd90818e 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -84,8 +84,8 @@ type partitionProducer struct { compressionProvider compression.Provider // Channel where app is posting messages to be published - eventsChan chan interface{} - closeCh chan struct{} + dataChan chan *sendRequest + cmdChan chan interface{} connectClosedCh chan connectionClosed publishSemaphore internal.Semaphore @@ -150,9 +150,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions log: logger, options: options, producerID: client.rpcClient.NewProducerID(), - eventsChan: make(chan interface{}, maxPendingMessages), + dataChan: make(chan *sendRequest, maxPendingMessages), + cmdChan: make(chan interface{}, 10), connectClosedCh: make(chan connectionClosed, 10), - closeCh: make(chan struct{}), batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType), compression.Level(options.CompressionLevel)), @@ -438,31 +438,21 @@ func (p *partitionProducer) reconnectToBroker() { } func (p *partitionProducer) runEventsLoop() { - go func() { - for { - select { - case <-p.closeCh: - p.log.Info("close producer, exit reconnect") - return - case <-p.connectClosedCh: - p.log.Info("runEventsLoop will reconnect in producer") - p.reconnectToBroker() - } - } - }() - for { select { - case i := <-p.eventsChan: + case data := <-p.dataChan: + p.internalSend(data) + case i := <-p.cmdChan: switch v := i.(type) { - case *sendRequest: - p.internalSend(v) case *flushRequest: p.internalFlush(v) case *closeProducer: p.internalClose(v) return } + case <-p.connectClosedCh: + p.log.Info("runEventsLoop will reconnect in producer") + p.reconnectToBroker() case <-p.batchFlushTicker.C: p.internalFlushCurrentBatch() } @@ -1165,7 +1155,7 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer } p.options.Interceptors.BeforeSend(p, msg) - p.eventsChan <- sr + p.dataChan <- sr if !p.options.DisableBlockIfQueueFull { // block if queue full @@ -1304,8 +1294,6 @@ func (p *partitionProducer) internalClose(req *closeProducer) { p.setProducerState(producerClosed) p._getConn().UnregisterListener(p.producerID) p.batchFlushTicker.Stop() - - close(p.closeCh) } func (p *partitionProducer) LastSequenceID() int64 { @@ -1317,7 +1305,7 @@ func (p *partitionProducer) Flush() error { doneCh: make(chan struct{}), err: nil, } - p.eventsChan <- flushReq + p.cmdChan <- flushReq // wait for the flush request to complete <-flushReq.doneCh @@ -1345,7 +1333,7 @@ func (p *partitionProducer) Close() { } cp := &closeProducer{doneCh: make(chan struct{})} - p.eventsChan <- cp + p.cmdChan <- cp // wait for close producer request to complete <-cp.doneCh