diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index d8001dc122..f307972c3c 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -332,7 +332,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon startMessageID: atomicMessageID{msgID: options.startMessageID}, connectedCh: make(chan struct{}), messageCh: messageCh, - connectClosedCh: make(chan *connectionClosed, 10), + connectClosedCh: make(chan *connectionClosed, 1), closeCh: make(chan struct{}), clearQueueCh: make(chan func(id *trackingMessageID)), compressionProviders: sync.Map{}, @@ -1381,8 +1381,12 @@ func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseCons assignedBrokerURL = pc.client.selectServiceURL( closeConsumer.GetAssignedBrokerServiceUrl(), closeConsumer.GetAssignedBrokerServiceUrlTls()) } - pc.connectClosedCh <- &connectionClosed{ - assignedBrokerURL: assignedBrokerURL, + + select { + case pc.connectClosedCh <- &connectionClosed{assignedBrokerURL: assignedBrokerURL}: + default: + // Reconnect has already been requested so we do not block the + // connection callback. } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1677c57026..97ab8b942c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -171,7 +171,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions producerID: client.rpcClient.NewProducerID(), dataChan: make(chan *sendRequest, maxPendingMessages), cmdChan: make(chan interface{}, 10), - connectClosedCh: make(chan *connectionClosed, 10), + connectClosedCh: make(chan *connectionClosed, 1), batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType), compression.Level(options.CompressionLevel)), @@ -413,8 +413,12 @@ func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProdu assignedBrokerURL = p.client.selectServiceURL( closeProducer.GetAssignedBrokerServiceUrl(), closeProducer.GetAssignedBrokerServiceUrlTls()) } - p.connectClosedCh <- &connectionClosed{ - assignedBrokerURL: assignedBrokerURL, + + select { + case p.connectClosedCh <- &connectionClosed{assignedBrokerURL: assignedBrokerURL}: + default: + // Reconnect has already been requested so we do not block the + // connection callback. } }