Skip to content

Commit

Permalink
Merge pull request #26 from hashicorp/waldump
Browse files Browse the repository at this point in the history
Support debug dumping of WAL entries
  • Loading branch information
banks authored Mar 31, 2023
2 parents cb648c6 + 8b98e5f commit 742eb42
Show file tree
Hide file tree
Showing 6 changed files with 506 additions and 73 deletions.
1 change: 1 addition & 0 deletions cmd/waldump/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
waldump
36 changes: 36 additions & 0 deletions cmd/waldump/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# waldump

A simple command for dumping the contents of WAL segment files to JSON for
debugging.

## Usage

```
$ waldump [-after INDEX] [-before INDEX] /path/to/wal/dir
...
{"Index":227281,"Term":4,"Type":0,"Data":"hpGEpUNvb3JkhKpBZGp1c3RtZW50yz7pEPrkTc4tpUVycm9yyz/B4NJg87MZpkhlaWdodMs/ABkEWHeDZqNWZWOYyz8FyF63P/XOyz8Fe2fyqYpayz7eXgvdsOWVyz7xX/ARy9MByz7XZq0fmx5eyz7x8ic7zxhJy78EgvusSgKUy77xVfw2sEr5pE5vZGWiczGpUGFydGl0aW9uoKdTZWdtZW50oA==","Extensions":null,"AppendedAt":"2023-03-23T12:24:05.440317Z"}
...
```

Each `raft.Log` is written out as JSON followed by a newline. The `Data` and
`Extensions` fields are opaque byte strings that will be base64 encoded.
Decoding those requires knowledge of the encoding used by the writing
application.

## Limitations

This tool is designed for debugging only. It does _not_ inspect the wal-meta
database. This has the nice property that you can safely dump the contents of
WAL files even while the application is still writing to the WAL, since we don't
have to take a lock on the meta database.

The downside is that this tool might in some edge cases output logs that have
already been deleted from the WAL. It's possible although extremely unlikely
that the WAL could be in the process of truncating the tail which could result
in there being both pre-truncate and post-truncate segment files present. This
tool might possibly output duplicate and out-of-order log indexes from before
and after the truncation. Or if `before` and `after` are used, it's possible we
might skip records entirely because an older file that has already been removed
was read instead of the newer one. These are all very unlikely in practice and
if the application that writes the WAL is still up and running are likely to be
resolved by the time you run the tool again.
62 changes: 62 additions & 0 deletions cmd/waldump/waldump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) HashiCorp, Inc.

package main

import (
"encoding/json"
"flag"
"fmt"
"os"

"github.com/hashicorp/raft"
wal "github.com/hashicorp/raft-wal"
"github.com/hashicorp/raft-wal/fs"
"github.com/hashicorp/raft-wal/segment"
"github.com/hashicorp/raft-wal/types"
)

// opts collects the command-line configuration for a single waldump run.
type opts struct {
	// Dir is the WAL directory to read segment files from. It is taken from
	// the first positional argument.
	Dir string

	// After and Before are the values of the -after and -before flags: the
	// exclusive lower and upper raft index bounds for the dump. Both default
	// to zero.
	After, Before uint64
}

