Skip to content

Commit

Permalink
refactor: rename pendingItem.Complete() to pendingItem.done()
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Aug 30, 2023
1 parent 6b4443e commit 632f367
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ type pendingItem struct {
sequenceID uint64
sentAt time.Time
sendRequests []interface{}
completed bool
isDone bool
flushCallback func(err error)
}

Expand Down Expand Up @@ -839,7 +839,7 @@ func (p *partitionProducer) failTimeoutMessages() {
}

// flag the sending has completed with error, flush make no effect
pi.Complete(errSendTimeout)
pi.done(errSendTimeout)
pi.Unlock()

// finally reached the last view item, current iteration ends
Expand Down Expand Up @@ -914,7 +914,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
pi.Lock()
defer pi.Unlock()

if pi.completed {
if pi.isDone {
// The last item in the queue has been completed while we were
// looking at it. It's safe at this point to assume that every
// message enqueued before Flush() was called are now persisted
Expand Down Expand Up @@ -1327,7 +1327,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
}

// Mark this pending item as done
pi.Complete(nil)
pi.done(nil)
}
}

Expand Down Expand Up @@ -1397,7 +1397,7 @@ func (p *partitionProducer) failPendingMessages() {
}

// flag the sending has completed with error, flush make no effect
pi.Complete(errProducerClosed)
pi.done(errProducerClosed)
pi.Unlock()

// finally reached the last view item, current iteration ends
Expand Down Expand Up @@ -1538,11 +1538,12 @@ type flushRequest struct {
err error
}

func (i *pendingItem) Complete(err error) {
if i.completed {
func (i *pendingItem) done(err error) {
if i.isDone {
return
}
i.completed = true

i.isDone = true
buffersPool.Put(i.buffer)
if i.flushCallback != nil {
i.flushCallback(err)
Expand Down

0 comments on commit 632f367

Please sign in to comment.