Skip to content

Commit

Permalink
[FIXED] Deterministic clustered dedupe (#6415)
Browse files Browse the repository at this point in the history
Issue description:
- Publish a message with a `Nats-Msg-Id`.
- Leader election through restarts/networking/etc.
- Due to leader election the proposal queue is drained and the message
is not stored/proposed.
- Any retries of the original message will fail until the de-dupe window
clears the original message, even though it was not stored in the
stream.

This issue is due to staging a zero sequence in the de-dupe map. An easy
solution would seem to be clearing all zero-sequence entries from the de-dupe
map upon stepping down, but that has correctness issues if any of those
messages did get proposed.

This PR removes the staging of a zero sequence and ensures all replicas
can deterministically do de-duping themselves. And duplicate messages
are only blocked at the cluster-level if we know what the sequence is.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored and MauriceVanVeen committed Jan 28, 2025
1 parent 71c9886 commit 861081f
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 45 deletions.
11 changes: 7 additions & 4 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7903,6 +7903,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}

// Some header checks can be checked pre proposal. Most can not.
var ts = time.Now().UnixNano()
var msgId string
if len(hdr) > 0 {
// Since we encode header len as u16 make sure we do not exceed.
Expand Down Expand Up @@ -7956,6 +7957,8 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// Will help during restarts.
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
mset.mu.Lock()
// Since purging is delayed for the clustered de-dupe map, deterministically try to purge based on timestamp.
mset.purgeMsgIdsAtLocked(ts)
if dde := mset.checkMsgId(msgId); dde != nil {
var buf [256]byte
pubAck := append(buf[:0], mset.pubAck...)
Expand All @@ -7968,9 +7971,9 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
return errMsgIdDuplicate
}
// FIXME(dlc) - locking conflict with accessing mset.clseq
// For now we stage with zero, and will update in processStreamMsg.
mset.storeMsgIdLocked(&ddentry{msgId, 0, time.Now().UnixNano()})
// We used to stage with zero, but it's hard to correctly remove it during leader elections
// while taking quorum/truncation into account. So instead let duplicates through and handle
// duplicates later. Only if we know the sequence we can start blocking above.
mset.mu.Unlock()
}
}
Expand Down Expand Up @@ -8030,7 +8033,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
}

esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), compressOK)
esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, ts, compressOK)
// Do proposal.
err := node.Propose(esm)
if err == nil {
Expand Down
95 changes: 95 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2865,9 +2865,104 @@ func TestJetStreamClusterPubAckSequenceDupe(t *testing.T) {
require_NoError(t, err)
require_Equal(t, seq, pubAck2.Sequence)
require_True(t, pubAck2.Duplicate)
}
}

// TestJetStreamClusterPubAckSequenceDupeAsync publishes the same Nats-Msg-Id twice in
// parallel against an R3 stream, nine times over, and verifies that both pubAcks carry
// the expected stream sequence and that exactly one of the two is flagged as a duplicate.
func TestJetStreamClusterPubAckSequenceDupeAsync(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "TEST_CLUSTER", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:       "TEST_STREAM",
		Subjects:   []string{"TEST_SUBJECT"},
		Replicas:   3,
		Duplicates: 1 * time.Minute,
	})
	require_NoError(t, err)

	msgData := []byte("...")

	for seq := uint64(1); seq < 10; seq++ {
		msgSubject := "TEST_SUBJECT"
		msgIdOpt := nats.MsgId(nuid.Next())

		var wg sync.WaitGroup
		wg.Add(2)

		// Fire off 2 publish requests in parallel with the same message ID.
		// Historically the first request "staged" a zero-sequence de-dupe entry before
		// the message was even proposed, and the second request could get a pubAck with
		// sequence zero by hitting that staged entry. Both acks must now report the
		// real sequence.
		pubAcks := [2]*nats.PubAck{}
		pubErrs := [2]error{}
		for i := 0; i < 2; i++ {
			go func(i int) {
				defer wg.Done()
				// Only record the outcome here. Fatal test failures must be raised
				// from the goroutine running the test, so the error is asserted
				// after wg.Wait() below.
				pubAcks[i], pubErrs[i] = js.Publish(msgSubject, msgData, msgIdOpt)
			}(i)
		}

		wg.Wait()

		// Assert on the test goroutine, and before touching the acks: a failed
		// publish leaves its pubAck nil.
		for _, pubErr := range pubErrs {
			require_NoError(t, pubErr)
		}
		require_Equal(t, pubAcks[0].Sequence, seq)
		require_Equal(t, pubAcks[1].Sequence, seq)

		// Exactly one of the pubAcks should be marked dupe.
		require_True(t, pubAcks[0].Duplicate != pubAcks[1].Duplicate)
	}
}

