Skip to content

Commit

Permalink
fix: fix seek and reconnect race
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Aug 1, 2024
1 parent dad98f1 commit f8cfd6d
Showing 1 changed file with 23 additions and 1 deletion.
24 changes: 23 additions & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -182,6 +183,8 @@ type partitionConsumer struct {
lastMessageInBroker *trackingMessageID

redirectedClusterURI string

seeking atomic.Bool
}

func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
Expand Down Expand Up @@ -909,11 +912,18 @@ func (pc *partitionConsumer) requestSeekWithoutClear(msgID *messageID) error {
MessageId: id,
}

if !pc.seeking.CompareAndSwap(false, true) {
return errors.New("seek operation is already in progress")
}
defer pc.seeking.Store(false)

_, err = pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_SEEK, cmdSeek)
if err != nil {
pc.seeking.Store(false)
pc.log.WithError(err).Error("Failed to reset to message id")
return err
}
pc.seeking.Store(false)
return nil
}

Expand All @@ -936,7 +946,6 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error {

func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
defer close(seek.doneCh)

state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
pc.log.WithField("state", pc.state).Error("Failed seekByTime by consumer is closing or has closed")
Expand All @@ -950,6 +959,12 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
MessagePublishTime: proto.Uint64(uint64(seek.publishTime.UnixNano() / int64(time.Millisecond))),
}

if !pc.seeking.CompareAndSwap(false, true) {
seek.err = errors.New("seek operation is already in progress")
return
}
defer pc.seeking.Store(false)

_, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_SEEK, cmdSeek)
if err != nil {
pc.log.WithError(err).Error("Failed to reset to message publish time")
Expand Down Expand Up @@ -1594,6 +1609,13 @@ func (pc *partitionConsumer) runEventsLoop() {
pc.log.Info("close consumer, exit reconnect")
return
case connectionClosed := <-pc.connectClosedCh:
if pc.seeking.Load() {
go func() {
<-time.After(100 * time.Millisecond)
pc.connectClosedCh <- connectionClosed
}()
continue
}
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker(connectionClosed)
}
Expand Down

0 comments on commit f8cfd6d

Please sign in to comment.