From eda691e25107ea440d642b8a7f591a914e74b6d8 Mon Sep 17 00:00:00 2001 From: zengguan <916028390@qq.com> Date: Thu, 20 Jul 2023 20:23:14 +0800 Subject: [PATCH] [Fix][producer] Ensure all data in dataChan will be processed when internalClose() was called --- pulsar/producer_partition.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 11d5f652e4..c5cc4c68a2 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1289,7 +1289,16 @@ func (p *partitionProducer) internalClose(req *closeProducer) { return } + defer close(p.dataChan) + defer close(p.cmdChan) p.log.Info("Closing producer") + flushReq := &flushRequest{ + doneCh: make(chan struct{}), + err: nil, + } + p.internalFlush(flushReq) + // wait for the flush request to complete + <-flushReq.doneCh id := p.client.rpcClient.NewRequestID() _, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{