// TestJetStreamClusterPubAckSequenceDupeDeterministic feeds the stream leader messages
// that share one Nats-Msg-Id but carry hand-picked timestamps, and checks that the
// duplicate decision depends only on those timestamps relative to the duplicates window:
// rejected while inside the window, accepted once exactly at the window boundary.
func TestJetStreamClusterPubAckSequenceDupeDeterministic(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	const window = 2 * time.Minute
	cfg := &nats.StreamConfig{
		Name:       "TEST",
		Subjects:   []string{"foo"},
		Replicas:   3,
		Duplicates: window,
	}
	_, err := js.AddStream(cfg)
	require_NoError(t, err)

	leader := c.streamLeader(globalAccountName, "TEST")
	acc, err := leader.lookupAccount(globalAccountName)
	require_NoError(t, err)
	mset, err := acc.lookupStream("TEST")
	require_NoError(t, err)

	// All timestamps are derived from a single base an hour in the past.
	start := time.Now().Add(-time.Hour)
	var (
		tsFirst  = start.UnixNano()
		tsInside = start.Add(window).Add(-time.Nanosecond).UnixNano()
		tsEdge   = start.Add(window).UnixNano()
	)
	hdr := []byte("NATS/1.0\r\nNats-Msg-Id: msgId\r\n\r\n")

	// The first message is stored.
	require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, hdr, nil, 0, tsFirst))

	// A second message carrying the very same timestamp is de-duped.
	require_Error(t, mset.processJetStreamMsg("foo", _EMPTY_, hdr, nil, 1, tsFirst), errMsgIdDuplicate)

	// One nanosecond before the window closes it is still de-duped.
	require_Error(t, mset.processJetStreamMsg("foo", _EMPTY_, hdr, nil, 2, tsInside), errMsgIdDuplicate)

	// Exactly at the dupe window the message must be accepted.
	require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, hdr, nil, 3, tsEdge))

	// The leader must also let a message through even if the purging timer hasn't
	// cleaned up the old entry yet.
	require_NoError(t, mset.processClusteredInboundMsg("foo", _EMPTY_, hdr, nil))
}

