Fix truncate tail (#33)
* Add ForceSeal to make tail truncations work correctly

* Support ForceSeal in segment.Writer

* Fix bad test assertion

* Fix assertion around sealed segment metadata to not be flaky

* Add integration tests for basic scenarios

* Also assert index start metadata is persisted
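
The first bullet is the heart of the change: the new Writer.ForceSeal (see segment/writer.go below) seals a segment in place by writing out its index block wherever the writer has got to, so a tail truncation can retire the active segment cleanly. The sketch below only illustrates that flow under assumed names: truncateTail, segmentWriter, metaStore, and commitSealedTail are hypothetical, not raft-wal's real internals, and only the ForceSeal signature matches this commit.

// Hypothetical sketch of a tail-truncation flow built around ForceSeal.
// Only the ForceSeal signature mirrors segment/writer.go in this commit;
// everything else is illustrative.
package sketch

// segmentWriter is the slice of the writer API this sketch needs.
type segmentWriter interface {
	// ForceSeal writes an index block wherever the writer currently is and
	// returns its offset; the segment must not be appended to afterwards.
	ForceSeal() (indexStart uint64, err error)
}

// metaStore stands in for whatever persists segment metadata.
type metaStore interface {
	commitSealedTail(sealedIndexStart, newTailFirstIndex uint64) error
}

// truncateTail keeps entries up to newLastIndex, seals the active segment in
// place, and records where a fresh tail segment should begin.
func truncateTail(active segmentWriter, meta metaStore, newLastIndex uint64) error {
	// Seal the active segment so its index block is on disk and readers can
	// still find the entries that survive the truncation.
	indexStart, err := active.ForceSeal()
	if err != nil {
		return err
	}
	// Persist the sealed segment's index start and begin a new tail at
	// newLastIndex+1 (segment creation is omitted from the sketch).
	return meta.commitSealedTail(indexStart, newLastIndex+1)
}
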
banks authored Oct 20, 2023
1 parent 3aaaab8 commit ad34584
Showing 8 changed files with 507 additions and 26 deletions.
268 changes: 268 additions & 0 deletions integration/integration_test.go
@@ -0,0 +1,268 @@
package integration

import (
"bytes"
"fmt"
"os"
"strings"
"testing"

"github.com/hashicorp/raft"
wal "github.com/hashicorp/raft-wal"
"github.com/hashicorp/raft-wal/metadb"
"github.com/stretchr/testify/require"
)

type step func(w *wal.WAL) error

func TestIntegrationScenarios(t *testing.T) {
cases := []struct {
name string
steps []step
expectFirstIdx, expectLastIdx int
expectNumSegments int
}{
{
name: "basic creation, appends, rotation",
steps: []step{
// ~256 bytes plus overhead per log; we want to write more than the 4K segment
// size. Batches of 4 are ~1k so 5 batches is enough to rotate once.
appendLogsInBatches(5, 4),
},
expectFirstIdx: 1,
expectLastIdx: 20,
expectNumSegments: 2,
},
{
name: "starting at high index, appends, rotation",
steps: []step{
appendFirstLogAt(1_000_000),
// ~256 bytes plus overhead per log; we want to write more than the 4K segment
// size. Batches of 4 are ~1k so 5 batches is enough to rotate once.
appendLogsInBatches(5, 4),
},
expectFirstIdx: 1_000_000,
expectLastIdx: 1_000_020,
expectNumSegments: 2,
},
{
name: "head truncation deleting no files",
steps: []step{
appendLogsInBatches(11, 4),
deleteRange(1, 2),
},
expectFirstIdx: 3,
expectLastIdx: 44,
expectNumSegments: 3,
},
{
name: "head truncation deleting multiple files",
steps: []step{
appendLogsInBatches(11, 4),
deleteRange(1, 20),
},
expectFirstIdx: 21,
expectLastIdx: 44,
expectNumSegments: 2,
},
{
name: "tail truncation in active segment",
steps: []step{
appendLogsInBatches(11, 4),
deleteRange(44, 44), // Delete only the last log
},
expectFirstIdx: 1,
expectLastIdx: 43,
expectNumSegments: 4,
},
{
name: "tail truncation in active segment and write more",
steps: []step{
appendLogsInBatches(11, 4),
deleteRange(44, 44), // Delete only the last log
appendLogsInBatches(1, 4),
},
expectFirstIdx: 1,
expectLastIdx: 47,
expectNumSegments: 4,
},
{
name: "tail truncation deleting files",
steps: []step{
appendLogsInBatches(11, 4),
deleteRange(20, 44),
},
expectFirstIdx: 1,
expectLastIdx: 19,
// Only need 2 segments but the truncation will rotate to a new tail
expectNumSegments: 3,
},
{
name: "tail truncation deleting files and write more",
steps: []step{
appendLogsInBatches(11, 4),
deleteRange(20, 44),
appendLogsInBatches(1, 4),
},
expectFirstIdx: 1,
expectLastIdx: 23,
// Only need 2 segments but the truncation will rotate to a new tail
expectNumSegments: 3,
},
{
name: "write some logs, truncate everything, restart logs from different index",
steps: []step{
appendLogsInBatches(11, 4),
deleteRange(1, 44),
appendFirstLogAt(1000),
appendLogsInBatches(1, 4),
},
expectFirstIdx: 1000,
expectLastIdx: 1004,
expectNumSegments: 1,
},
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

tmpDir, err := os.MkdirTemp("", tc.name)
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

// Wrap the BoltDB meta store so we can peek into its values.
meta := &PeekingMetaStore{
meta: &metadb.BoltMetaDB{},
}

w, err := wal.Open(tmpDir,
// 4k segments to test rotation quicker
wal.WithSegmentSize(4096),
wal.WithMetaStore(meta),
)
require.NoError(t, err)

// Execute initial operations
for i, step := range tc.steps {
require.NoError(t, step(w), "failed on step %d", i)
}

// Assert expected properties
assertLogContents(t, w, tc.expectFirstIdx, tc.expectLastIdx)
assertNumSegments(t, meta, tmpDir, tc.expectNumSegments)

// Close WAL and re-open
require.NoError(t, w.Close())

meta2 := &PeekingMetaStore{
meta: &metadb.BoltMetaDB{},
}

w2, err := wal.Open(tmpDir,
wal.WithSegmentSize(4096),
wal.WithMetaStore(meta2),
)
require.NoError(t, err)
defer w2.Close()

// Assert expected properties still hold
assertLogContents(t, w2, tc.expectFirstIdx, tc.expectLastIdx)
assertNumSegments(t, meta2, tmpDir, tc.expectNumSegments)
})
}
}

func appendLogsInBatches(nBatches, nPerBatch int) step {
return func(w *wal.WAL) error {
lastIdx, err := w.LastIndex()
if err != nil {
return err
}
nextIdx := lastIdx + 1

return appendLogsInBatchesStartingAt(w, nBatches, nPerBatch, int(nextIdx))
}
}

func appendFirstLogAt(index int) step {
return func(w *wal.WAL) error {
return appendLogsInBatchesStartingAt(w, 1, 1, index)
}
}

func appendLogsInBatchesStartingAt(w *wal.WAL, nBatches, nPerBatch, firstIndex int) error {
nextIdx := uint64(firstIndex)

batch := make([]*raft.Log, 0, nPerBatch)
for b := 0; b < nBatches; b++ {
for i := 0; i < nPerBatch; i++ {
log := raft.Log{
Index: nextIdx,
Data: makeValue(nextIdx),
}
batch = append(batch, &log)
nextIdx++
}
if err := w.StoreLogs(batch); err != nil {
return err
}
batch = batch[:0]
}
return nil
}

func makeValue(n uint64) []byte {
// Values are 16 repetitions of a 16 byte string based on the index so 256
// bytes total.
return bytes.Repeat([]byte(fmt.Sprintf("val-%011d\n", n)), 16)
}

func deleteRange(min, max int) step {
return func(w *wal.WAL) error {
return w.DeleteRange(uint64(min), uint64(max))
}
}

func assertLogContents(t *testing.T, w *wal.WAL, first, last int) {
t.Helper()

firstIdx, err := w.FirstIndex()
require.NoError(t, err)
lastIdx, err := w.LastIndex()
require.NoError(t, err)

require.Equal(t, first, int(firstIdx))
require.Equal(t, last, int(lastIdx))

var log raft.Log
for i := first; i <= last; i++ {
err := w.GetLog(uint64(i), &log)
require.NoError(t, err, "log index %d", i)
require.Equal(t, i, int(log.Index), "log index %d", i)
require.Equal(t, string(makeValue(log.Index)), string(log.Data), "log index %d", i)
}
}

func assertNumSegments(t *testing.T, meta *PeekingMetaStore, dir string, numSegments int) {
t.Helper()

state := meta.PeekState()
require.Equal(t, numSegments, len(state.Segments))

// Check the right number of segment files on disk too
des, err := os.ReadDir(dir)
require.NoError(t, err)

segFiles := make([]string, 0, numSegments)
for _, de := range des {
if de.IsDir() {
continue
}
if strings.HasSuffix(de.Name(), ".wal") {
segFiles = append(segFiles, de.Name())
}
}
require.Equal(t, numSegments, len(segFiles), "expected %d segment files, got %v", numSegments, segFiles)
}
65 changes: 65 additions & 0 deletions integration/meta.go
@@ -0,0 +1,65 @@
package integration

import (
"sync"

"github.com/hashicorp/raft-wal/types"
)

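// PeekingMetaStore wraps a real types.MetaStore and keeps an in-memory copy
// of the most recently committed state and of any stable KVs written through
// it, so tests can inspect them without reopening the underlying store.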
type PeekingMetaStore struct {
mu sync.Mutex
meta types.MetaStore
state types.PersistentState
stable map[string]string
}

func (s *PeekingMetaStore) PeekState() types.PersistentState {
s.mu.Lock()
defer s.mu.Unlock()
return s.state
}

func (s *PeekingMetaStore) PeekStable(key string) (string, bool) {
s.mu.Lock()
defer s.mu.Unlock()
v, ok := s.stable[key]
return v, ok
}

func (s *PeekingMetaStore) Load(dir string) (types.PersistentState, error) {
state, err := s.meta.Load(dir)
if err == nil {
s.mu.Lock()
s.state = state
s.mu.Unlock()
}
return state, err
}

func (s *PeekingMetaStore) CommitState(state types.PersistentState) error {
err := s.meta.CommitState(state)
if err == nil {
s.mu.Lock()
s.state = state
s.mu.Unlock()
}
return err
}

func (s *PeekingMetaStore) GetStable(key []byte) ([]byte, error) {
return s.meta.GetStable(key)
}

func (s *PeekingMetaStore) SetStable(key, value []byte) error {
err := s.meta.SetStable(key, value)
if err == nil {
s.mu.Lock()
if s.stable == nil {
// Lazily initialize since the store is constructed as a bare struct
// literal in the tests and stable is never pre-allocated.
s.stable = make(map[string]string)
}
s.stable[string(key)] = string(value)
s.mu.Unlock()
}
return err
}

func (s *PeekingMetaStore) Close() error {
return s.meta.Close()
}
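
For context on how this wrapper is meant to be used: it is handed to wal.Open in place of the default meta store, and PeekState then exposes the committed segment metadata in-process. The snippet below is a sketch rather than part of the committed tests; peekSegments is an illustrative name only.

package integration

import (
	"fmt"

	wal "github.com/hashicorp/raft-wal"
	"github.com/hashicorp/raft-wal/metadb"
)

// peekSegments shows (as a sketch) how PeekingMetaStore is wired into a WAL
// so committed metadata can be read back without reopening the BoltDB file.
func peekSegments(dir string) error {
	meta := &PeekingMetaStore{meta: &metadb.BoltMetaDB{}}

	w, err := wal.Open(dir, wal.WithMetaStore(meta))
	if err != nil {
		return err
	}
	defer w.Close()

	// ... append or truncate logs via w here ...

	// Every Load/CommitState call made by the WAL refreshes the cached copy.
	state := meta.PeekState()
	fmt.Printf("WAL currently tracks %d segments\n", len(state.Segments))
	return nil
}
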
22 changes: 22 additions & 0 deletions segment/writer.go
@@ -488,6 +488,28 @@ func (w *Writer) Sealed() (bool, uint64, error) {
return true, w.writer.indexStart, nil
}

// ForceSeal forces us to seal the segment by writing out an index block
// wherever we got to in the file. After calling this it is no longer valid to
// call Append on this file.
func (w *Writer) ForceSeal() (uint64, error) {
if w.writer.indexStart > 0 {
// Already sealed, this is a no-op.
return w.writer.indexStart, nil
}

// Seal the segment! We seal it by writing an index frame before we commit.
if err := w.appendIndex(); err != nil {
return 0, err
}

// Write the commit frame
if err := w.appendCommit(); err != nil {
return 0, err
}

return w.writer.indexStart, nil
}

// LastIndex returns the most recently persisted index in the log. It must
// respond without blocking on append since it's needed frequently by read
// paths that may call it concurrently. Typically this will be loaded from an
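
To make the new method's contract concrete, here is a test-shaped sketch of its idempotency: the first call writes the index block and commit frame, and any later call is a no-op that reports the same index start. newTestWriter is a hypothetical helper, assumed to create a segment Writer over a temporary file and append a few logs; it is not part of the real package.

package segment_test

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestForceSealIsIdempotent(t *testing.T) {
	// newTestWriter is assumed to create a Writer over a temporary segment
	// file and append a few logs to it; it does not exist in the real package.
	w := newTestWriter(t)

	// The first call writes the index block and commit frame and reports
	// where the index block starts in the file.
	indexStart, err := w.ForceSeal()
	require.NoError(t, err)

	// A second call is a no-op and reports the same index start.
	again, err := w.ForceSeal()
	require.NoError(t, err)
	require.Equal(t, indexStart, again)
}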