Skip to content
This repository was archived by the owner on Jun 23, 2025. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 69 additions & 67 deletions internal/wal/wal_forge.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,89 +304,91 @@
return files, nil
}

// ReplayCommand replays the commands from the WAL files.
// This method is thread safe.
func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error {

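// replaySegment is a helper for ReplayCommand: it opens the WAL segment file
// at segmentPath and invokes cb for each command decoded from it.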
func replaySegment(segmentPath string, cb func(*wire.Command) error) error {
var crc, entrySize uint32
var el w.Element

// Buffers to hold the header and the element bytes
bb1h := make([]byte, 8)
bb1ElementBytes := make([]byte, 10*1024)

// Get list of segment files ordered by timestamp in ascending order
segments, err := wl.segments()
file, err := os.Open(segmentPath)
if err != nil {
return fmt.Errorf("error getting wal-segment files: %w", err)
return fmt.Errorf("error opening wal-segment file %s: %w", segmentPath, err)
}

defer file.Close() // Ensure the file is closed after processing

// Process each segment file in order
for _, segment := range segments {
file, err := os.Open(segment)
if err != nil {
return fmt.Errorf("error opening wal-segment file %s: %w", segment, err)
}
reader := bufio.NewReader(file)
// Format: CRC32 (4 bytes) | Size of WAL entry (4 bytes) | WAL data

reader := bufio.NewReader(file)
// Format: CRC32 (4 bytes) | Size of WAL entry (4 bytes) | WAL data

// TODO: Replace this infinite loop with a more elegant solution
for {
// Read CRC32 (4 bytes) + entrySize (4 bytes)
if _, err := io.ReadFull(reader, bb1h); err != nil {
// TODO: this terminating connection should be handled in a better way
// and the loop should not be infinite.
// Edge case: this EOF error can happen even in the next step when
// we are reading the WAL element from the file.
if err == io.EOF {
break
}
file.Close()
return fmt.Errorf("error reading WAL: %w", err)
// TODO: Replace this infinite loop with a more elegant solution
for {
// Read CRC32 (4 bytes) + entrySize (4 bytes)
if _, err := io.ReadFull(reader, bb1h); err != nil {
// TODO: this terminating connection should be handled in a better way
// and the loop should not be infinite.
// Edge case: this EOF error can happen even in the next step when
// we are reading the WAL element from the file.
if err == io.EOF {
break
}
crc = binary.LittleEndian.Uint32(bb1h[0:4])
entrySize = binary.LittleEndian.Uint32(bb1h[4:8])
return fmt.Errorf("error reading WAL: %w", err)
}
crc = binary.LittleEndian.Uint32(bb1h[0:4])
entrySize = binary.LittleEndian.Uint32(bb1h[4:8])

if _, err := io.ReadFull(reader, bb1ElementBytes[:entrySize]); err != nil {
file.Close()
return fmt.Errorf("error reading WAL data: %w", err)
}
if _, err := io.ReadFull(reader, bb1ElementBytes[:entrySize]); err != nil {
return fmt.Errorf("error reading WAL data: %w", err)
}

// Calculate CRC32 only on the payload
expectedCRC := crc32.ChecksumIEEE(bb1ElementBytes[:entrySize])
if crc != expectedCRC {
// TODO: We are repetitively closing the file here.
// A better solution would be to move this logic to a function
// and use defer to close the file.
// The function, thus, in a way processes (replays) one segment at a time.
file.Close()

// TODO: This is where we should trigger the WAL recovery.
// Recovery process is all about truncating the segment file
// till this point and ignoring the rest.
// Log appropriate messages when this happens.
// Evaluate if this recovery mode should be a command line flag
// that would suggest if we should truncate, ignore, or stop the boot process.
return fmt.Errorf("CRC32 mismatch: expected %d, got %d", crc, expectedCRC)
}
// Calculate CRC32 only on the payload
expectedCRC := crc32.ChecksumIEEE(bb1ElementBytes[:entrySize])
if crc != expectedCRC {

Check failure on line 348 in internal/wal/wal_forge.go

View workflow job for this annotation

GitHub Actions / lint

empty-lines: extra empty line at the start of a block (revive)

// TODO: This is where we should trigger the WAL recovery.
// Recovery process is all about truncating the segment file
// till this point and ignoring the rest.
// Log appropriate messages when this happens.
// Evaluate if this recovery mode should be a command line flag
// that would suggest if we should truncate, ignore, or stop the boot process.
return fmt.Errorf("CRC32 mismatch: expected %d, got %d", crc, expectedCRC)
}

// Unmarshal the WAL entry to get the payload
if err := proto.Unmarshal(bb1ElementBytes[:entrySize], &el); err != nil {
file.Close()
return fmt.Errorf("error unmarshaling WAL entry: %w", err)
}
// Unmarshal the WAL entry to get the payload
if err := proto.Unmarshal(bb1ElementBytes[:entrySize], &el); err != nil {
return fmt.Errorf("error unmarshaling WAL entry: %w", err)
}

var c wire.Command
if err := proto.Unmarshal(el.Payload, &c); err != nil {
file.Close()
return fmt.Errorf("error unmarshaling command: %w", err)
}
var c wire.Command
if err := proto.Unmarshal(el.Payload, &c); err != nil {
return fmt.Errorf("error unmarshaling command: %w", err)
}

// Call provided replay function with parsed command
if err := cb(&c); err != nil {
file.Close()
return fmt.Errorf("error replaying command: %w", err)
}
// Call provided replay function with parsed command
if err := cb(&c); err != nil {
return fmt.Errorf("error replaying command: %w", err)
}
}
return nil
}

// ReplayCommand replays the commands from the WAL files.
// This method is thread safe.
func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error {

Check failure on line 379 in internal/wal/wal_forge.go

View workflow job for this annotation

GitHub Actions / lint

empty-lines: extra empty line at the start of a block (revive)

// Get list of segment files ordered by timestamp in ascending order
segments, err := wl.segments()
if err != nil {
return fmt.Errorf("error getting wal-segment files: %w", err)
}

// Process each segment file in order
for _, segment := range segments {
err2 := replaySegment(segment, cb)
if err2 != nil {
return err2
}
}

Expand Down
Loading