From 68352bdf66ea5a5a2fea0b6372cc071bb6bc7045 Mon Sep 17 00:00:00 2001 From: Saubhik Kumar Date: Wed, 4 Jun 2025 19:30:07 +0530 Subject: [PATCH 1/2] defer file close after file open --- internal/wal/wal_forge.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/internal/wal/wal_forge.go b/internal/wal/wal_forge.go index edd5bff4e..6bf7281cc 100644 --- a/internal/wal/wal_forge.go +++ b/internal/wal/wal_forge.go @@ -326,6 +326,8 @@ func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { if err != nil { return fmt.Errorf("error opening wal-segment file %s: %w", segment, err) } + + defer file.Close() // Ensure the file is closed after processing reader := bufio.NewReader(file) // Format: CRC32 (4 bytes) | Size of WAL entry (4 bytes) | WAL data @@ -341,25 +343,18 @@ func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { if err == io.EOF { break } - file.Close() return fmt.Errorf("error reading WAL: %w", err) } crc = binary.LittleEndian.Uint32(bb1h[0:4]) entrySize = binary.LittleEndian.Uint32(bb1h[4:8]) if _, err := io.ReadFull(reader, bb1ElementBytes[:entrySize]); err != nil { - file.Close() return fmt.Errorf("error reading WAL data: %w", err) } // Calculate CRC32 only on the payload expectedCRC := crc32.ChecksumIEEE(bb1ElementBytes[:entrySize]) if crc != expectedCRC { - // TODO: We are reprtitively closing the file here - // A better solution would be to move this logic to a function - // and use defer to close the file. - // The function. thus, in a way processes (replays) one segment at a time. - file.Close() // TODO: THis is where we should trigger the WAL recovery // Recovery process is all about truncating the segment file @@ -372,19 +367,16 @@ func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { // Unmarshal the WAL entry to get the payload if err := proto.Unmarshal(bb1ElementBytes[:entrySize], &el); err != nil { - file.Close() return fmt.Errorf("error unmarshaling WAL entry: %w", err) } var c wire.Command if err := proto.Unmarshal(el.Payload, &c); err != nil { - file.Close() return fmt.Errorf("error unmarshaling command: %w", err) } // Call provided replay function with parsed command if err := cb(&c); err != nil { - file.Close() return fmt.Errorf("error replaying command: %w", err) } } From 9316b8a487890691c0203da320ba57e4d399df60 Mon Sep 17 00:00:00 2001 From: Saubhik Kumar Date: Wed, 4 Jun 2025 19:54:23 +0530 Subject: [PATCH 2/2] use helper function instead --- internal/wal/wal_forge.go | 130 ++++++++++++++++++++------------------ 1 file changed, 70 insertions(+), 60 deletions(-) diff --git a/internal/wal/wal_forge.go b/internal/wal/wal_forge.go index 6bf7281cc..e88c73849 100644 --- a/internal/wal/wal_forge.go +++ b/internal/wal/wal_forge.go @@ -304,81 +304,91 @@ func (wl *walForge) segments() ([]string, error) { return files, nil } -// ReplayCommand replays the commands from the WAL files. -// This method is thread safe. -func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { + +// ReplayCommand helper +func replaySegment(segmentPath string, cb func(*wire.Command) error) error { var crc, entrySize uint32 var el w.Element // Buffers to hold the header and the element bytes bb1h := make([]byte, 8) bb1ElementBytes := make([]byte, 10*1024) - - // Get list of segment files ordered by timestamp in ascending order - segments, err := wl.segments() + file, err := os.Open(segmentPath) if err != nil { - return fmt.Errorf("error getting wal-segment files: %w", err) + return fmt.Errorf("error opening wal-segment file %s: %w", segmentPath, err) } + + defer file.Close() // Ensure the file is closed after processing - // Process each segment file in order - for _, segment := range segments { - file, err := os.Open(segment) - if err != nil { - return fmt.Errorf("error opening wal-segment file %s: %w", segment, err) - } - - defer file.Close() // Ensure the file is closed after processing - - reader := bufio.NewReader(file) - // Format: CRC32 (4 bytes) | Size of WAL entry (4 bytes) | WAL data - - // TODO: Replace this infinite loop with a more elegant solution - for { - // Read CRC32 (4 bytes) + entrySize (4 bytes) - if _, err := io.ReadFull(reader, bb1h); err != nil { - // TODO: this terminating connection should be handled in a better way - // and the loop should not be infinite. - // Edge case: this EOF error can happen even in the next step when - // we are reading the WAL element from the file. - if err == io.EOF { - break - } - return fmt.Errorf("error reading WAL: %w", err) - } - crc = binary.LittleEndian.Uint32(bb1h[0:4]) - entrySize = binary.LittleEndian.Uint32(bb1h[4:8]) + reader := bufio.NewReader(file) + // Format: CRC32 (4 bytes) | Size of WAL entry (4 bytes) | WAL data - if _, err := io.ReadFull(reader, bb1ElementBytes[:entrySize]); err != nil { - return fmt.Errorf("error reading WAL data: %w", err) + // TODO: Replace this infinite loop with a more elegant solution + for { + // Read CRC32 (4 bytes) + entrySize (4 bytes) + if _, err := io.ReadFull(reader, bb1h); err != nil { + // TODO: this terminating connection should be handled in a better way + // and the loop should not be infinite. + // Edge case: this EOF error can happen even in the next step when + // we are reading the WAL element from the file. + if err == io.EOF { + break } + return fmt.Errorf("error reading WAL: %w", err) + } + crc = binary.LittleEndian.Uint32(bb1h[0:4]) + entrySize = binary.LittleEndian.Uint32(bb1h[4:8]) - // Calculate CRC32 only on the payload - expectedCRC := crc32.ChecksumIEEE(bb1ElementBytes[:entrySize]) - if crc != expectedCRC { - - // TODO: THis is where we should trigger the WAL recovery - // Recovery process is all about truncating the segment file - // till this point and ignoring the rest. - // Log appropriate messages when this happens. - // Evaluate if this recovery mode should be a command line flag - // that would suggest if we should truncate, ignore, or stop the boot process. - return fmt.Errorf("CRC32 mismatch: expected %d, got %d", crc, expectedCRC) - } + if _, err := io.ReadFull(reader, bb1ElementBytes[:entrySize]); err != nil { + return fmt.Errorf("error reading WAL data: %w", err) + } - // Unmarshal the WAL entry to get the payload - if err := proto.Unmarshal(bb1ElementBytes[:entrySize], &el); err != nil { - return fmt.Errorf("error unmarshaling WAL entry: %w", err) - } + // Calculate CRC32 only on the payload + expectedCRC := crc32.ChecksumIEEE(bb1ElementBytes[:entrySize]) + if crc != expectedCRC { + + // TODO: THis is where we should trigger the WAL recovery + // Recovery process is all about truncating the segment file + // till this point and ignoring the rest. + // Log appropriate messages when this happens. + // Evaluate if this recovery mode should be a command line flag + // that would suggest if we should truncate, ignore, or stop the boot process. + return fmt.Errorf("CRC32 mismatch: expected %d, got %d", crc, expectedCRC) + } - var c wire.Command - if err := proto.Unmarshal(el.Payload, &c); err != nil { - return fmt.Errorf("error unmarshaling command: %w", err) - } + // Unmarshal the WAL entry to get the payload + if err := proto.Unmarshal(bb1ElementBytes[:entrySize], &el); err != nil { + return fmt.Errorf("error unmarshaling WAL entry: %w", err) + } - // Call provided replay function with parsed command - if err := cb(&c); err != nil { - return fmt.Errorf("error replaying command: %w", err) - } + var c wire.Command + if err := proto.Unmarshal(el.Payload, &c); err != nil { + return fmt.Errorf("error unmarshaling command: %w", err) + } + + // Call provided replay function with parsed command + if err := cb(&c); err != nil { + return fmt.Errorf("error replaying command: %w", err) + } + } + return nil +} + +// ReplayCommand replays the commands from the WAL files. +// This method is thread safe. +func (wl *walForge) ReplayCommand(cb func(*wire.Command) error) error { + + // Get list of segment files ordered by timestamp in ascending order + segments, err := wl.segments() + if err != nil { + return fmt.Errorf("error getting wal-segment files: %w", err) + } + + // Process each segment file in order + for _, segment := range segments { + err2 := replaySegment(segment, cb) + if err2 != nil { + return err2 } }