Skip to content

Commit

Permalink
[improve] stop timer when close timedAckGroupingTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
geniusjoe committed Sep 3, 2024
1 parent 953d9ea commit 7e60259
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
5 changes: 4 additions & 1 deletion pulsar/ack_grouping_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type timedAckGroupingTracker struct {

// Key is the pair of the ledger id and the entry id,
// Value is the bit set that represents which messages are acknowledged if the entry stores a batch.
// The bit 1 represents the message has been acknowledged, i.e. the bits "111" represents all messages
// The bit 1 represents the message has not been acknowledged, i.e. the bits "111" represents all messages
// in the batch whose batch size is 3 are not acknowledged.
// After the 1st message (i.e. batch index is 0) is acknowledged, the bits will become "011".
// Value is nil if the entry represents a single message.
Expand Down Expand Up @@ -241,6 +241,9 @@ func (t *timedAckGroupingTracker) clearPendingAcks() map[[2]uint64]*bitset.BitSe
}

func (t *timedAckGroupingTracker) close() {
if t.ticker != nil {
t.ticker.Stop()
}
t.flushAndClean()
if t.exitCh != nil {
close(t.exitCh)
Expand Down
46 changes: 46 additions & 0 deletions pulsar/ack_grouping_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,52 @@ func TestDuplicateAfterClose(t *testing.T) {
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
}

func TestCloseFlushWithoutTimer(t *testing.T) {
var acker mockAcker
tracker := newAckGroupingTracker(
&AckGroupingOptions{MaxSize: 3, MaxTime: 0},
nil,
func(id MessageID) { acker.ackCumulative(id) },
func(ids []*pb.MessageIdData) { acker.ack(ids) },
)

// case 1: message will not be acked because the cache is not full
tracker.add(&messageID{ledgerID: 1})
tracker.add(&messageID{ledgerID: 2})
assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(1)}))
assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(2)}))
assert.Equal(t, 0, len(acker.getLedgerIDs()))

// case 2: tracker close so that all messages are flushed and acked
tracker.close()
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 2}))
assert.Equal(t, []int64{1, 2}, acker.getLedgerIDs())
}

func TestCloseFlushWithTimer(t *testing.T) {
var acker mockAcker
tracker := newAckGroupingTracker(
&AckGroupingOptions{MaxSize: 1000, MaxTime: 10 * 1000},
nil,
func(id MessageID) { acker.ackCumulative(id) },
func(ids []*pb.MessageIdData) { acker.ack(ids) },
)

// case 1: messages are not acked because the cache is not full
tracker.add(&messageID{ledgerID: 1})
tracker.add(&messageID{ledgerID: 2})
assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(1)}))
assert.True(t, tracker.isDuplicate(&messageID{ledgerID: int64(2)}))
assert.Equal(t, 0, len(acker.getLedgerIDs()))

// case 2: tracker close so that all messages are flushed and acked
tracker.close()
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 2}))
assert.Equal(t, []int64{1, 2}, acker.getLedgerIDs())
}

func TestTrackerPendingAcks(t *testing.T) {
m := make(map[uint64][]int64)
tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, MaxTime: 0}, nil, nil,
Expand Down

0 comments on commit 7e60259

Please sign in to comment.