diff --git a/integration/integration_test.go b/integration/integration_test.go
new file mode 100644
index 0000000..a8ed81e
--- /dev/null
+++ b/integration/integration_test.go
@@ -0,0 +1,271 @@
+package integration
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/hashicorp/raft"
+	wal "github.com/hashicorp/raft-wal"
+	"github.com/hashicorp/raft-wal/metadb"
+	"github.com/stretchr/testify/require"
+)
+
+type step func(w *wal.WAL) error
+
+func TestIntegrationScenarios(t *testing.T) {
+	cases := []struct {
+		name                          string
+		steps                         []step
+		expectFirstIdx, expectLastIdx int
+		expectNumSegments             int
+	}{
+		{
+			name: "basic creation, appends, rotation",
+			steps: []step{
+				// ~256 bytes plus overhead per log; we want to write more than the 4K
+				// segment size. Batches of 4 are ~1k so 5 batches is enough to rotate once.
+				appendLogsInBatches(5, 4),
+			},
+			expectFirstIdx:    1,
+			expectLastIdx:     20,
+			expectNumSegments: 2,
+		},
+		{
+			name: "starting at high index, appends, rotation",
+			steps: []step{
+				appendFirstLogAt(1_000_000),
+				// ~256 bytes plus overhead per log; we want to write more than the 4K
+				// segment size. Batches of 4 are ~1k so 5 batches is enough to rotate once.
+				appendLogsInBatches(5, 4),
+			},
+			expectFirstIdx:    1_000_000,
+			expectLastIdx:     1_000_020,
+			expectNumSegments: 2,
+		},
+		{
+			name: "head truncation deleting no files",
+			steps: []step{
+				appendLogsInBatches(11, 4),
+				deleteRange(1, 2),
+			},
+			expectFirstIdx:    3,
+			expectLastIdx:     44,
+			expectNumSegments: 3,
+		},
+		{
+			name: "head truncation deleting multiple files",
+			steps: []step{
+				appendLogsInBatches(11, 4),
+				deleteRange(1, 20),
+			},
+			expectFirstIdx:    21,
+			expectLastIdx:     44,
+			expectNumSegments: 2,
+		},
+		{
+			name: "tail truncation in active segment",
+			steps: []step{
+				appendLogsInBatches(11, 4),
+				deleteRange(44, 44), // Delete just the last log
+			},
+			expectFirstIdx:    1,
+			expectLastIdx:     43,
+			expectNumSegments: 4,
+		},
+		{
+			name: "tail truncation in active segment and write more",
+			steps: []step{
+				appendLogsInBatches(11, 4),
+				deleteRange(44, 44), // Delete just the last log
+				appendLogsInBatches(1, 4),
+			},
+			expectFirstIdx:    1,
+			expectLastIdx:     47,
+			expectNumSegments: 4,
+		},
+		{
+			name: "tail truncation deleting files",
+			steps: []step{
+				appendLogsInBatches(11, 4),
+				deleteRange(20, 44),
+			},
+			expectFirstIdx: 1,
+			expectLastIdx:  19,
+			// Only need 2 segments but the truncation will rotate to a new tail
+			expectNumSegments: 3,
+		},
+		{
+			name: "tail truncation deleting files and write more",
+			steps: []step{
+				appendLogsInBatches(11, 4),
+				deleteRange(20, 44),
+				appendLogsInBatches(1, 4),
+			},
+			expectFirstIdx: 1,
+			expectLastIdx:  23,
+			// Only need 2 segments but the truncation will rotate to a new tail
+			expectNumSegments: 3,
+		},
+		{
+			name: "write some logs, truncate everything, restart logs from different index",
+			steps: []step{
+				appendLogsInBatches(11, 4),
+				deleteRange(1, 44),
+				appendFirstLogAt(1000),
+				appendLogsInBatches(1, 4),
+			},
+			expectFirstIdx:    1000,
+			expectLastIdx:     1004,
+			expectNumSegments: 1,
+		},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			tmpDir, err := os.MkdirTemp("", tc.name)
+			require.NoError(t, err)
+			defer os.RemoveAll(tmpDir)
+
+			// Wrap the BoltDB meta store so we can peek into its values.
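+			// (The wrapper, defined in integration/meta.go, passes calls through
+			// to the real BoltMetaDB and caches committed state for assertions.)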
+			meta := &PeekingMetaStore{
+				meta: &metadb.BoltMetaDB{},
+			}
+
+			w, err := wal.Open(tmpDir,
+				// 4k segments to test rotation quicker
+				wal.WithSegmentSize(4096),
+				wal.WithMetaStore(meta),
+			)
+			require.NoError(t, err)
+
+			// Execute initial operations
+			for i, step := range tc.steps {
+				require.NoError(t, step(w), "failed on step %d", i)
+			}
+
+			// Assert expected properties
+			assertLogContents(t, w, tc.expectFirstIdx, tc.expectLastIdx)
+			assertNumSegments(t, meta, tmpDir, tc.expectNumSegments)
+
+			// Close WAL and re-open
+			require.NoError(t, w.Close())
+
+			meta2 := &PeekingMetaStore{
+				meta: &metadb.BoltMetaDB{},
+			}
+
+			w2, err := wal.Open(tmpDir,
+				wal.WithSegmentSize(4096),
+				wal.WithMetaStore(meta2),
+			)
+			require.NoError(t, err)
+			defer w2.Close()
+
+			// Assert expected properties still hold
+			assertLogContents(t, w2, tc.expectFirstIdx, tc.expectLastIdx)
+			assertNumSegments(t, meta2, tmpDir, tc.expectNumSegments)
+		})
+	}
+}
+
+func appendLogsInBatches(nBatches, nPerBatch int) step {
+	return func(w *wal.WAL) error {
+		lastIdx, err := w.LastIndex()
+		if err != nil {
+			return err
+		}
+		nextIdx := lastIdx + 1
+
+		return appendLogsInBatchesStartingAt(w, nBatches, nPerBatch, int(nextIdx))
+	}
+}
+
+func appendFirstLogAt(index int) step {
+	return func(w *wal.WAL) error {
+		return appendLogsInBatchesStartingAt(w, 1, 1, index)
+	}
+}
+
+func appendLogsInBatchesStartingAt(w *wal.WAL, nBatches, nPerBatch, firstIndex int) error {
+	nextIdx := uint64(firstIndex)
+
+	batch := make([]*raft.Log, 0, nPerBatch)
+	for b := 0; b < nBatches; b++ {
+		for i := 0; i < nPerBatch; i++ {
+			log := raft.Log{
+				Index: nextIdx,
+				Data:  makeValue(nextIdx),
+			}
+			batch = append(batch, &log)
+			nextIdx++
+		}
+		if err := w.StoreLogs(batch); err != nil {
+			return err
+		}
+		batch = batch[:0]
+	}
+	return nil
+}
+
+func makeValue(n uint64) []byte {
+	// Values are 16 repetitions of a 16 byte string based on the index so 256
+	// bytes total.
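+	// For example makeValue(3) is 16 copies of "val-00000000003\n".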
+	return bytes.Repeat([]byte(fmt.Sprintf("val-%011d\n", n)), 16)
+}
+
+func deleteRange(min, max int) step {
+	return func(w *wal.WAL) error {
+		return w.DeleteRange(uint64(min), uint64(max))
+	}
+}
+
+func assertLogContents(t *testing.T, w *wal.WAL, first, last int) {
+	t.Helper()
+
+	firstIdx, err := w.FirstIndex()
+	require.NoError(t, err)
+	lastIdx, err := w.LastIndex()
+	require.NoError(t, err)
+
+	require.Equal(t, first, int(firstIdx))
+	require.Equal(t, last, int(lastIdx))
+
+	var log raft.Log
+	for i := first; i <= last; i++ {
+		err := w.GetLog(uint64(i), &log)
+		require.NoError(t, err, "log index %d", i)
+		require.Equal(t, i, int(log.Index), "log index %d", i)
+		require.Equal(t, string(makeValue(log.Index)), string(log.Data), "log index %d", i)
+	}
+}
+
+func assertNumSegments(t *testing.T, meta *PeekingMetaStore, dir string, numSegments int) {
+	t.Helper()
+
+	state := meta.PeekState()
+	require.Equal(t, numSegments, len(state.Segments))
+
+	// Check the right number of segment files on disk too
+	des, err := os.ReadDir(dir)
+	require.NoError(t, err)
+
+	segFiles := make([]string, 0, numSegments)
+	for _, de := range des {
+		if de.IsDir() {
+			continue
+		}
+		if strings.HasSuffix(de.Name(), ".wal") {
+			segFiles = append(segFiles, de.Name())
+		}
+	}
+	require.Equal(t, numSegments, len(segFiles), "expected %d segment files, got %v", numSegments, segFiles)
+}
diff --git a/integration/meta.go b/integration/meta.go
new file mode 100644
index 0000000..ba10369
--- /dev/null
+++ b/integration/meta.go
@@ -0,0 +1,68 @@
+package integration
+
+import (
+	"sync"
+
+	"github.com/hashicorp/raft-wal/types"
+)
+
+type PeekingMetaStore struct {
+	mu     sync.Mutex
+	meta   types.MetaStore
+	state  types.PersistentState
+	stable map[string]string
+}
+
+func (s *PeekingMetaStore) PeekState() types.PersistentState {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.state
+}
+
+func (s *PeekingMetaStore) PeekStable(key string) (string, bool) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	v, ok := s.stable[key]
+	return v, ok
+}
+
+func (s *PeekingMetaStore) Load(dir string) (types.PersistentState, error) {
+	state, err := s.meta.Load(dir)
+	if err == nil {
+		s.mu.Lock()
+		s.state = state
+		s.mu.Unlock()
+	}
+	return state, err
+}
+
+func (s *PeekingMetaStore) CommitState(state types.PersistentState) error {
+	err := s.meta.CommitState(state)
+	if err == nil {
+		s.mu.Lock()
+		s.state = state
+		s.mu.Unlock()
+	}
+	return err
+}
+
+func (s *PeekingMetaStore) GetStable(key []byte) ([]byte, error) {
+	return s.meta.GetStable(key)
+}
+
+func (s *PeekingMetaStore) SetStable(key, value []byte) error {
+	err := s.meta.SetStable(key, value)
+	if err == nil {
+		s.mu.Lock()
+		if s.stable == nil {
+			s.stable = make(map[string]string)
+		}
+		s.stable[string(key)] = string(value)
+		s.mu.Unlock()
+	}
+	return err
+}
+
+func (s *PeekingMetaStore) Close() error {
+	return s.meta.Close()
+}
diff --git a/segment/writer.go b/segment/writer.go
index 1f0a2b2..3e67cc6 100644
--- a/segment/writer.go
+++ b/segment/writer.go
@@ -488,6 +488,30 @@ func (w *Writer) Sealed() (bool, uint64, error) {
 	return true, w.writer.indexStart, nil
 }
 
+// ForceSeal forces us to seal the segment by writing out an index block
+// wherever we got to in the file. After calling this it is no longer valid to
+// call Append on this file.
+func (w *Writer) ForceSeal() (uint64, error) {
+	if w.writer.indexStart > 0 {
+		// Already sealed, this is a no-op.
+		return w.writer.indexStart, nil
+	}
+
+	// Seal the segment! We seal it by writing an index frame before we commit.
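+	// After this the segment is read-only: a reader locates the index block
+	// via the indexStart offset returned below (see TestWriterForceSeal).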
+	if err := w.appendIndex(); err != nil {
+		return 0, err
+	}
+
+	// Write the commit frame
+	if err := w.appendCommit(); err != nil {
+		return 0, err
+	}
+
+	return w.writer.indexStart, nil
+}
+
 // LastIndex returns the most recently persisted index in the log. It must
 // respond without blocking on append since it's needed frequently by read
 // paths that may call it concurrently. Typically this will be loaded from an
diff --git a/segment/writer_test.go b/segment/writer_test.go
index e49824b..34962b1 100644
--- a/segment/writer_test.go
+++ b/segment/writer_test.go
@@ -224,10 +224,55 @@ func assertExpectedLogs(t *testing.T, w types.SegmentWriter, first, last int) {
 	if last == 0 {
 		return
 	}
+	assertExpectedReaderLogs(t, w, first, last)
+}
+
+func assertExpectedReaderLogs(t *testing.T, r types.SegmentReader, first, last int) {
+	t.Helper()
+
 	for idx := first; idx <= last; idx++ {
-		buf, err := w.GetLog(uint64(idx))
+		buf, err := r.GetLog(uint64(idx))
 		require.NoError(t, err)
 		require.Equal(t, fmt.Sprintf("val-%d", idx), string(buf.Bs))
 		buf.Close()
 	}
 }
+
+func TestWriterForceSeal(t *testing.T) {
+	vfs := newTestVFS()
+
+	f := NewFiler("test", vfs)
+
+	seg0 := testSegment(1)
+
+	w, err := f.Create(seg0)
+	require.NoError(t, err)
+	defer w.Close()
+
+	batch := make([]types.LogEntry, 5)
+	for i := range batch {
+		batch[i].Index = uint64(i + 1)
+		batch[i].Data = []byte(fmt.Sprintf("val-%d", i+1))
+	}
+	require.NoError(t, w.Append(batch))
+
+	assertExpectedLogs(t, w, 1, 5)
+
+	// Should not have sealed after one append.
+	sealed, indexStart, err := w.Sealed()
+	require.NoError(t, err)
+	require.False(t, sealed)
+	require.Equal(t, 0, int(indexStart))
+
+	// Force seal it
+	indexStart, err = w.ForceSeal()
+	require.NoError(t, err)
+	require.Greater(t, int(indexStart), 0)
+
+	// It should be possible to open it with a reader now
+	seg0.IndexStart = indexStart
+	r, err := f.Open(seg0)
+	require.NoError(t, err)
+
+	assertExpectedReaderLogs(t, r, 1, 5)
+}
diff --git a/types/segment.go b/types/segment.go
index 7e63180..b1ed55e 100644
--- a/types/segment.go
+++ b/types/segment.go
@@ -104,7 +104,8 @@ type SegmentWriter interface {
 	// Append adds one or more entries. It must not return until the entries are
 	// durably stored otherwise raft's guarantees will be compromised. Append must
-	// not be called concurrently with any other call to Sealed or Append.
+	// not be called concurrently with any other call to Sealed, Append or
+	// ForceSeal.
 	Append(entries []LogEntry) error
 
 	// Sealed returns whether the segment is sealed or not. If it is it returns
@@ -112,10 +113,25 @@ type SegmentWriter interface {
 	// meta data. WAL will call this after every append so it should be relatively
 	// cheap in the common case. This design allows the final Append to write out
 	// the index or any additional data needed at seal time in the same fsync.
-	// Sealed must not be called concurrently with any other call to Sealed or
-	// Append.
+	// Sealed must not be called concurrently with any other call to Sealed,
+	// Append or ForceSeal.
 	Sealed() (bool, uint64, error)
 
+	// ForceSeal causes the segment to become sealed by writing out an index
+	// block. This is not used in the typical flow of append and rotation, but is
+	// necessary during truncations where some suffix of the writer needs to be
+	// truncated. Rather than manipulate what is on disk in a complex way, the WAL
+	// will simply force seal it with whatever state it has already saved and then
+	// open a new segment at the right offset for continued writing.
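+	// The offset returned is the same indexStart value that Sealed reports
+	// once a segment has sealed through the normal append path.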
+	// ForceSeal may be called on a segment that has already been sealed and
+	// should just return the existing index offset in that case. (We don't
+	// actually rely on that currently but it's easier not to assume we'll always
+	// call it at most once.) ForceSeal must not be called concurrently with any
+	// other call to Sealed, Append or ForceSeal.
+	ForceSeal() (uint64, error)
+
 	// LastIndex returns the most recently persisted index in the log. It must
 	// respond without blocking on Append since it's needed frequently by read
 	// paths that may call it concurrently. Typically this will be loaded from an
diff --git a/wal.go b/wal.go
index 7ead441..b81550b 100644
--- a/wal.go
+++ b/wal.go
@@ -34,9 +34,11 @@ var (
 	DefaultSegmentSize = 64 * 1024 * 1024
 )
 
-var _ raft.LogStore = &WAL{}
-var _ raft.MonotonicLogStore = &WAL{}
-var _ raft.StableStore = &WAL{}
+var (
+	_ raft.LogStore          = &WAL{}
+	_ raft.MonotonicLogStore = &WAL{}
+	_ raft.StableStore       = &WAL{}
+)
 
 // WAL is a write-ahead log suitable for github.com/hashicorp/raft.
 type WAL struct {
@@ -364,14 +366,9 @@ func (w *WAL) StoreLogs(logs []*raft.Log) error {
 	w.writeMu.Lock()
 	defer w.writeMu.Unlock()
 
-	awaitCh := w.awaitRotate
-	if awaitCh != nil {
-		// We managed to race for writeMu with the background rotate operation which
-		// needs to complete first. Wait for it to complete.
-		w.writeMu.Unlock()
-		<-awaitCh
-		w.writeMu.Lock()
-	}
+	// Ensure queued rotation has completed before us if we raced with it for
+	// write lock.
+	w.awaitRotationLocked()
 
 	s, release := w.acquireState()
 	defer release()
@@ -450,6 +447,17 @@ func (w *WAL) StoreLogs(logs []*raft.Log) error {
 	return nil
 }
 
+func (w *WAL) awaitRotationLocked() {
+	awaitCh := w.awaitRotate
+	if awaitCh != nil {
+		// We managed to race for writeMu with the background rotate operation which
+		// needs to complete first. Wait for it to complete.
+		w.writeMu.Unlock()
+		<-awaitCh
+		w.writeMu.Lock()
+	}
+}
+
 // DeleteRange deletes a range of log entries. The range is inclusive.
 // Implements raft.LogStore. Note that we only support deleting ranges that are
 // a suffix or prefix of the log.
@@ -465,6 +473,10 @@ func (w *WAL) DeleteRange(min uint64, max uint64) error {
 	w.writeMu.Lock()
 	defer w.writeMu.Unlock()
 
+	// Ensure queued rotation has completed before us if we raced with it for
+	// write lock.
+	w.awaitRotationLocked()
+
 	s, release := w.acquireState()
 	defer release()
 
@@ -812,7 +824,7 @@ func (w *WAL) truncateTailLocked(newMax uint64) error {
 		toDelete[seg.ID] = seg.BaseIndex
 		toClose = append(toClose, seg.r)
 		newState.segments = newState.segments.Delete(seg.BaseIndex)
-		nTruncated += (maxIdx - seg.MinIndex + 1) // +1 becuase MaxIndex is inclusive
+		nTruncated += (maxIdx - seg.MinIndex + 1) // +1 because MaxIndex is inclusive
 	}
 
 	tail := newState.getTailInfo()
@@ -822,11 +834,19 @@ func (w *WAL) truncateTailLocked(newMax uint64) error {
 		// Check that the tail is sealed (it won't be if we didn't need to remove
 		// the actual partial tail above).
 		if tail.SealTime.IsZero() {
+			// Actually seal it (i.e. force it to write out an index block wherever
+			// it got to).
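+			// The offset returned is recorded in the segment's metadata below so
+			// a re-opened WAL can locate the index block in the sealed segment.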
+			indexStart, err := newState.tail.ForceSeal()
+			if err != nil {
+				return nil, nil, err
+			}
+			tail.IndexStart = indexStart
 			tail.SealTime = time.Now()
 			maxIdx = newState.lastIndex()
 		}
 		// Update the MaxIndex
-
 		nTruncated += (maxIdx - newMax)
 		tail.MaxIndex = newMax
 
diff --git a/wal_stubs_test.go b/wal_stubs_test.go
index 45e0ba4..58441d1 100644
--- a/wal_stubs_test.go
+++ b/wal_stubs_test.go
@@ -26,6 +26,12 @@ func testOpenWAL(t *testing.T, tsOpts []testStorageOpt, walOpts []walOpt, allowI
 	t.Helper()
 
 	ts := makeTestStorage(tsOpts...)
+	w, err := testOpenWALWithStorage(t, ts, walOpts, allowInvalidMeta)
+	return ts, w, err
+}
+
+func testOpenWALWithStorage(t *testing.T, ts *testStorage, walOpts []walOpt, allowInvalidMeta bool) (*WAL, error) {
+	t.Helper()
 
 	// Make sure "persisted" state is setup in a valid way
 	sort.Slice(ts.metaState.Segments, func(i, j int) bool {
@@ -41,7 +47,7 @@ func testOpenWAL(t *testing.T, tsOpts []testStorageOpt, walOpts []walOpt, allowI
 
 	opts := append(walOpts, stubStorage(ts))
 	w, err := Open("test", opts...)
-	return ts, w, err
+	return w, err
 }
 
 type testStorageOpt func(ts *testStorage)
@@ -64,6 +70,7 @@ func segFull() testStorageOpt {
 		// Seal "full" segments
 		seg.mutate(func(newState *testSegmentState) error {
 			newState.info.SealTime = time.Now()
+			newState.info.IndexStart = 12345
 			newState.info.MaxIndex = newState.info.BaseIndex + uint64(len(es)) - 1
 			return nil
 		})
@@ -283,7 +290,7 @@ func (ts *testStorage) assertValidMetaState(t *testing.T) {
 		isTail := (i == n-1)
 
 		if isTail && !seg.SealTime.IsZero() {
-			t.Fatalf("final segment in committed state is not sealed")
+			t.Fatalf("final segment in committed state is sealed")
 		}
 		if !isTail && seg.SealTime.IsZero() {
 			t.Fatalf("unsealed segment not at tail in committed state")
@@ -302,6 +309,21 @@ func (ts *testStorage) assertValidMetaState(t *testing.T) {
 			require.True(t, ok)
 			require.Equal(t, seg.BaseIndex, log.Index)
 		}
+
+		// Verify that, if a segment is meant to be sealed in metadata, it
+		// actually is and has an index block. Note that we don't test that the
+		// actual segment's sealed status matches meta since that is not always
+		// true, e.g. just after an append that caused the segment to seal but
+		// before the rotate has updated metadata. The thing we need to ensure is
+		// that if metadata says it's sealed, the segment really is sealed and
+		// has an index block.
+		if !seg.SealTime.IsZero() {
+			sealed, indexStart, err := ts.Sealed()
+			require.NoError(t, err)
+			require.True(t, sealed)
+			require.NotEqual(t, 0, int(indexStart))
+			require.Equal(t, indexStart, seg.IndexStart)
+		}
 	}
 }
@@ -330,7 +352,9 @@ func (ts *testStorage) CommitState(ps types.PersistentState) error {
 	ts.metaState = ps
 
 	// For the sake of not being super confusing, lets also update all the
-	//types.SegmentInfos in the testSegments e.g. if Min/Max were set due to a
+	// types.SegmentInfos in the testSegments e.g. if Min/Max were set due to a
 	// truncation or the segment was sealed.
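+	// (Keeping the stub segments in sync with committed metadata is what lets
+	// assertValidMetaState compare seal state against what was committed.)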
 	for _, seg := range ps.Segments {
 		ts, ok := ts.segments[seg.ID]
@@ -483,7 +505,6 @@ func (ts *testStorage) assertAllClosed(t *testing.T, wantClosed bool) {
 			require.True(t, closed, "segment with BaseIndex=%d was not closed", s.info().BaseIndex)
 		} else {
 			require.False(t, closed, "segment with BaseIndex=%d was closed", s.info().BaseIndex)
-
 		}
 	}
 }
@@ -499,9 +520,10 @@ type testSegment struct {
 }
 
 type testSegmentState struct {
-	info   types.SegmentInfo
-	logs   *immutable.SortedMap[uint64, types.LogEntry]
-	closed bool
+	info       types.SegmentInfo
+	logs       *immutable.SortedMap[uint64, types.LogEntry]
+	closed     bool
+	indexStart uint64
 }
 
 func (s *testSegment) loadState() testSegmentState {
@@ -566,6 +588,10 @@ func (s *testSegment) Append(entries []types.LogEntry) error {
 			}
 			newState.logs = newState.logs.Set(e.Index, e)
 		}
+		// Maybe seal
+		if newState.logs.Len() >= s.limit {
+			newState.indexStart = 12345
+		}
 		return nil
 	})
 }
@@ -575,7 +601,21 @@ func (s *testSegment) Sealed() (bool, uint64, error) {
 	if state.closed {
 		panic("sealed on closed segment")
 	}
-	return state.logs.Len() >= s.limit, 12345, nil
+	return state.indexStart > 0, state.indexStart, nil
+}
+
+func (s *testSegment) ForceSeal() (uint64, error) {
+	err := s.mutate(func(newState *testSegmentState) error {
+		if newState.closed {
+			return errors.New("closed")
+		}
+		newState.indexStart = 12345
+		return nil
+	})
+	if err != nil {
+		return 0, err
+	}
+	return 12345, nil
 }
 
 func (s *testSegment) LastIndex() uint64 {
diff --git a/wal_test.go b/wal_test.go
index 6305b45..4dde8b3 100644
--- a/wal_test.go
+++ b/wal_test.go
@@ -198,6 +198,8 @@ func TestWALOpen(t *testing.T) {
 	for _, tc := range cases {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
 			ts, w, err := testOpenWAL(t, tc.tsOpts, tc.walOpts, tc.ignoreInvalidMeta)
 
 			// Error or not we should never commit an invalid set of segments to meta.
@@ -371,6 +373,8 @@ func TestStoreLogs(t *testing.T) {
 	for _, tc := range cases {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
 			ts, w, err := testOpenWAL(t, tc.tsOpts, tc.walOpts, false)
 			require.NoError(t, err)
 
@@ -397,7 +401,7 @@ func TestStoreLogs(t *testing.T) {
 			require.NoError(t, err)
 			require.Equal(t, int(tc.expectLastIndex), int(last))
 
-			// Check all the internal meta/segment state meets our invariants
+			// Check all the internal meta/segment state meets our invariants.
 			ts.assertValidMetaState(t)
 
 			// Check all log entries exist that are meant to
@@ -627,6 +631,8 @@ func TestDeleteRange(t *testing.T) {
 	for _, tc := range cases {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
 			opts := tc.walOpts
 
 			// add our own metrics counter
@@ -692,6 +698,9 @@ func TestDeleteRange(t *testing.T) {
 			require.Equal(t, int(nextIdx), int(log.Index))
 			validateLogEntry(t, &log)
 
+			// Verify all segment state is consistent with metadata
+			ts.assertValidMetaState(t)
+
 			// Verify the metrics recorded what we expected!
 			metrics := m.Summary()
 			require.Equal(t, int(tc.expectNTailTruncations), int(metrics.Counters["tail_truncations"]))
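
Reviewer note: a minimal sketch (not part of the diff) of the tail-truncation
flow this change covers, using only the public API exercised above. The
directory path, payload, and indexes are illustrative assumptions.

	package main

	import (
		"log"

		"github.com/hashicorp/raft"
		wal "github.com/hashicorp/raft-wal"
	)

	func main() {
		// Small segments so appends rotate quickly, as in the integration tests.
		w, err := wal.Open("/tmp/raft-wal-demo", wal.WithSegmentSize(4096))
		if err != nil {
			log.Fatal(err)
		}
		defer w.Close()

		// Append logs 1..100 in one durable batch.
		logs := make([]*raft.Log, 0, 100)
		for i := uint64(1); i <= 100; i++ {
			logs = append(logs, &raft.Log{Index: i, Data: []byte("payload")})
		}
		if err := w.StoreLogs(logs); err != nil {
			log.Fatal(err)
		}

		// Tail truncation: delete indexes 50..100 inclusive. Internally the WAL
		// force-seals the partial tail segment and opens a fresh tail, so the
		// next append starts at index 50.
		if err := w.DeleteRange(50, 100); err != nil {
			log.Fatal(err)
		}
	}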