diff --git a/fs/fs.go b/fs/fs.go index 2de6ff0..d88a65f 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -87,7 +87,7 @@ func (fs *FS) Create(dir string, name string, size uint64) (types.WritableFile, return fi, nil } -// Delete indicates the file is no longer required. Typically it should be +// Delete indicates the file is no longer required. Typically it should be // deleted from the underlying system to free disk space. func (fs *FS) Delete(dir string, name string) error { if err := os.Remove(filepath.Join(dir, name)); err != nil { diff --git a/metadb/metadb.go b/metadb/metadb.go index 85a054a..a8f34a3 100644 --- a/metadb/metadb.go +++ b/metadb/metadb.go @@ -9,7 +9,10 @@ import ( "fmt" "os" "path/filepath" + "time" + "github.com/hashicorp/raft-wal/fs" + "github.com/hashicorp/raft-wal/segment" "github.com/hashicorp/raft-wal/types" "go.etcd.io/bbolt" ) @@ -63,12 +66,12 @@ func (db *BoltMetaDB) ensureOpen(dir string) error { // BoltDB can get stuck in invalid states if we crash while it's initializing. // We can't distinguish those as safe to just wipe it and start again because // we don't know for sure if it's failing due to bad init or later corruption - // (which would loose data if we just wipe and start over). So to ensure + // (which would lose data if we just wipe and start over). So to ensure // initial creation of the WAL is as crash-safe as possible we will manually // detect we have an atomic init procedure: // 1. Check if file exits already. If yes, skip init and just open it. // 2. Delete any existing DB file with tmp name - // 3. Creat a new BoltDB that is empty and has the buckets with a temp name. + // 3. Create a new BoltDB that is empty and has the buckets with a temp name. // 4. 
Once that's committed, rename to final name and Fsync parent dir _, err := os.Stat(fileName) if err == nil { @@ -80,13 +83,81 @@ func (db *BoltMetaDB) ensureOpen(dir string) error { return fmt.Errorf("failed to stat %s: %w", FileName, err) } - // File doesn't exist, initialize a new DB in a crash-safe way + // File doesn't exist, initialize a new DB in a crash-safe way. if err := safeInitBoltDB(dir); err != nil { return fmt.Errorf("failed initializing meta DB: %w", err) } - // All good, now open it! - return open() + // Open the new db, but don't return just yet + err = open() + if err != nil { + return fmt.Errorf("error opening new metadb: %w", err) + } + + // Now that we have a brand new metaDB, check to see if segment files exist. + // If they do, then we're probably trying to do a recovery, and we can + // populate the new db with some initial values read from the segment file + // headers, so that we don't error later on when trying to create a new segment + // file that already exists. 
+ sfe, err := segmentFilesExist(dir) + if err != nil { + return fmt.Errorf("failed to check for segment files: %w", err) + } + + if sfe { + fmt.Println("rebuilding meta state from segment files") + state := types.PersistentState{} + vfs := fs.New() + f := segment.NewFiler(dir, vfs) + indexes, err := f.List() + if err != nil { + return fmt.Errorf("failed to list segment IDs: %w", err) + } + + for id, baseIndex := range indexes { + info, err := f.HeaderInfo(baseIndex, id) + if err != nil { + return fmt.Errorf("failed to read header for file at baseIndex %d id %d: %w", baseIndex, id, err) + } + state.NextSegmentID = info.ID + 1 + si := types.SegmentInfo{ + ID: info.ID, + BaseIndex: info.BaseIndex, + MinIndex: info.MinIndex, + MaxIndex: info.MaxIndex, + Codec: info.Codec, + IndexStart: info.IndexStart, + CreateTime: info.CreateTime, + SealTime: time.Now(), + SizeLimit: info.SizeLimit, + } + state.Segments = append(state.Segments, si) + } + + err = db.CommitState(state) + if err != nil { + return fmt.Errorf("failed to commit state: %w", err) + } + } + + return nil +} + +func segmentFilesExist(dir string) (bool, error) { + sfe := false + entries, err := os.ReadDir(dir) + if err != nil { + return false, err + } + + for _, e := range entries { + if filepath.Ext(e.Name()) == ".wal" { + sfe = true + break + } + } + + return sfe, nil } func safeInitBoltDB(dir string) error { @@ -173,6 +244,7 @@ func (db *BoltMetaDB) Load(dir string) (types.PersistentState, error) { if err := json.Unmarshal(raw, &state); err != nil { return state, fmt.Errorf("%w: failed to parse persisted state: %s", types.ErrCorrupt, err) } + fmt.Printf("state: %#v\n", state) return state, nil } diff --git a/segment/filer.go b/segment/filer.go index d5252a7..d442bf8 100644 --- a/segment/filer.go +++ b/segment/filer.go @@ -71,11 +71,24 @@ func (f *Filer) RecoverTail(info types.SegmentInfo) (types.SegmentWriter, error) func (f *Filer) Open(info types.SegmentInfo) (types.SegmentReader, error) { fname := 
info.FileName() - rf, err := f.vfs.OpenReader(f.dir, fname) + rf, gotInfo, err := f.headerInfo(fname) if err != nil { return nil, err } + if err := validateFileHeader(*gotInfo, info); err != nil { + return nil, err + } + + return openReader(info, rf, &f.bufPool) +} + +func (f *Filer) headerInfo(name string) (types.ReadableFile, *types.SegmentInfo, error) { + rf, err := f.vfs.OpenReader(f.dir, name) + if err != nil { + return nil, nil, err + } + // Validate header here since openReader is re-used by writer where it's valid // for the file header not to be committed yet after a crash so we can't check // it there. @@ -87,21 +100,21 @@ func (f *Filer) Open(info types.SegmentInfo) (types.SegmentReader, error) { // never not have a valid header. (I.e. even if crashes happen it should // be impossible to seal a segment with no header written so this // indicates that something truncated the file after the fact) - return nil, fmt.Errorf("%w: failed to read header: %s", types.ErrCorrupt, err) + return nil, nil, fmt.Errorf("%w: failed to read header: %s", types.ErrCorrupt, err) } - return nil, err - } - - gotInfo, err := readFileHeader(hdr[:]) - if err != nil { - return nil, err + return nil, nil, err } - if err := validateFileHeader(*gotInfo, info); err != nil { - return nil, err - } + info, err := readFileHeader(hdr[:]) + return rf, info, err +} - return openReader(info, rf, &f.bufPool) +// HeaderInfo takes a baseIndex and ID and returns the information from the header of said file. +// This is useful during recovery when the meta-db doesn't exist. +func (f *Filer) HeaderInfo(baseIndex uint64, ID uint64) (*types.SegmentInfo, error) { + fname := fmt.Sprintf(types.SegmentFileNamePattern, baseIndex, ID) + _, info, err := f.headerInfo(fname) + return info, err } // List returns the set of segment IDs currently stored. 
It's used by the WAL @@ -144,10 +157,10 @@ func (f *Filer) listInternal() (map[uint64]uint64, []uint64, error) { } // Delete removes the segment with given baseIndex and id if it exists. Note -// that baseIndex is technically redundant since ID is unique on it's own. But +// that baseIndex is technically redundant since ID is unique on its own. But // in practice we name files (or keys) with both so that they sort correctly. -// This interface allows a simpler implementation where we can just delete -// the file if it exists without having to scan the underlying storage for a. +// This interface allows a simpler implementation where we can just delete +// the file if it exists without having to scan the underlying storage for a file with a matching ID. func (f *Filer) Delete(baseIndex uint64, ID uint64) error { fname := fmt.Sprintf(types.SegmentFileNamePattern, baseIndex, ID) return f.vfs.Delete(f.dir, fname) @@ -159,7 +172,7 @@ func (f *Filer) Delete(baseIndex uint64, ID uint64) error { // the correct metadata. This allows dumping log segments in a WAL that is still // being written to by another process. Without metadata we don't know if the // file is sealed so always recover by reading through the whole file. If after -// or before are non-zero, the specify a exclusive lower or upper bound on which +// or before are non-zero, they specify an exclusive lower or upper bound on which // log entries should be emitted. No error checking is done on the read data. fn // is called for each entry passing the raft info read from the file header (so // that the caller knows which codec to use for example) the raft index of the diff --git a/wal.go b/wal.go index aca27e6..f634b97 100644 --- a/wal.go +++ b/wal.go @@ -228,6 +228,7 @@ func Open(dir string, opts ...walOpt) (*WAL, error) { // above) there are no readers yet since we are constructing a new WAL so we // don't need to jump through the mutateState hoops yet! 
w.s.Store(&newState) + fmt.Printf("newState: %#v\n", newState) // Delete any unused segment files left over after a crash. w.deleteSegments(toDelete)