// main parses the command line, then streams every log entry found in the WAL
// directory to stdout as one JSON document per line.
//
// Stdout carries only the JSON dump so that it can be piped into other tools;
// usage and error messages are written to stderr. The process exits with
// status 1 if no directory is given or if dumping fails.
func main() {
	var o opts
	flag.Uint64Var(&o.After, "after", 0, "specified a raft index to use as an exclusive lower bound when dumping log entries.")
	flag.Uint64Var(&o.Before, "before", 0, "specified a raft index to use as an exclusive upper bound when dumping log entries.")

	flag.Parse()

	// Accept dir as positional arg
	o.Dir = flag.Arg(0)
	if o.Dir == "" {
		fmt.Fprintln(os.Stderr, "Usage: waldump [-after INDEX] [-before INDEX] <path to WAL dir>")
		os.Exit(1)
	}

	vfs := fs.New()
	f := segment.NewFiler(o.Dir, vfs)

	codec := &wal.BinaryCodec{}
	var log raft.Log
	enc := json.NewEncoder(os.Stdout)

	err := f.DumpLogs(o.After, o.Before, func(info types.SegmentInfo, e types.LogEntry) (bool, error) {
		// Only the binary codec is understood by this tool; bail out rather
		// than emit garbage for anything else.
		if info.Codec != wal.CodecBinaryV1 {
			return false, fmt.Errorf("unsupported codec %d in file %s", info.Codec, segment.FileName(info))
		}
		if err := codec.Decode(e.Data, &log); err != nil {
			return false, err
		}
		// Output the raft Log struct as JSON
		if err := enc.Encode(log); err != nil {
			return false, err
		}
		return true, nil
	})
	if err != nil {
		// Keep diagnostics off stdout so they never interleave with the JSON.
		fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
		os.Exit(1)
	}
}
146 changes: 141 additions & 5 deletions segment/filer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,18 @@ func (f *Filer) Open(info types.SegmentInfo) (types.SegmentReader, error) {
// unclean shutdown. The returned map is a map of ID -> BaseIndex. BaseIndex
// is returned to allow subsequent Delete calls to be made.
func (f *Filer) List() (map[uint64]uint64, error) {
segs := make(map[uint64]uint64)
segs, _, err := f.listInternal()
return segs, err
}

func (f *Filer) listInternal() (map[uint64]uint64, []uint64, error) {
files, err := f.vfs.ListDir(f.dir)
if err != nil {
return nil, err
return nil, nil, err
}

segs := make(map[uint64]uint64)
sorted := make([]uint64, 0)
for _, file := range files {
if !strings.HasSuffix(file, segmentFileSuffix) {
continue
Expand All @@ -135,17 +140,18 @@ func (f *Filer) List() (map[uint64]uint64, error) {
var bIdx, id uint64
n, err := fmt.Sscanf(file, segmentFileNamePattern, &bIdx, &id)
if err != nil {
return nil, types.ErrCorrupt
return nil, nil, types.ErrCorrupt
}
if n != 2 {
// Misnamed segment files with the right suffix indicates a bug or
// tampering, we can't be sure what's happened to the data.
return nil, types.ErrCorrupt
return nil, nil, types.ErrCorrupt
}
segs[id] = bIdx
sorted = append(sorted, id)
}

return segs, nil
return segs, sorted, nil
}

// Delete removes the segment with given baseIndex and id if it exists. Note
Expand All @@ -157,3 +163,133 @@ func (f *Filer) Delete(baseIndex uint64, ID uint64) error {
fname := fmt.Sprintf(segmentFileNamePattern, baseIndex, ID)
return f.vfs.Delete(f.dir, fname)
}

// DumpSegment attempts to read the segment file specified by the baseIndex and
// ID. Its intended purpose is debugging the contents of segment files and,
// unlike the SegmentFiler interface, it doesn't assume the caller has access to
// the correct metadata. This allows dumping log segments in a WAL that is still
// being written to by another process. Without metadata we don't know if the
// file is sealed so always recover by reading through the whole file. If after
// or before are non-zero, they specify an exclusive lower or upper bound on
// which log entries should be emitted. No error checking is done on the read
// data. fn is called for each entry, passing the segment info read from the
// file header (so that the caller knows which codec to use for example) and a
// LogEntry holding the raft index and the raw bytes of the entry. The callback
// must return true to continue reading. The data slice is only valid for the
// lifetime of the call.
//
// Entries are only passed to fn once the commit frame that follows them has
// been read; entries that are never followed by a commit frame are not
// emitted. The file opened for reading is closed before DumpSegment returns.
func (f *Filer) DumpSegment(baseIndex uint64, ID uint64, after, before uint64, fn func(info types.SegmentInfo, e types.LogEntry) (bool, error)) error {
	fname := fmt.Sprintf(segmentFileNamePattern, baseIndex, ID)

	rf, err := f.vfs.OpenReader(f.dir, fname)
	if err != nil {
		return err
	}
	// The file is only ever read, so a failure to close it cannot lose data
	// and is deliberately ignored.
	defer rf.Close()

	// Scratch buffer reused for every entry; grown on demand below.
	buf := make([]byte, 64*1024)
	idx := baseIndex

	type frameInfo struct {
		Index  uint64
		Offset int64
		Len    uint32
	}
	// batch holds the in-range entry frames seen since the last commit frame.
	var batch []frameInfo
	// reachedBefore records that an entry at or past the upper bound was seen
	// while in-range entries were still waiting for their commit frame.
	reachedBefore := false

	_, err = readThroughSegment(rf, func(info types.SegmentInfo, fh frameHeader, offset int64) (bool, error) {
		if fh.typ == FrameCommit {
			// All the previous entries have been committed. Read them and send up to
			// caller.
			for _, frame := range batch {
				// Check the header is reasonable
				if frame.Len > MaxEntrySize {
					return false, fmt.Errorf("failed to read entry idx=%d, frame header length (%d) is too big",
						frame.Index, frame.Len)
				}

				if frame.Len > uint32(len(buf)) {
					buf = make([]byte, frame.Len)
				}

				n, err := rf.ReadAt(buf[:frame.Len], frame.Offset+frameHeaderLen)
				if err != nil {
					return false, err
				}
				if uint32(n) < frame.Len {
					return false, io.ErrUnexpectedEOF
				}

				ok, err := fn(info, types.LogEntry{Index: frame.Index, Data: buf[:n]})
				if !ok || err != nil {
					return ok, err
				}
			}
			// Reset batch
			batch = batch[:0]
			if reachedBefore {
				// Everything below the upper bound has now been emitted.
				return false, nil
			}
			return true, nil
		}

		if fh.typ != FrameEntry {
			return true, nil
		}

		if idx <= after {
			// Not in the range we care about, skip reading the entry.
			idx++
			return true, nil
		}
		if before > 0 && idx >= before {
			if len(batch) == 0 {
				// Nothing pending, we're done.
				return false, nil
			}
			// In-range entries are still waiting for their commit frame.
			// Stopping now would silently drop them, so keep scanning (without
			// collecting anything further) until that commit frame is seen.
			reachedBefore = true
			idx++
			return true, nil
		}

		batch = append(batch, frameInfo{idx, offset, fh.len})
		idx++
		return true, nil
	})

	return err
}

// DumpLogs attempts to read all log entries from segment files in the directory
// for debugging purposes. It does _not_ use the metadata and so may output log
// entries that are uncommitted or already truncated as far as the writing
// process is concerned. As such it should not be used for replication of data.
// It is useful though to debug the contents of the log even while the writing
// application is still running. After and before if non-zero specify exclusive
// bounds on the logs that should be returned which may allow the implementation
// to skip reading entire segment files that are not in the range.
//
// Segments are visited in the order reported by listInternal. fn is called for
// each entry; once it returns false no further entries are passed to it, even
// if more segment files remain. Any error from listing the directory, from
// reading a segment, or from fn is returned.
func (f *Filer) DumpLogs(after, before uint64, fn func(info types.SegmentInfo, e types.LogEntry) (bool, error)) error {
	baseIndexes, segIDsSorted, err := f.listInternal()
	if err != nil {
		return err
	}

	// DumpSegment returns nil when the callback asks to stop without an error,
	// so the stop request has to be tracked here to avoid carrying on with the
	// next segment.
	stopped := false
	emit := func(info types.SegmentInfo, e types.LogEntry) (bool, error) {
		ok, err := fn(info, e)
		if !ok {
			stopped = true
		}
		return ok, err
	}

	for i, id := range segIDsSorted {
		baseIndex := baseIndexes[id]
		nextBaseIndex := uint64(0)
		if i+1 < len(segIDsSorted) {
			// This is not the last segment, peek at the base index of that one and
			// assume that this segment won't contain indexes that high.
			nextBaseIndex = baseIndexes[segIDsSorted[i+1]]
		}
		// See if this file contains any indexes in the range
		if after > 0 && nextBaseIndex > 0 && after >= nextBaseIndex {
			// This segment is all indexes before the lower bound we care about
			continue
		}
		if before > 0 && before <= baseIndex {
			// This segment is all indexes higher than the upper bound. We've output
			// every log in the range at this point (barring edge cases where we race
			// with a truncation which leaves multiple generations of segment files on
			// disk which we are going to ignore for now).
			return nil
		}

		// We probably care about at least some of the entries in this segment
		if err := f.DumpSegment(baseIndex, id, after, before, emit); err != nil {
			return err
		}
		if stopped {
			// The caller asked us to stop; don't touch the remaining segments.
			return nil
		}
	}

	return nil
}
Loading

0 comments on commit 742eb42

Please sign in to comment.