Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Allow compress after restart #6444

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 6 additions & 38 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3297,15 +3297,13 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
client, subject, reply := sa.Client, sa.Subject, sa.Reply
hasResponded := sa.responded
sa.responded = true
peers := copyStrings(sa.Group.Peers)
js.mu.Unlock()

streamName := mset.name()

if isLeader {
s.Noticef("JetStream cluster new stream leader for '%s > %s'", account, streamName)
s.sendStreamLeaderElectAdvisory(mset)
mset.checkAllowMsgCompress(peers)
} else {
// We are stepping down.
// Make sure if we are doing so because we have lost quorum that we send the appropriate advisories.
Expand Down Expand Up @@ -7762,28 +7760,21 @@ func decodeStreamMsg(buf []byte) (subject, reply string, hdr, msg []byte, lseq u
return subject, reply, hdr, msg, lseq, ts, sourced, nil
}

// compressAllowed reports whether message compression may currently be
// used in the RAFT and catchup paths for this stream.
// The flag is read under clMu, which guards the clustered proposal state.
func (mset *stream) compressAllowed() bool {
	mset.clMu.Lock()
	ok := mset.compressOK
	mset.clMu.Unlock()
	return ok
}

// Flags for encodeStreamMsg/decodeStreamMsg.
const (
msgFlagFromSourceOrMirror uint64 = 1 << iota
)

// encodeStreamMsg encodes a stream message for replication. If the total
// encoded size is over compressThreshold, the payload will be compressed.
func encodeStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, sourced bool) []byte {
	return encodeStreamMsgAllowCompress(subject, reply, hdr, msg, lseq, ts, sourced)
}

// Threshold for compression.
// TODO(dlc) - Eventually make configurable.
const compressThreshold = 8192 // 8k

// If allowed and contents over the threshold we will compress.
func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, sourced bool, compressOK bool) []byte {
func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, sourced bool) []byte {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would then have removed this function and kept the original encodeStreamMsg(), except that encodeStreamMsg() could have a comment saying that if the content is over the threshold, it will be compressed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe they will not be able to sync until all servers are upgraded, but this should not cause any data corruption.

// Clip the subject, reply, header and msgs down. Operate on
// uint64 lengths to avoid overflowing.
slen := min(uint64(len(subject)), math.MaxUint16)
Expand All @@ -7792,7 +7783,7 @@ func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq u
mlen := min(uint64(len(msg)), math.MaxUint32)
total := slen + rlen + hlen + mlen

shouldCompress := compressOK && total > compressThreshold
shouldCompress := total > compressThreshold
elen := int(1 + 8 + 8 + total)
elen += (2 + 2 + 2 + 4 + 8) // Encoded lengths, 4bytes, flags are up to 8 bytes

Expand Down Expand Up @@ -7928,26 +7919,6 @@ func (mset *stream) stateSnapshotLocked() []byte {
return b
}

// checkAllowMsgCompress records whether message compression may be used in
// the RAFT and catchup logic: it is allowed only when every peer in the
// group advertises the CompressOK capability in its node info.
// NOTE(review): the result is written here under mset.mu, while
// compressAllowed() reads it under mset.clMu — confirm which mutex is
// intended to guard compressOK.
func (mset *stream) checkAllowMsgCompress(peers []string) {
	// Reports whether a single peer advertises compression support.
	supported := func(id string) bool {
		sir, ok := mset.srv.nodeToInfo.Load(id)
		if !ok || sir == nil {
			return false
		}
		// Check for capability.
		si := sir.(nodeInfo)
		return si.cfg != nil && si.cfg.CompressOK
	}
	allowed := true
	for _, id := range peers {
		if !supported(id) {
			allowed = false
			break
		}
	}
	mset.mu.Lock()
	mset.compressOK = allowed
	mset.mu.Unlock()
}

// To warn when we are getting too far behind from what has been proposed vs what has been committed.
const streamLagWarnThreshold = 10_000

Expand All @@ -7962,7 +7933,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq
interestPolicy, discard, maxMsgs, maxBytes := mset.cfg.Retention != LimitsPolicy, mset.cfg.Discard, mset.cfg.MaxMsgs, mset.cfg.MaxBytes
isLeader, isSealed, compressOK, allowTTL := mset.isLeader(), mset.cfg.Sealed, mset.compressOK, mset.cfg.AllowMsgTTL
isLeader, isSealed, allowTTL := mset.isLeader(), mset.cfg.Sealed, mset.cfg.AllowMsgTTL
mset.mu.RUnlock()

// This should not happen but possible now that we allow scale up, and scale down where this could trigger.
Expand Down Expand Up @@ -8237,7 +8208,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
}

esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, ts, sourced, compressOK)
esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, ts, sourced)
var mtKey uint64
if mt != nil {
mtKey = mset.clseq
Expand Down Expand Up @@ -9168,9 +9139,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {

mset.setCatchupPeer(sreq.Peer, last-seq)

// Check if we can compress during this.
compressOk := mset.compressAllowed()

var spb int
const minWait = 5 * time.Second

Expand Down Expand Up @@ -9313,7 +9281,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
sendDR()
}
// Send the normal message now.
sendEM(encodeStreamMsgAllowCompress(sm.subj, _EMPTY_, sm.hdr, sm.msg, sm.seq, sm.ts, false, compressOk))
sendEM(encodeStreamMsgAllowCompress(sm.subj, _EMPTY_, sm.hdr, sm.msg, sm.seq, sm.ts, false))
} else {
if drOk {
if dr.First == 0 {
Expand Down
91 changes: 83 additions & 8 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7420,16 +7420,91 @@ func TestJetStreamClusterCompressedStreamMessages(t *testing.T) {
})
require_NoError(t, err)

// 32k messages (compress threshold is 8k)
toSend, msg := 10_000, []byte(strings.Repeat("ABCD", 8*1024))
for i := 0; i < toSend; i++ {
js.PublishAsync("foo", msg)
sl := c.streamLeader(globalAccountName, "TEST")
acc, err := sl.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
rn := mset.raftNode().(*raft)

// Block snapshots by marking as-if we're doing catchup.
blockSnapshots := func() {
rn.Lock()
defer rn.Unlock()
rn.progress = make(map[string]*ipQueue[uint64])
rn.progress["blockSnapshots"] = newIPQueue[uint64](rn.s, "blockSnapshots")
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
blockSnapshots()

// 32k (compress threshold ~8k)
publishMessages := func() {
t.Helper()
toSend, msg := 10_000, []byte(strings.Repeat("ABCD", 8*1024))
for i := 0; i < toSend; i++ {
js.PublishAsync("foo", msg)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
}
publishMessages()

checkMessagesAreCompressed := func() {
t.Helper()
rn.Lock()
defer rn.Unlock()
var ss StreamState
rn.wal.FastState(&ss)
require_NotEqual(t, ss.Msgs, 0)
var containsCompressed bool
for i := ss.FirstSeq; i <= ss.LastSeq; i++ {
ae, err := rn.loadEntry(i)
require_NoError(t, err)
for _, e := range ae.entries {
if e.Type == EntryNormal && len(e.Data) > 0 {
if entryOp(e.Data[0]) == streamMsgOp {
t.Fatalf("Received non-compressed stream msg")
} else if entryOp(e.Data[0]) == compressedStreamMsgOp {
containsCompressed = true
}
}
}
ae.returnToPool()
}
require_True(t, containsCompressed)
}
checkMessagesAreCompressed()

nc.Close()
c.stopAll()

// Only restart two servers, leaving one offline for the time being.
c.restartServer(c.servers[0])
c.restartServer(c.servers[1])

c.waitOnStreamLeader(globalAccountName, "TEST")
sl = c.streamLeader(globalAccountName, "TEST")
acc, err = sl.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err = acc.lookupStream("TEST")
require_NoError(t, err)
rn = mset.raftNode().(*raft)
blockSnapshots()

// Now that the stream leader is elected, bring up the last server.
// Must ensure compression is still used, even if the server came up a bit later.
ls := c.restartServer(c.servers[2])
c.waitOnServerHealthz(ls)

// Need to reconnect.
s = c.streamLeader(globalAccountName, "TEST")
nc, js = jsClientConnect(t, s)
defer nc.Close()

publishMessages()
checkMessagesAreCompressed()
}

// https://github.com/nats-io/nats-server/issues/5612
Expand Down
28 changes: 14 additions & 14 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) {

// Simulate leader storing an AppendEntry in WAL but being hard killed before it can propose to its peers.
n := rg.leader().node().(*raft)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}
n.Lock()
ae := n.buildAppendEntry(entries)
Expand Down Expand Up @@ -1066,7 +1066,7 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) {
l.term = 20
l.Unlock()

esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}
l.Lock()
ae := l.buildAppendEntry(entries)
Expand Down Expand Up @@ -1102,7 +1102,7 @@ func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) {
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1183,7 +1183,7 @@ func TestNRGCatchupCanTruncateMultipleEntriesWithoutQuorum(t *testing.T) {
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1282,7 +1282,7 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1333,7 +1333,7 @@ func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) {
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1379,7 +1379,7 @@ func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) {
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1543,7 +1543,7 @@ func TestNRGCancelCatchupWhenDetectingHigherTermDuringVoteRequest(t *testing.T)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1590,7 +1590,7 @@ func TestNRGTruncateDownToCommitted(t *testing.T) {
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1651,7 +1651,7 @@ func TestNRGTruncateDownToCommittedWhenTruncateFails(t *testing.T) {
n.Unlock()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1716,7 +1716,7 @@ func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) {
defer c.shutdown()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1761,7 +1761,7 @@ func TestNRGHealthCheckWaitForCatchup(t *testing.T) {
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1820,7 +1820,7 @@ func TestNRGHealthCheckWaitForDoubleCatchup(t *testing.T) {
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down Expand Up @@ -1902,7 +1902,7 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) {
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true, false)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
Expand Down
27 changes: 13 additions & 14 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,20 +344,19 @@ type stream struct {

// TODO(dlc) - Hide everything below behind two pointers.
// Clustered mode.
sa *streamAssignment // What the meta controller uses to assign streams to peers.
node RaftNode // Our RAFT node for the stream's group.
catchup atomic.Bool // Used to signal we are in catchup mode.
catchups map[string]uint64 // The number of messages that need to be caught per peer.
syncSub *subscription // Internal subscription for sync messages (on "$JSC.SYNC").
infoSub *subscription // Internal subscription for stream info requests.
clMu sync.Mutex // The mutex for clseq and clfs.
clseq uint64 // The current last seq being proposed to the NRG layer.
clfs uint64 // The count (offset) of the number of failed NRG sequences used to compute clseq.
inflight map[uint64]uint64 // Inflight message sizes per clseq.
lqsent time.Time // The time at which the last lost quorum advisory was sent. Used to rate limit.
uch chan struct{} // The channel to signal updates to the monitor routine.
compressOK bool // True if we can do message compression in RAFT and catchup logic
inMonitor bool // True if the monitor routine has been started.
sa *streamAssignment // What the meta controller uses to assign streams to peers.
node RaftNode // Our RAFT node for the stream's group.
catchup atomic.Bool // Used to signal we are in catchup mode.
catchups map[string]uint64 // The number of messages that need to be caught per peer.
syncSub *subscription // Internal subscription for sync messages (on "$JSC.SYNC").
infoSub *subscription // Internal subscription for stream info requests.
clMu sync.Mutex // The mutex for clseq and clfs.
clseq uint64 // The current last seq being proposed to the NRG layer.
clfs uint64 // The count (offset) of the number of failed NRG sequences used to compute clseq.
inflight map[uint64]uint64 // Inflight message sizes per clseq.
lqsent time.Time // The time at which the last lost quorum advisory was sent. Used to rate limit.
uch chan struct{} // The channel to signal updates to the monitor routine.
inMonitor bool // True if the monitor routine has been started.

expectedPerSubjectReady bool // Initially blocks 'expected per subject' changes until leader is initially caught up with stored but not applied entries.
expectedPerSubjectSequence map[uint64]string // Inflight 'expected per subject' subjects per clseq.
Expand Down