From e3ce93b59630ef4686075766b0abcdae635673af Mon Sep 17 00:00:00 2001 From: zwb Date: Wed, 6 Sep 2017 10:56:13 +0800 Subject: [PATCH] feat(producer): avoid sending duplicate data --- producer.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/producer.go b/producer.go index ab8232ef..e8d0c4ee 100644 --- a/producer.go +++ b/producer.go @@ -342,12 +342,23 @@ func (w *Producer) popTransaction(frameType int32, data []byte) { } func (w *Producer) transactionCleanup() { - // clean up transactions we can easily account for - for _, t := range w.transactions { - t.Error = ErrNotConnected - t.finish() +l: + for { + select { + case data := <-w.responseChan: + w.popTransaction(FrameTypeResponse, data) + case data := <-w.errorChan: + w.popTransaction(FrameTypeError, data) + default: + // clean up transactions we can easily account for + for _, t := range w.transactions { + t.Error = ErrNotConnected + t.finish() + } + w.transactions = w.transactions[:0] + break l + } } - w.transactions = w.transactions[:0] // spin and free up any writes that might have raced // with the cleanup process (blocked on writing