From cb046e543bb192b03877c79119821d40744a4bf2 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 28 Jan 2025 10:39:03 +0100 Subject: [PATCH] De-flake TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm Signed-off-by: Maurice van Veen --- server/jetstream_cluster_4_test.go | 124 +++++++++++++++-------------- 1 file changed, 64 insertions(+), 60 deletions(-) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index e6e4cb1f18..a60f314059 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4276,87 +4276,91 @@ func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T) nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() + checkConsistency := func() { + t.Helper() + checkFor(t, 3*time.Second, 250*time.Millisecond, func() error { + for _, s := range c.servers { + acc, err := s.lookupAccount(globalAccountName) + if err != nil { + return err + } + mset, err := acc.lookupStream("TEST") + if err != nil { + return err + } + state := mset.state() + if state.Msgs != 3 || state.Bytes != 99 { + return fmt.Errorf("stream state didn't match, got %d messages with %d bytes", state.Msgs, state.Bytes) + } + } + return nil + }) + } + _, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", - Subjects: []string{"foo.>"}, + Subjects: []string{"foo"}, Replicas: 3, }) - nc.Close() require_NoError(t, err) - // Pick one server that will only store a part of the messages in its WAL. - rs := c.randomNonStreamLeader(globalAccountName, "TEST") - ts := time.Now().UnixNano() - - // Manually add 3 append entries to each node's WAL, except for one node who is one behind. - var scratch [1024]byte - for _, s := range c.servers { - for _, n := range s.raftNodes { - rn := n.(*raft) - if rn.accName == globalAccountName { - for i := uint64(0); i < 3; i++ { - // One server will be one behind and need to catchup. - if s.Name() == rs.Name() && i >= 2 { - break - } + sl := c.streamLeader(globalAccountName, "TEST") + require_NoError(t, err) + acc, err := sl.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + rn := mset.raftNode().(*raft) + leaderId := rn.ID() - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, i, ts, true, false) - entries := []*Entry{newEntry(EntryNormal, esm)} - rn.Lock() - ae := rn.buildAppendEntry(entries) - ae.buf, err = ae.encode(scratch[:]) - require_NoError(t, err) - err = rn.storeToWAL(ae) - rn.Unlock() - require_NoError(t, err) - } - } - } + for i := 0; i < 3; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) } + nc.Close() + checkConsistency() - // Restart all. - c.stopAll() - c.restartAll() - c.waitOnAllCurrent() - c.waitOnStreamLeader(globalAccountName, "TEST") + // Pick one server that will only store a part of the messages in its WAL. + rs := c.randomNonStreamLeader(globalAccountName, "TEST") + acc, err = rs.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err = acc.lookupStream("TEST") + require_NoError(t, err) + rn = mset.raftNode().(*raft) + index, commit, _ := rn.Progress() + require_Equal(t, index, 4) + require_Equal(t, index, commit) - rs = c.serverByName(rs.Name()) + // We'll simulate as-if the last message was never received/stored. + // Will need to truncate the stream, correct lseq (so the msg isn't skipped) and truncate the WAL. + // This will simulate that the RAFT layer can restore it. + mset.mu.Lock() + mset.lseq-- + err = mset.store.Truncate(2) + mset.mu.Unlock() + require_NoError(t, err) + rn.Lock() + rn.truncateWAL(rn.pterm, rn.pindex-1) + rn.Unlock() // Check all servers ended up with all published messages, which had quorum. - checkFor(t, 3*time.Second, 250*time.Millisecond, func() error { - for _, s := range c.servers { - acc, err := s.lookupAccount(globalAccountName) - if err != nil { - return err - } - mset, err := acc.lookupStream("TEST") - if err != nil { - return err - } - state := mset.state() - if state.Msgs != 3 || state.Bytes != 99 { - return fmt.Errorf("stream state didn't match, got %d messages with %d bytes", state.Msgs, state.Bytes) - } - } - return nil - }) + checkConsistency() - // Check that the first two published messages came from our WAL, and - // the last came from a catchup by another leader. + // Check that all entries came from the expected leader. for _, n := range rs.raftNodes { rn := n.(*raft) if rn.accName == globalAccountName { - ae, err := rn.loadEntry(2) + ae, err := rn.loadEntry(1) require_NoError(t, err) - require_True(t, ae.leader == rn.ID()) + require_Equal(t, ae.leader, leaderId) - ae, err = rn.loadEntry(3) + ae, err = rn.loadEntry(2) require_NoError(t, err) - require_True(t, ae.leader == rn.ID()) + require_Equal(t, ae.leader, leaderId) - ae, err = rn.loadEntry(4) + ae, err = rn.loadEntry(3) require_NoError(t, err) - require_True(t, ae.leader != rn.ID()) + require_Equal(t, ae.leader, leaderId) } } }