-
Notifications
You must be signed in to change notification settings - Fork 443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
producer: avoid sending duplicate data #217
base: master
Are you sure you want to change the base?
Conversation
l: | ||
for { | ||
select { | ||
case data := <-w.responseChan: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we probably have the same race in w.errorChan
, so we should flush that channel too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you're right. The real error reason should pass to user, not just set error reason to ErrNotConnected
. I will add this code.
for _, t := range w.transactions { | ||
t.Error = ErrNotConnected | ||
t.finish() | ||
l: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this label is necessary since it's not a nested for
loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In for
loop, there's a select
in it, without label, break
only jump out from select
not for
.
for { | ||
select { | ||
case data := <-w.responseChan: | ||
w.popTransaction(FrameTypeResponse, data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be popped and finished with ErrNotConnected
, like below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the release code like below:
func (w *Producer) transactionCleanup() {
l:
for {
select {
case data := <-w.responseChan:
w.popTransaction(FrameTypeResponse, data)
case data := <-w.errorChan:
w.popTransaction(FrameTypeError, data)
default:
// clean up transactions we can easily account for
for _, t := range w.transactions {
t.Error = ErrNotConnected
t.finish()
}
w.transactions = w.transactions[:0]
break l
}
}
... ...
}
The transaction poped in chan w.responseChan
branch means successfully sent, so transaction's error reason is empty and should not be set.
The transaction poped in chan w.errorChan
branch means sent failed, should be send again, and transaction's error reason should be set to the real reason, so user can know what happened.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I update my code, to add
case data := <-w.errorChan:
w.popTransaction(FrameTypeError, data)
for _, t := range w.transactions { | ||
t.Error = ErrNotConnected | ||
t.finish() | ||
l: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In for
loop, there's a select
in it, without label, break
only jump out from select
not for
.
l: | ||
for { | ||
select { | ||
case data := <-w.responseChan: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you're right. The real error reason should pass to user, not just set error reason to ErrNotConnected
. I will add this code.
for { | ||
select { | ||
case data := <-w.responseChan: | ||
w.popTransaction(FrameTypeResponse, data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the release code like below:
func (w *Producer) transactionCleanup() {
l:
for {
select {
case data := <-w.responseChan:
w.popTransaction(FrameTypeResponse, data)
case data := <-w.errorChan:
w.popTransaction(FrameTypeError, data)
default:
// clean up transactions we can easily account for
for _, t := range w.transactions {
t.Error = ErrNotConnected
t.finish()
}
w.transactions = w.transactions[:0]
break l
}
}
... ...
}
The transaction poped in chan w.responseChan
branch means successfully sent, so transaction's error reason is empty and should not be set.
The transaction poped in chan w.errorChan
branch means sent failed, should be send again, and transaction's error reason should be set to the real reason, so user can know what happened.
@zwb-ict I've looked a bit more into this. I don't think the race condition you've identified here is possible. The underlying connection only calls to the |
@mreiferson func (w *Producer) Stop() {
w.guard.Lock()
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
w.guard.Unlock()
return
}
w.log(LogLevelInfo, "stopping")
close(w.exitChan)
w.close()
w.guard.Unlock()
w.wg.Wait()
}
So router may exit func (w *Producer) router() {
for {
select {
case t := <-w.transactionChan:
w.transactions = append(w.transactions, t)
err := w.conn.WriteCommand(t.cmd)
if err != nil {
w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
w.close()
}
case data := <-w.responseChan:
w.popTransaction(FrameTypeResponse, data)
case data := <-w.errorChan:
w.popTransaction(FrameTypeError, data)
case <-w.closeChan:
goto exit
case <-w.exitChan:
goto exit
}
}
exit:
w.transactionCleanup()
w.wg.Done()
w.log(LogLevelInfo, "exiting router")
} Before producer's goroutine run |
@mreiferson |
@zwb-ict yes, it looks like in that case the race would be possible.
Hmmm, possibly, but let's not do that in this PR? |
@mreiferson yes, let's not do that in this PR. |
after producer called Stop, there may be data in channal w.responseChan, and exitChan may has been closed, and select randomly selected goto exit.
then in
if we don't read responseChan, the successfully sent data may be send again after creating a new producer and publish again.