Skip to content

Commit

Permalink
fix: pause dispatch message before performing seek
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Aug 10, 2024
1 parent a34ccf2 commit c7c4937
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
24 changes: 15 additions & 9 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,22 +710,33 @@ func (c *consumer) Seek(msgID MessageID) error {
return err
}

if err := c.consumers[msgID.PartitionIdx()].Seek(msgID); err != nil {
return err
}

consumer := c.consumers[msgID.PartitionIdx()]
consumer.pauseDispatchMessage()
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}

if err := consumer.Seek(msgID); err != nil {
return err
}

return nil
}

func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
var errs error

for _, cons := range c.consumers {
cons.pauseDispatchMessage()
}
// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}

// run SeekByTime on every partition of topic
for _, cons := range c.consumers {
if err := cons.SeekByTime(time); err != nil {
Expand All @@ -734,11 +745,6 @@ func (c *consumer) SeekByTime(time time.Time) error {
}
}

// clear messageCh
for len(c.messageCh) > 0 {
<-c.messageCh
}

return errs
}

Expand Down
17 changes: 16 additions & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -182,6 +183,15 @@ type partitionConsumer struct {
lastMessageInBroker *trackingMessageID

redirectedClusterURI string

discardMessage atomic.Bool
}

// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
// This method will be called When the parent consumer performs the seek operation.
// After the seek operation, the dispatcher will continue dispatching messages automatically.
func (pc *partitionConsumer) pauseDispatchMessage() {
pc.discardMessage.Store(true)
}

func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
Expand Down Expand Up @@ -878,6 +888,7 @@ func (pc *partitionConsumer) Seek(msgID MessageID) error {

func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
defer close(seek.doneCh)
defer pc.discardMessage.Store(false)
seek.err = pc.requestSeek(seek.msgID)
}
func (pc *partitionConsumer) requestSeek(msgID *messageID) error {
Expand Down Expand Up @@ -931,6 +942,8 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error {

// wait for the request to complete
<-req.doneCh
pc.discardMessage.Store(false)

return req.err
}

Expand Down Expand Up @@ -1458,7 +1471,9 @@ func (pc *partitionConsumer) dispatcher() {
pc.dlq.Chan() <- consumerMessage
} else {
// pass the message to application channel
pc.messageCh <- consumerMessage
if !pc.discardMessage.Load() {
pc.messageCh <- consumerMessage
}
}

pc.availablePermits.inc()
Expand Down

0 comments on commit c7c4937

Please sign in to comment.