func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {
Expand Down
90 changes: 49 additions & 41 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3979,24 +3979,17 @@ func (mset *stream) purgeMsgIds() {
mset.mu.Lock()
defer mset.mu.Unlock()

now := time.Now().UnixNano()
tmrNext := mset.cfg.Duplicates
window := int64(tmrNext)

for i, dde := range mset.ddarr[mset.ddindex:] {
if now-dde.ts >= window {
delete(mset.ddmap, dde.id)
} else {
mset.ddindex += i
// Check if we should garbage collect here if we are 1/3 total size.
if cap(mset.ddarr) > 3*(len(mset.ddarr)-mset.ddindex) {
mset.ddarr = append([]*ddentry(nil), mset.ddarr[mset.ddindex:]...)
mset.ddindex = 0
}
tmrNext = time.Duration(window - (now - dde.ts))
break
}
now := time.Now()
// If clustered we must ensure all servers agree on which message is marked as a duplicate.
// But, since the de-dupe map is time-dependent we must take into account clock skew/ordering guarantees.
// For example a replica clearing the de-dupe map earlier than the leader would, could result in desync,
// so deliberately delay replicas from cleaning up before the leader is likely to.
// processJetStreamMsg/processClusteredInboundMsg are both aware of this timer, and they clean up inline if necessary.
if mset.isClustered() {
now = now.Add(-5 * time.Second)
}
tmrNext := mset.purgeMsgIdsAtLocked(now.UnixNano())

if len(mset.ddmap) > 0 {
// Make sure to not fire too quick
const minFire = 50 * time.Millisecond
Expand All @@ -4019,6 +4012,29 @@ func (mset *stream) purgeMsgIds() {
}
}

// purgeMsgIdsAtLocked will purge the de-dupe entries that are past the duplicates
// window when evaluated at the given timestamp ts (nanoseconds), instead of at the
// current wall clock.
//
// It scans ddarr from ddindex and stops at the first entry that is still inside the
// window, so it assumes ddarr is in non-decreasing timestamp order.
//
// It returns the time left until the oldest surviving entry leaves the window, or the
// full window when no entry survives.
// Lock should be held.
func (mset *stream) purgeMsgIdsAtLocked(ts int64) time.Duration {
	tmrNext := mset.cfg.Duplicates
	window := int64(tmrNext)

	for i, dde := range mset.ddarr[mset.ddindex:] {
		if ts-dde.ts >= window {
			delete(mset.ddmap, dde.id)
			continue
		}
		// First entry still inside the window, everything before it has been purged.
		mset.ddindex += i
		// Check if we should garbage collect here if we are 1/3 total size.
		if cap(mset.ddarr) > 3*(len(mset.ddarr)-mset.ddindex) {
			mset.ddarr = append([]*ddentry(nil), mset.ddarr[mset.ddindex:]...)
			mset.ddindex = 0
		}
		return time.Duration(window - (ts - dde.ts))
	}

	// Every remaining entry was expired. Drop the backlog so a later purge does not
	// walk these stale entries again: a message that re-uses one of their IDs would
	// otherwise have its fresh entry deleted from ddmap by the stale one.
	mset.ddarr, mset.ddindex = nil, 0
	return tmrNext
}

// storeMsgIdLocked will store the message id for duplicate detection.
// Lock should be held.
func (mset *stream) storeMsgIdLocked(dde *ddentry) {
Expand Down Expand Up @@ -4388,21 +4404,23 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}

// Dedupe detection. This is done at the cluster level for dedupe detectiom above the
// lower layers. But we still need to pull out the msgId.
// Dedupe detection. This is done at the cluster level for dedupe detection above the
// lower layers. But not while the message is not applied yet, so we need to still check if
// multiple messages with the same ID are proposed at the same time and block here.
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
// Do real check only if not clustered or traceOnly flag is set.
if !isClustered {
if dde := mset.checkMsgId(msgId); dde != nil {
mset.mu.Unlock()
bumpCLFS()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ",\"duplicate\": true}"...)
outq.sendMsg(reply, response)
}
return errMsgIdDuplicate
// If clustered we know the timestamp, so deterministically try to purge.
if isClustered {
mset.purgeMsgIdsAtLocked(ts)
}
if dde := mset.checkMsgId(msgId); dde != nil {
mset.mu.Unlock()
bumpCLFS()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ",\"duplicate\": true}"...)
outq.sendMsg(reply, response)
}
return errMsgIdDuplicate
}
}

Expand Down Expand Up @@ -4674,18 +4692,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}

// If we have a msgId make sure to save.
// This will replace our estimate from the cluster layer if we are clustered.
if msgId != _EMPTY_ {
if isClustered && isLeader && mset.ddmap != nil {
if dde := mset.ddmap[msgId]; dde != nil {
dde.seq, dde.ts = seq, ts
} else {
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
}
} else {
// R1 or not leader..
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
}
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
}

// If here we succeeded in storing the message.
Expand Down

0 comments on commit 861081f

Please sign in to comment.