diff --git a/internal/wal/wal_forge.go b/internal/wal/wal_forge.go index edd5bff4e..e88c73849 100644 --- a/internal/wal/wal_forge.go +++ b/internal/wal/wal_forge.go @@ -304,89 +304,91 @@ func (wl *walForge) segments() ([]string, error) { return files, nil } -// ReplayCommand replays the commands from the WAL files. -// This method is thread safe. -func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { + +// ReplayCommand helper +func replaySegment(segmentPath string, cb func(*wire.Command) error) error { var crc, entrySize uint32 var el w.Element // Buffers to hold the header and the element bytes bb1h := make([]byte, 8) bb1ElementBytes := make([]byte, 10*1024) - - // Get list of segment files ordered by timestamp in ascending order - segments, err := wl.segments() + file, err := os.Open(segmentPath) if err != nil { - return fmt.Errorf("error getting wal-segment files: %w", err) + return fmt.Errorf("error opening wal-segment file %s: %w", segmentPath, err) } + + defer file.Close() // Ensure the file is closed after processing - // Process each segment file in order - for _, segment := range segments { - file, err := os.Open(segment) - if err != nil { - return fmt.Errorf("error opening wal-segment file %s: %w", segment, err) - } + reader := bufio.NewReader(file) + // Format: CRC32 (4 bytes) | Size of WAL entry (4 bytes) | WAL data - reader := bufio.NewReader(file) - // Format: CRC32 (4 bytes) | Size of WAL entry (4 bytes) | WAL data - - // TODO: Replace this infinite loop with a more elegant solution - for { - // Read CRC32 (4 bytes) + entrySize (4 bytes) - if _, err := io.ReadFull(reader, bb1h); err != nil { - // TODO: this terminating connection should be handled in a better way - // and the loop should not be infinite. - // Edge case: this EOF error can happen even in the next step when - // we are reading the WAL element from the file. - if err == io.EOF { - break - } - file.Close() - return fmt.Errorf("error reading WAL: %w", err) + // TODO: Replace this infinite loop with a more elegant solution + for { + // Read CRC32 (4 bytes) + entrySize (4 bytes) + if _, err := io.ReadFull(reader, bb1h); err != nil { + // TODO: this terminating connection should be handled in a better way + // and the loop should not be infinite. + // Edge case: this EOF error can happen even in the next step when + // we are reading the WAL element from the file. + if err == io.EOF { + break } - crc = binary.LittleEndian.Uint32(bb1h[0:4]) - entrySize = binary.LittleEndian.Uint32(bb1h[4:8]) + return fmt.Errorf("error reading WAL: %w", err) + } + crc = binary.LittleEndian.Uint32(bb1h[0:4]) + entrySize = binary.LittleEndian.Uint32(bb1h[4:8]) - if _, err := io.ReadFull(reader, bb1ElementBytes[:entrySize]); err != nil { - file.Close() - return fmt.Errorf("error reading WAL data: %w", err) - } + if _, err := io.ReadFull(reader, bb1ElementBytes[:entrySize]); err != nil { + return fmt.Errorf("error reading WAL data: %w", err) + } - // Calculate CRC32 only on the payload - expectedCRC := crc32.ChecksumIEEE(bb1ElementBytes[:entrySize]) - if crc != expectedCRC { - // TODO: We are reprtitively closing the file here - // A better solution would be to move this logic to a function - // and use defer to close the file. - // The function. thus, in a way processes (replays) one segment at a time. - file.Close() - - // TODO: THis is where we should trigger the WAL recovery - // Recovery process is all about truncating the segment file - // till this point and ignoring the rest. - // Log appropriate messages when this happens. - // Evaluate if this recovery mode should be a command line flag - // that would suggest if we should truncate, ignore, or stop the boot process. - return fmt.Errorf("CRC32 mismatch: expected %d, got %d", crc, expectedCRC) - } + // Calculate CRC32 only on the payload + expectedCRC := crc32.ChecksumIEEE(bb1ElementBytes[:entrySize]) + if crc != expectedCRC { + + // TODO: THis is where we should trigger the WAL recovery + // Recovery process is all about truncating the segment file + // till this point and ignoring the rest. + // Log appropriate messages when this happens. + // Evaluate if this recovery mode should be a command line flag + // that would suggest if we should truncate, ignore, or stop the boot process. + return fmt.Errorf("CRC32 mismatch: expected %d, got %d", crc, expectedCRC) + } - // Unmarshal the WAL entry to get the payload - if err := proto.Unmarshal(bb1ElementBytes[:entrySize], &el); err != nil { - file.Close() - return fmt.Errorf("error unmarshaling WAL entry: %w", err) - } + // Unmarshal the WAL entry to get the payload + if err := proto.Unmarshal(bb1ElementBytes[:entrySize], &el); err != nil { + return fmt.Errorf("error unmarshaling WAL entry: %w", err) + } - var c wire.Command - if err := proto.Unmarshal(el.Payload, &c); err != nil { - file.Close() - return fmt.Errorf("error unmarshaling command: %w", err) - } + var c wire.Command + if err := proto.Unmarshal(el.Payload, &c); err != nil { + return fmt.Errorf("error unmarshaling command: %w", err) + } - // Call provided replay function with parsed command - if err := cb(&c); err != nil { - file.Close() - return fmt.Errorf("error replaying command: %w", err) - } + // Call provided replay function with parsed command + if err := cb(&c); err != nil { + return fmt.Errorf("error replaying command: %w", err) + } + } + return nil +} + +// ReplayCommand replays the commands from the WAL files. +// This method is thread safe. +func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { + + // Get list of segment files ordered by timestamp in ascending order + segments, err := wl.segments() + if err != nil { + return fmt.Errorf("error getting wal-segment files: %w", err) + } + + // Process each segment file in order + for _, segment := range segments { + err2 := replaySegment(segment, cb) + if err2 != nil { + return err2 } }