diff --git a/go.mod b/go.mod
index 13bcac8..6723208 100644
--- a/go.mod
+++ b/go.mod
@@ -4,19 +4,18 @@ go 1.18
 
 require (
 	github.com/HdrHistogram/hdrhistogram-go v1.1.0
-	github.com/armon/go-metrics v0.3.10
+	github.com/armon/go-metrics v0.4.1
 	github.com/benbjohnson/immutable v0.4.0
 	github.com/benmathews/bench v0.0.0-20210120214102-f7c75b9ef6e7
 	github.com/benmathews/hdrhistogram-writer v0.0.0-20210120211942-3cb1c7c33f95
 	github.com/coreos/etcd v3.3.27+incompatible
 	github.com/google/gofuzz v1.2.0
 	github.com/hashicorp/go-hclog v0.14.1
-	github.com/hashicorp/raft v1.3.10
+	github.com/hashicorp/raft v1.4.0
 	github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0
 	github.com/hashicorp/raft-boltdb/v2 v2.2.2
-	github.com/ryboe/q v1.0.18
 	github.com/segmentio/fasthash v1.0.3
-	github.com/stretchr/testify v1.8.0
+	github.com/stretchr/testify v1.8.2
 	go.etcd.io/bbolt v1.3.6
 )
 
@@ -29,12 +28,9 @@ require (
 	github.com/hashicorp/go-immutable-radix v1.3.0 // indirect
 	github.com/hashicorp/go-msgpack v1.1.5 // indirect
 	github.com/hashicorp/golang-lru v0.5.4 // indirect
-	github.com/kr/pretty v0.3.1 // indirect
-	github.com/kr/text v0.2.0 // indirect
 	github.com/mattn/go-colorable v0.1.6 // indirect
 	github.com/mattn/go-isatty v0.0.12 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	github.com/rogpeppe/go-internal v1.9.0 // indirect
 	golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
 	golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect
 	golang.org/x/time v0.1.0 // indirect
diff --git a/go.sum b/go.sum
index 3e091bf..1a37ff3 100644
--- a/go.sum
+++ b/go.sum
@@ -14,8 +14,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
 github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
 github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
-github.com/armon/go-metrics v0.3.10 h1:FR+drcQStOe+32sYyJYyZ7FIdgoGGBnwLl+flodp8Uo=
-github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
+github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
+github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
 github.com/benbjohnson/immutable v0.4.0 h1:CTqXbEerYso8YzVPxmWxh2gnoRQbbB9X1quUC8+vGZA=
 github.com/benbjohnson/immutable v0.4.0/go.mod h1:iAr8OjJGLnLmVUr9MZ/rz4PWUy6Ouc2JLYuMArmvAJM=
 github.com/benmathews/bench v0.0.0-20210120214102-f7c75b9ef6e7 h1:nYTgFk9sOL3rmNew6rR2anUWWCzmSYPMJiSmowV8Yls=
@@ -91,7 +91,6 @@ github.com/hashicorp/go-hclog v0.14.1/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39
 github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
 github.com/hashicorp/go-immutable-radix v1.3.0 h1:8exGP7ego3OmkfksihtSouGMZ+hQrhxx+FVELeXpVPE=
 github.com/hashicorp/go-immutable-radix v1.3.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
-github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
 github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
 github.com/hashicorp/go-msgpack v1.1.5 h1:9byZdVjKTe5mce63pRVNP1L7UAmdHOTEMGehn6KvJWs=
 github.com/hashicorp/go-msgpack v1.1.5/go.mod h1:gWVc3sv/wbDmR3rQsj1CAktEZzoz1YNK9NfGLXJ69/4=
@@ -104,8 +103,8 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l
 github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
 github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
 github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
-github.com/hashicorp/raft v1.3.10 h1:LR5QZX1VQd0DFWZfeCwWawyeKfpS/Tm1yjnJIY5X4Tw=
-github.com/hashicorp/raft v1.3.10/go.mod h1:J8naEwc6XaaCfts7+28whSeRvCqTd6e20BlCU3LtEO4=
+github.com/hashicorp/raft v1.4.0 h1:tn28S/AWv0BtRQgwZv/1NELu8sCvI0FixqL8C8MYKeY=
+github.com/hashicorp/raft v1.4.0/go.mod h1:nz64BIjXphDLATfKGG5RzHtNUPioLeKFsXEm88yTVew=
 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
 github.com/hashicorp/raft-boltdb v0.0.0-20210409134258-03c10cc3d4ea/go.mod h1:qRd6nFJYYS6Iqnc/8HcUmko2/2Gw8qTFEmxDLii6W5I=
 github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0 h1:CO8dBMLH6dvE1jTn/30ZZw3iuPsNfajshWoJTnVc5cc=
@@ -124,8 +123,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
-github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
-github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -160,7 +157,6 @@ github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQT
 github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
 github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
-github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -181,10 +177,6 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
 github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
 github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
 github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
-github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
-github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
-github.com/ryboe/q v1.0.18 h1:uTonPt1eZjy7GSpB0XpYpsCvX+Yf9f+M4CUKuH2r+vg=
-github.com/ryboe/q v1.0.18/go.mod h1:elqvVf/GBuZHvZ9gvHv4MKM6NZAMz2rFajnTgQZ46wU=
 github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM=
 github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
@@ -193,14 +185,16 @@ github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71/go.mod h1:AZpEONHx3
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
 github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
diff --git a/segment/filer_test.go b/segment/filer_test.go
index 23dd38e..d110fc8 100644
--- a/segment/filer_test.go
+++ b/segment/filer_test.go
@@ -37,6 +37,9 @@ func TestSegmentBasics(t *testing.T) {
 	// being created and written to).
 	require.False(t, w.(*Writer).wf.(*testWritableFile).dirty)
 
+	// Try to write a log that is not the base index
+	err = w.Append([]types.LogEntry{{Index: 2, Data: []byte("two")}})
+	require.ErrorContains(t, err, "non-monotonic append to segment with BaseIndex=1. Entry index 2, expected 1")
 	// Append to writer
 	err = w.Append([]types.LogEntry{{Index: 1, Data: []byte("one")}})
 	require.NoError(t, err)
@@ -57,6 +60,10 @@ func TestSegmentBasics(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, []byte("one"), got.Bs)
 
+	// Try to write a log that is not the expected next index (which would be 2)
+	err = w.Append([]types.LogEntry{{Index: 10, Data: []byte("ten")}})
+	require.ErrorContains(t, err, "non-monotonic append to segment with BaseIndex=1. Entry index 10, expected 2")
+
 	expectVals := append([]string{}, "one")
 	// OK, now write some more.
 	batch := make([]types.LogEntry, 0, 10)
diff --git a/segment/writer.go b/segment/writer.go
index 0f36d0e..c5aa809 100644
--- a/segment/writer.go
+++ b/segment/writer.go
@@ -363,11 +363,40 @@ func (w *Writer) OffsetForFrame(idx uint64) (uint32, error) {
 }
 
 func (w *Writer) appendEntry(e types.LogEntry) error {
+	offsets := w.getOffsets()
+
+	// Check the invariant that this entry is the next one we expect otherwise our
+	// index logic is incorrect and will result in panics on read.
+	if e.Index != w.info.BaseIndex+uint64(len(offsets)) {
+		return fmt.Errorf("non-monotonic append to segment with BaseIndex=%d. Entry index %d, expected %d",
+			w.info.BaseIndex, e.Index, w.info.BaseIndex+uint64(len(offsets)))
+	}
+
 	fh := frameHeader{
 		typ: FrameEntry,
 		len: uint32(len(e.Data)),
 	}
-	return w.appendFrame(fh, e.Data)
+	bufOffset, err := w.appendFrame(fh, e.Data)
+	if err != nil {
+		return err
+	}
+	// Update the offsets index
+
+	// Add the index entry. Note this is safe despite mutating the same backing
+	// array as tail because it's beyond the limit current readers will access
+	// until we do the atomic update below. Even if append re-allocates the
+	// backing array, it will only read the indexes smaller than numEntries from
+	// the old array to copy them into the new one and we are not mutating the
+	// same memory locations. Old readers might still be looking at the old
+	// array (lower than numEntries) through the current tail.offsets slice but
+	// we are not touching that at least below numEntries.
+	offsets = append(offsets, w.writer.writeOffset+uint32(bufOffset))
+
+	// Now we can make it available to readers. Note that readers still
+	// shouldn't read it until we actually commit to disk (and increment
+	// commitIdx) but it's race free for them to now!
+	w.offsets.Store(offsets)
+	return nil
 }
 
 func (w *Writer) appendCommit() error {
@@ -375,7 +404,7 @@ func (w *Writer) appendCommit() error {
 		typ: FrameCommit,
 		crc: w.writer.crc,
 	}
-	if err := w.appendFrame(fh, nil); err != nil {
+	if _, err := w.appendFrame(fh, nil); err != nil {
 		return err
 	}
 
@@ -430,41 +459,21 @@ func (w *Writer) appendIndex() error {
 
 // appendFrame appends the given frame to the current block. The frame must fit
 // already otherwise an error will be returned.
-func (w *Writer) appendFrame(fh frameHeader, data []byte) error {
+func (w *Writer) appendFrame(fh frameHeader, data []byte) (int, error) {
 	// Encode frame header into current block buffer
 	l := encodedFrameSize(len(data))
 	w.ensureBufCap(l)
 
 	bufOffset := len(w.writer.commitBuf)
 	if err := writeFrame(w.writer.commitBuf[bufOffset:bufOffset+l], fh, data); err != nil {
-		return err
+		return 0, err
 	}
 	// Update len of commitBuf since we resliced it for the write
 	w.writer.commitBuf = w.writer.commitBuf[:bufOffset+l]
 
 	// Update the CRC
 	w.writer.crc = crc32.Update(w.writer.crc, castagnoliTable, w.writer.commitBuf[bufOffset:bufOffset+l])
-
-	// If frame is an entry, update the index
-	if fh.typ == FrameEntry {
-		offsets := w.getOffsets()
-
-		// Add the index entry. Note this is safe despite mutating the same backing
-		// array as tail because it's beyond the limit current readers will access
-		// until we do the atomic update below. Even if append re-allocates the
-		// backing array, it will only read the indexes smaller than numEntries from
-		// the old array to copy them into the new one and we are not mutating the
-		// same memory locations. Old readers might still be looking at the old
-		// array (lower than numEntries) through the current tail.offsets slice but
-		// we are not touching that at least below numEntries.
-		offsets = append(offsets, w.writer.writeOffset+uint32(bufOffset))
-
-		// Now we can make it available to readers. Note that readers still
-		// shouldn't read it until we actually commit to disk (and increment
-		// commitIdx) but it's race free for them to now!
-		w.offsets.Store(offsets)
-	}
-	return nil
+	return bufOffset, nil
 }
 
 func (w *Writer) flush() error {
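Illustrative aside (not part of the patch): the invariant the new check in `segment/writer.go` enforces can be stated on its own — an entry may only be appended when its index equals the segment's `BaseIndex` plus the number of entries already written, otherwise the in-memory offset index no longer lines up with log indexes. A minimal, self-contained sketch of that rule; `checkMonotonic` is a hypothetical helper, not part of raft-wal:

```go
package main

import "fmt"

// checkMonotonic mirrors the appendEntry invariant: the next entry's index
// must equal baseIndex plus the number of entries already in the segment.
func checkMonotonic(baseIndex uint64, numEntries int, next uint64) error {
	expected := baseIndex + uint64(numEntries)
	if next != expected {
		return fmt.Errorf("non-monotonic append to segment with BaseIndex=%d. Entry index %d, expected %d",
			baseIndex, next, expected)
	}
	return nil
}

func main() {
	fmt.Println(checkMonotonic(1, 0, 2)) // rejected: the first entry must be index 1
	fmt.Println(checkMonotonic(1, 1, 2)) // <nil>: index 2 follows one existing entry
}
```

This is the same rule the updated `TestSegmentBasics` exercises by appending index 2 (and later index 10) out of order.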
diff --git a/state.go b/state.go
index 71ae5e1..7180378 100644
--- a/state.go
+++ b/state.go
@@ -28,6 +28,10 @@ type state struct {
 	finalizer     atomic.Value // func()
 	nextSegmentID uint64
+
+	// nextBaseIndex is used to signal which baseIndex to use next if there are no
+	// segments or current tail.
+	nextBaseIndex uint64
 
 	segments *immutable.SortedMap[uint64, segmentState]
 	tail     types.SegmentWriter
 }
@@ -151,8 +155,23 @@ func (s *state) lastIndex() uint64 {
 	if tailIdx > 0 {
 		return tailIdx
 	}
-	// Current tail is empty, so the largest log is the MaxIndex of the previous
-	// segment which must be the same as the tail's BaseIndex - 1 or zero.
+	// Current tail is empty. Check if there are previous sealed segments.
+	it := s.segments.Iterator()
+	it.Last()
+	_, _, ok := it.Prev()
+	if !ok {
+		// No tail! shouldn't be possible but means no logs yet
+		return 0
+	}
+	// Go back to the segment before the tail
+	_, _, ok = it.Prev()
+	if !ok {
+		// No previous segment so the whole log is empty
+		return 0
+	}
+
+	// There was a previous segment so its MaxIndex will be one less than the
+	// tail's BaseIndex.
 	tailSeg := s.getTailInfo()
 	if tailSeg == nil || tailSeg.BaseIndex == 0 {
 		return 0
diff --git a/types/types.go b/types/types.go
index 637e5e6..4eb8513 100644
--- a/types/types.go
+++ b/types/types.go
@@ -3,10 +3,18 @@ package types
 
-import "errors"
+import (
+	"errors"
+
+	"github.com/hashicorp/raft"
+)
 
 var (
-	ErrNotFound = errors.New("log entry not found")
+	// ErrNotFound is our own version of raft's not found error. It's important
+	// it's exactly the same because the raft lib checks for equality with its
+	// own type as a crucial part of replication processing (detecting end of logs
+	// and that a snapshot is needed for a follower).
+	ErrNotFound = raft.ErrLogNotFound
 	ErrCorrupt  = errors.New("WAL is corrupt")
 	ErrSealed   = errors.New("segment is sealed")
 	ErrClosed   = errors.New("closed")
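Illustrative aside (not part of the patch): `types.ErrNotFound` is aliased to `raft.ErrLogNotFound` because, as the comment above notes, hashicorp/raft detects "end of log" (and hence the need to send a snapshot to a follower) by comparing errors against its own sentinel. A small sketch of the equality callers can rely on once the alias is in place, using the import paths added in this diff:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/raft"
	"github.com/hashicorp/raft-wal/types"
)

func main() {
	// An error surfaced by the WAL for a missing log entry.
	err := types.ErrNotFound

	// Both the plain equality check (what the raft library does internally)
	// and errors.Is now succeed for the same error value.
	fmt.Println(err == raft.ErrLogNotFound)          // true
	fmt.Println(errors.Is(err, raft.ErrLogNotFound)) // true
}
```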
diff --git a/verifier/store.go b/verifier/store.go
index fe6a502..efa0145 100644
--- a/verifier/store.go
+++ b/verifier/store.go
@@ -15,6 +15,9 @@ import (
 	"github.com/hashicorp/raft-wal/metrics"
 )
 
+var _ raft.LogStore = &LogStore{}
+var _ raft.MonotonicLogStore = &LogStore{}
+
 // LogStore is a raft.LogStore that acts as middleware around an underlying
 // persistent store. It provides support for periodically verifying that ranges
 // of logs read back from the LogStore match the values written, and the values
@@ -239,3 +242,12 @@ func (s *LogStore) Close() error {
 	}
 	return nil
 }
+
+// IsMonotonic implements the raft.MonotonicLogStore interface. This is a shim
+// to expose the underlying store as monotonically indexed or not.
+func (s *LogStore) IsMonotonic() bool {
+	if store, ok := s.s.(raft.MonotonicLogStore); ok {
+		return store.IsMonotonic()
+	}
+	return false
+}
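Illustrative aside (not part of the patch): the `IsMonotonic` shim on `verifier.LogStore` follows the optional-interface pattern that hashicorp/raft itself uses — probe the store with a type assertion and treat a missing implementation as `false`. A hedged sketch of that probe; `isMonotonic` is a hypothetical helper, not an API of either library:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/raft"
)

// isMonotonic reports whether a LogStore advertises gap-free, consecutive
// indexes via the optional raft.MonotonicLogStore interface, defaulting to
// false when the store does not implement it — the same behaviour as the
// verifier.LogStore shim above.
func isMonotonic(s raft.LogStore) bool {
	if m, ok := s.(raft.MonotonicLogStore); ok {
		return m.IsMonotonic()
	}
	return false
}

func main() {
	// raft.InmemStore does not implement the optional interface (at the time
	// of writing), so this is expected to print false.
	fmt.Println(isMonotonic(raft.NewInmemStore()))
}
```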
diff --git a/wal.go b/wal.go
index 3d0cdbe..bc2b02f 100644
--- a/wal.go
+++ b/wal.go
@@ -34,6 +34,10 @@ var (
 	DefaultSegmentSize = 64 * 1024 * 1024
 )
 
+var _ raft.LogStore = &WAL{}
+var _ raft.MonotonicLogStore = &WAL{}
+var _ raft.StableStore = &WAL{}
+
 // WAL is a write-ahead log suitable for github.com/hashicorp/raft.
 type WAL struct {
 	closed uint32 // atomically accessed to keep it first in struct for alignment.
@@ -376,31 +380,36 @@ func (w *WAL) StoreLogs(logs []*raft.Log) error {
 	lastIdx := s.lastIndex()
 
 	// Special case, if the log is currently empty and this is the first append,
-	// we allow any starting index. But in most cases we've already created a
-	// segment with BaseIndex of 1 to start using. That means we need to update
-	// the metadata for the segment to set MinIndex. This is the only time we will
-	// force a metaDB sync on a StoreLogs call so it seems OK to ensure that
-	// MetaDB correctly reflects the range of logs stored. The alternative would
-	// be not to allocate the segment file at all until now but that would be even
-	// more expensive!
-	if lastIdx == 0 && logs[0].Index > 1 {
-		txn := func(s *state) (func(), func() error, error) {
-			seg1, ok := s.segments.Get(1)
-			if !ok {
-				// Can't happen!
-				return nil, nil, fmt.Errorf("invalid internal state! %w", ErrCorrupt)
-			}
-			// Note that we're mutating a copy of types.SegmentInfo since it's stored by value
-			// not reference.
-			seg1.MinIndex = logs[0].Index
-			s.segments = s.segments.Set(seg1.BaseIndex, seg1)
-			return nil, nil, nil
-		}
-
-		if err := w.mutateStateLocked(txn); err != nil {
+	// we allow any starting index. We've already created an empty tail segment
+	// though and probably started at index 1. Rather than break the invariant
+	// that BaseIndex is the same as the first index in the segment (which causes
+	// lots of extra complexity lower down) we simply accept the additional cost
+	// in this rare case of removing the current tail and re-creating it with the
+	// correct BaseIndex for the first log we are about to append. In practice,
+	// this only happens on startup of a new server, or after a user snapshot
+	// restore, which are both rare enough events that the cost is not significant
+	// since the cost of creating other state or restoring snapshots is larger
+	// anyway. We could theoretically defer creating at all until we know for sure
+	// but that is more complex internally since then everything has to handle the
+	// uninitialized case where there is no tail yet with special cases.
+	ti := s.getTailInfo()
+	// Note we check index != ti.BaseIndex rather than index != 1 so that this
+	// works even if we choose to initialize first segments to a BaseIndex other
+	// than 1. For example it might be marginally more performant to choose to
+	// initialize to the old MaxIndex + 1 after a truncate since that is what our
+	// raft library will use after a restore currently so will avoid this case on
+	// the next append, while still being generally safe.
+	if lastIdx == 0 && logs[0].Index != ti.BaseIndex {
+		if err := w.resetEmptyFirstSegmentBaseIndex(logs[0].Index); err != nil {
 			return err
 		}
-		// Now we can append!
+
+		// Re-read state now we just changed it.
+		s2, release2 := w.acquireState()
+		defer release2()
+
+		// Overwrite the state we read before so the code below uses the new state
+		s = s2
 	}
 
 	// Encode logs
@@ -618,16 +627,19 @@ func (w *WAL) rotateSegmentLocked(indexStart uint64) error {
 // new segment appended. newState must be a copy, taken under write lock which
 // is still held by the caller and its segments map must contain all non-tail
 // segments that should be in the log, all must be sealed at this point. The new
-// segment's baseIndex will be the current last-segment's MaxIndex (or 0 if
-// non).
+// segment's baseIndex will be the current last-segment's MaxIndex + 1 (or 1 if
+// no current tail segment). The func returned is to be executed post
+// transaction commit to create the actual segment file.
 func (w *WAL) createNextSegment(newState *state) (func() error, error) {
 	// Find existing sealed tail
 	tail := newState.getTailInfo()
 
-	// If there is no tail, next baseIndex is 1
+	// If there is no tail, next baseIndex is 1 (or the requested next base index)
 	nextBaseIndex := uint64(1)
 	if tail != nil {
 		nextBaseIndex = tail.MaxIndex + 1
+	} else if newState.nextBaseIndex > 0 {
+		nextBaseIndex = newState.nextBaseIndex
 	}
 
 	// Create a new segment
@@ -662,8 +674,53 @@
 	return post, nil
 }
 
+// resetEmptyFirstSegmentBaseIndex is used to change the baseIndex of the tail
+// segment file if it's empty. This is needed when the first log written has a
+// different index to the base index that was assumed when the tail was created
+// (e.g. on startup). It will return an error if the log is not currently empty.
+func (w *WAL) resetEmptyFirstSegmentBaseIndex(newBaseIndex uint64) error {
+	txn := stateTxn(func(newState *state) (func(), func() error, error) {
+		if newState.lastIndex() > 0 {
+			return nil, nil, fmt.Errorf("can't reset BaseIndex on segment, log is not empty")
+		}
+
+		fin := func() {}
+
+		tailSeg := newState.getTailInfo()
+		if tailSeg != nil {
+			// There is an existing tail. Check if it needs to be replaced
+			if tailSeg.BaseIndex == newBaseIndex {
+				// It's fine as it is, no-op
+				return nil, nil, nil
+			}
+			// It needs to be removed
+			newState.segments = newState.segments.Delete(tailSeg.BaseIndex)
+			newState.tail = nil
+			fin = func() {
+				w.closeSegments([]io.Closer{tailSeg.r})
+				w.deleteSegments(map[uint64]uint64{tailSeg.ID: tailSeg.BaseIndex})
+			}
+		}
+
+		// Ensure the newly created tail has the right base index
+		newState.nextBaseIndex = newBaseIndex
+
+		// Create the new segment
+		post, err := w.createNextSegment(newState)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		return fin, post, nil
+	})
+
+	return w.mutateStateLocked(txn)
+}
+
 func (w *WAL) truncateHeadLocked(newMin uint64) error {
 	txn := stateTxn(func(newState *state) (func(), func() error, error) {
+		oldLastIndex := newState.lastIndex()
+
 		// Iterate the segments to find any that are entirely deleted.
 		toDelete := make(map[uint64]uint64)
 		toClose := make([]io.Closer, 0, 1)
@@ -691,7 +748,7 @@ func (w *WAL) truncateHeadLocked(newMin uint64) error {
 			toDelete[seg.ID] = seg.BaseIndex
 			toClose = append(toClose, seg.r)
 			newState.segments = newState.segments.Delete(seg.BaseIndex)
-			nTruncated += (maxIdx - seg.MinIndex + 1) // +1 becuase MaxIndex is inclusive
+			nTruncated += (maxIdx - seg.MinIndex + 1) // +1 because MaxIndex is inclusive
 		}
 
 		// There may not be any segments (left) but if there are, update the new
@@ -705,7 +762,11 @@
 		} else {
 			// If there is no head any more, then there is no tail either! We should
 			// create a new blank one ready for use when we next append like we do
-			// during initialization.
+			// during initialization. As an optimization, we create it with a
+			// BaseIndex of the old MaxIndex + 1 since this is what our Raft library
+			// uses as the next log index after a restore so this avoids recreating
+			// the files a second time on the next append.
+			newState.nextBaseIndex = oldLastIndex + 1
 			pc, err := w.createNextSegment(newState)
 			if err != nil {
 				return nil, nil, err
 			}
@@ -868,3 +929,9 @@ func (w *WAL) Close() error {
 	}
 	return nil
 }
+
+// IsMonotonic implements raft.MonotonicLogStore and informs the raft library
+// that this store will only allow consecutive log indexes with no gaps.
+func (w *WAL) IsMonotonic() bool { + return true +} diff --git a/wal_stubs_test.go b/wal_stubs_test.go index a20008e..45e0ba4 100644 --- a/wal_stubs_test.go +++ b/wal_stubs_test.go @@ -288,6 +288,21 @@ func (ts *testStorage) assertValidMetaState(t *testing.T) { if !isTail && seg.SealTime.IsZero() { t.Fatalf("unsealed segment not at tail in committed state") } + + // Make sure that the first log in the segment is the same as its base index + // (if the segment exists already, it might not right after meta updated but + // segment not created yet which is exercised in some tests). + if ts, ok := ts.segments[seg.ID]; ok { + tss := ts.loadState() + require.Equal(t, seg.BaseIndex, tss.info.BaseIndex) + it := tss.logs.Iterator() + it.First() + if !it.Done() { + _, log, ok := it.Next() + require.True(t, ok) + require.Equal(t, seg.BaseIndex, log.Index) + } + } } } @@ -545,6 +560,10 @@ func (s *testSegment) Append(entries []types.LogEntry) error { return errors.New("closed") } for _, e := range entries { + if e.Index != (newState.info.BaseIndex + uint64(newState.logs.Len())) { + return fmt.Errorf("non-monotonic append! BaseIndex=%d len=%d appended=%d", + newState.info.BaseIndex, newState.logs.Len(), e.Index) + } newState.logs = newState.logs.Set(e.Index, e) } return nil diff --git a/wal_test.go b/wal_test.go index 3cd21d4..6305b45 100644 --- a/wal_test.go +++ b/wal_test.go @@ -13,9 +13,14 @@ import ( "github.com/hashicorp/raft" "github.com/hashicorp/raft-wal/metrics" + "github.com/hashicorp/raft-wal/types" "github.com/stretchr/testify/require" ) +func TestNotFoundErrType(t *testing.T) { + require.Equal(t, types.ErrNotFound, raft.ErrLogNotFound) +} + func TestWALOpen(t *testing.T) { cases := []struct { name string @@ -275,8 +280,9 @@ func TestStoreLogs(t *testing.T) { store2 []*raft.Log expectErr string // validate recovery of data - expectFirstIndex uint64 - expectLastIndex uint64 + expectFirstIndex uint64 + expectLastIndex uint64 + expectFirstBaseIndex uint64 }{ { name: "empty log append", @@ -289,6 +295,8 @@ func TestStoreLogs(t *testing.T) { store: makeRaftLogs(10000, 5), expectFirstIndex: 10000, expectLastIndex: 10004, + // Ensure we actually re-created the first segment with the right base index. + expectFirstBaseIndex: 10000, }, { name: "existing segment log append", @@ -389,6 +397,9 @@ func TestStoreLogs(t *testing.T) { require.NoError(t, err) require.Equal(t, int(tc.expectLastIndex), int(last)) + // Check all the internal meta/segment state meets our invariants + ts.assertValidMetaState(t) + // Check all log entries exist that are meant to if tc.expectLastIndex > 0 { firstAppended := tc.expectLastIndex - uint64(len(tc.store)+len(tc.store2)) + 1 @@ -417,6 +428,22 @@ func TestStoreLogs(t *testing.T) { } } } + + // Check the actual base index of the first segment is the one we expect. + // This is an internal detail, but it's important as the contract with the + // Segment layer is broken if we assume we can start logs at a higher + // index than the BaseIndex. + if tc.expectFirstBaseIndex > 0 { + segments, err := ts.List() + require.NoError(t, err) + // First segment would have been ID 0, abse index 1 but we should have + // replaced it with a segment with the correct base index which would + // have ID 1 + require.Equal(t, map[uint64]uint64{1: tc.expectFirstBaseIndex}, segments) + + // Ensure that the old file was closed and deleted + ts.assertDeletedAndClosed(t, 1) + } }) } }
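Illustrative aside (not part of the patch): the `StoreLogs` change above targets a brand-new WAL whose very first append starts well past index 1, e.g. right after a snapshot restore. A hedged end-to-end sketch of that scenario, assuming the public `wal.Open` API described in the raft-wal README and eliding most error handling:

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/hashicorp/raft"
	wal "github.com/hashicorp/raft-wal"
)

func main() {
	dir, err := os.MkdirTemp("", "wal-example")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Open a fresh WAL with default options (wal.Open is taken from the
	// raft-wal README; the exact options are not part of this diff).
	w, err := wal.Open(dir)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	// The first ever append starts at index 10000. With this patch the empty
	// pre-created tail segment is deleted and re-created with BaseIndex=10000
	// instead of keeping BaseIndex=1.
	logs := []*raft.Log{
		{Index: 10000, Term: 1, Data: []byte("a")},
		{Index: 10001, Term: 1, Data: []byte("b")},
	}
	if err := w.StoreLogs(logs); err != nil {
		log.Fatal(err)
	}

	first, _ := w.FirstIndex()
	last, _ := w.LastIndex()
	fmt.Println(first, last) // 10000 10001
}
```

This is the behaviour the new `expectFirstBaseIndex` field in `TestStoreLogs` asserts, together with the check that the original BaseIndex=1 segment file was closed and deleted.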