Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

De-flake TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm #6417

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 64 additions & 60 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4276,87 +4276,91 @@ func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T)
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

checkConsistency := func() {
t.Helper()
checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
for _, s := range c.servers {
acc, err := s.lookupAccount(globalAccountName)
if err != nil {
return err
}
mset, err := acc.lookupStream("TEST")
if err != nil {
return err
}
state := mset.state()
if state.Msgs != 3 || state.Bytes != 99 {
return fmt.Errorf("stream state didn't match, got %d messages with %d bytes", state.Msgs, state.Bytes)
}
}
return nil
})
}

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>"},
Subjects: []string{"foo"},
Replicas: 3,
})
nc.Close()
require_NoError(t, err)

// Pick one server that will only store a part of the messages in its WAL.
rs := c.randomNonStreamLeader(globalAccountName, "TEST")
ts := time.Now().UnixNano()

// Manually add 3 append entries to each node's WAL, except for one node who is one behind.
var scratch [1024]byte
for _, s := range c.servers {
for _, n := range s.raftNodes {
rn := n.(*raft)
if rn.accName == globalAccountName {
for i := uint64(0); i < 3; i++ {
// One server will be one behind and need to catchup.
if s.Name() == rs.Name() && i >= 2 {
break
}
sl := c.streamLeader(globalAccountName, "TEST")
require_NoError(t, err)
acc, err := sl.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
rn := mset.raftNode().(*raft)
leaderId := rn.ID()

esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, i, ts, true, false)
entries := []*Entry{newEntry(EntryNormal, esm)}
rn.Lock()
ae := rn.buildAppendEntry(entries)
ae.buf, err = ae.encode(scratch[:])
require_NoError(t, err)
err = rn.storeToWAL(ae)
rn.Unlock()
require_NoError(t, err)
}
}
}
for i := 0; i < 3; i++ {
_, err = js.Publish("foo", nil)
require_NoError(t, err)
}
nc.Close()
checkConsistency()

// Restart all.
c.stopAll()
c.restartAll()
c.waitOnAllCurrent()
c.waitOnStreamLeader(globalAccountName, "TEST")
// Pick one server that will only store a part of the messages in its WAL.
rs := c.randomNonStreamLeader(globalAccountName, "TEST")
acc, err = rs.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err = acc.lookupStream("TEST")
require_NoError(t, err)
rn = mset.raftNode().(*raft)
index, commit, _ := rn.Progress()
require_Equal(t, index, 4)
require_Equal(t, index, commit)

rs = c.serverByName(rs.Name())
// We'll simulate as-if the last message was never received/stored.
// Will need to truncate the stream, correct lseq (so the msg isn't skipped) and truncate the WAL.
// This will simulate that the RAFT layer can restore it.
mset.mu.Lock()
mset.lseq--
err = mset.store.Truncate(2)
mset.mu.Unlock()
require_NoError(t, err)
rn.Lock()
rn.truncateWAL(rn.pterm, rn.pindex-1)
rn.Unlock()

// Check all servers ended up with all published messages, which had quorum.
checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
for _, s := range c.servers {
acc, err := s.lookupAccount(globalAccountName)
if err != nil {
return err
}
mset, err := acc.lookupStream("TEST")
if err != nil {
return err
}
state := mset.state()
if state.Msgs != 3 || state.Bytes != 99 {
return fmt.Errorf("stream state didn't match, got %d messages with %d bytes", state.Msgs, state.Bytes)
}
}
return nil
})
checkConsistency()

// Check that the first two published messages came from our WAL, and
// the last came from a catchup by another leader.
// Check that all entries came from the expected leader.
for _, n := range rs.raftNodes {
rn := n.(*raft)
if rn.accName == globalAccountName {
ae, err := rn.loadEntry(2)
ae, err := rn.loadEntry(1)
require_NoError(t, err)
require_True(t, ae.leader == rn.ID())
require_Equal(t, ae.leader, leaderId)

ae, err = rn.loadEntry(3)
ae, err = rn.loadEntry(2)
require_NoError(t, err)
require_True(t, ae.leader == rn.ID())
require_Equal(t, ae.leader, leaderId)

ae, err = rn.loadEntry(4)
ae, err = rn.loadEntry(3)
require_NoError(t, err)
require_True(t, ae.leader != rn.ID())
require_Equal(t, ae.leader, leaderId)
}
}
}
Expand Down
Loading