From e5f55dfcb6a0a845025b2f3cf1969eb5e03f5755 Mon Sep 17 00:00:00 2001 From: marco Date: Sun, 1 Feb 2026 13:36:46 -0500 Subject: [PATCH 1/7] latest --- internal/irzstd/disk.go | 40 ++++++++++++++++++++++++++++++++++++++- internal/irzstd/memory.go | 36 ++++++++++++++++++++++++++++++++++- internal/irzstd/state.go | 27 ++++++++++++++++++++++++++ internal/irzstd/writer.go | 6 ++++++ 4 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 internal/irzstd/state.go diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 43e2ecc..0a97497 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -41,6 +41,7 @@ type diskWriter struct { irWriter *ir.Writer irTotalBytes int zstdWriter *zstd.Encoder + state WriterState } // Opens a new [diskWriter] using files for IR and Zstd buffers. For use when use_disk_store @@ -71,6 +72,7 @@ func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) { zstdFile: zstdFile, irWriter: irWriter, zstdWriter: zstdWriter, + state: Open, } return &diskWriter, nil @@ -108,6 +110,7 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { zstdFile: zstdFile, irWriter: nil, zstdWriter: zstdWriter, + state: Open, } irFileSize, err := diskWriter.getIrFileSize() @@ -140,6 +143,10 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd, error flushing buffers func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { + if w.state != Open { + return 0, fmt.Errorf("cannot write: writer state is %s, expected %s", w.state, Open) + } + numBytes, numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { return numEvents, err @@ -169,6 +176,13 @@ func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { // Returns: // - err: Error flushing/closing buffers func (w *diskWriter) CloseStreams() error { + if w.state == StreamsClosed { + return nil + } + if w.state != Open { + return fmt.Errorf("cannot close streams: writer state is %s, expected %s", w.state, Open) + } + // IR buffer may not be empty, so must be flushed prior to adding trailing EndOfStream byte. err := w.flushIrBuffer() if err != nil { @@ -186,14 +200,17 @@ func (w *diskWriter) CloseStreams() error { w.zstdWriter.Write([]byte{irEndOfStreamByte}) err = w.zstdWriter.Close() if err != nil { + w.state = Corrupted return err } _, err = w.zstdFile.Seek(0, io.SeekStart) if err != nil { + w.state = Corrupted return err } + w.state = StreamsClosed return nil } @@ -203,9 +220,14 @@ func (w *diskWriter) CloseStreams() error { // Returns: // - err: Error opening IR writer, error IR buffer not empty func (w *diskWriter) Reset() error { + if w.state != StreamsClosed { + return fmt.Errorf("cannot reset: writer state is %s, expected %s", w.state, StreamsClosed) + } + var err error w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile) if err != nil { + w.state = Corrupted return err } @@ -218,16 +240,19 @@ func (w *diskWriter) Reset() error { _, err = w.zstdFile.Seek(0, io.SeekStart) if err != nil { + w.state = Corrupted return err } err = w.zstdFile.Truncate(0) if err != nil { + w.state = Corrupted return err } w.zstdWriter.Reset(w.zstdFile) + w.state = Open return nil } @@ -260,6 +285,15 @@ func (w *diskWriter) Close() error { return nil } + +// Getter for state. +// +// Returns: +// - state: Current state +func (w *diskWriter) GetState() WriterState { + return w.state +} + // Getter for useDiskBuffer. // // Returns: @@ -294,7 +328,6 @@ func (w *diskWriter) flushIrBuffer() error { return fmt.Errorf("error flush called with non-existent buffer") } - // Flush is called during Close(), and possible that the IR buffer is empty. if w.irTotalBytes == 0 { return nil } @@ -303,16 +336,19 @@ func (w *diskWriter) flushIrBuffer() error { _, err := w.irFile.Seek(0, io.SeekStart) if err != nil { + w.state = Corrupted return err } _, err = io.Copy(w.zstdWriter, w.irFile) if err != nil { + w.state = Corrupted return err } err = w.zstdWriter.Close() if err != nil { + w.state = Corrupted return err } @@ -322,11 +358,13 @@ func (w *diskWriter) flushIrBuffer() error { _, err = w.irFile.Seek(0, io.SeekStart) if err != nil { + w.state = Corrupted return err } err = w.irFile.Truncate(0) if err != nil { + w.state = Corrupted return err } diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 6e86c7b..0f25df3 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -18,6 +18,7 @@ type memoryWriter struct { zstdBuffer *bytes.Buffer irWriter *ir.Writer zstdWriter *zstd.Encoder + state WriterState } // Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is @@ -43,6 +44,7 @@ func NewMemoryWriter() (*memoryWriter, error) { irWriter: irWriter, zstdWriter: zstdWriter, zstdBuffer: &zstdBuffer, + state: Open, } return &memoryWriter, nil @@ -57,6 +59,10 @@ func NewMemoryWriter() (*memoryWriter, error) { // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { + if w.state != Open { + return 0, fmt.Errorf("cannot write: writer state is %s, expected %s", w.state, Open) + } + _, numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { return numEvents, err @@ -70,12 +76,26 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { // Returns: // - err: Error closing buffers func (w *memoryWriter) CloseStreams() error { + if w.state == StreamsClosed { + return nil + } + if w.state != Open { + return fmt.Errorf("cannot close streams: writer state is %s, expected %s", w.state, Open) + } + if err := w.irWriter.Close(); err != nil { + w.state = Corrupted return err } w.irWriter = nil - return w.zstdWriter.Close() + if err := w.zstdWriter.Close(); err != nil { + w.state = Corrupted + return err + } + + w.state = StreamsClosed + return nil } // Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers @@ -84,15 +104,21 @@ func (w *memoryWriter) CloseStreams() error { // Returns: // - err: Error opening IR writer func (w *memoryWriter) Reset() error { + if w.state != StreamsClosed { + return fmt.Errorf("cannot reset: writer state is %s, expected %s", w.state, StreamsClosed) + } + var err error w.zstdBuffer.Reset() w.zstdWriter.Reset(w.zstdBuffer) w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter) if err != nil { + w.state = Corrupted return err } + w.state = Open return nil } @@ -122,6 +148,14 @@ func (w *memoryWriter) GetZstdOutputSize() (int, error) { return w.zstdBuffer.Len(), nil } +// Getter for state. +// +// Returns: +// - state: Current state +func (w *memoryWriter) GetState() WriterState { + return w.state +} + // Closes [memoryWriter]. Currently used during recovery only, and advise caution using elsewhere. // Using [ir.Writer.Serializer.Close] instead of [ir.Writer.Close] so EndofStream byte is not // added. It is preferable to add postamble on recovery so that IR is in the same state diff --git a/internal/irzstd/state.go b/internal/irzstd/state.go new file mode 100644 index 0000000..448d463 --- /dev/null +++ b/internal/irzstd/state.go @@ -0,0 +1,27 @@ +package irzstd + +// WriterState is the state of a [Writer]. +type WriterState int + +const ( + // Ready to accept writes. + Open WriterState = iota + // Streams are terminated and [Writer] must be [Reset] before writing again. + StreamsClosed + // There was an unrecoverable error and writer is unusable. + Corrupted +) + +var writerStateNames = map[WriterState]string{ + Open: "Open", + StreamsClosed: "StreamsClosed", + Corrupted: "Corrupted", +} + +// Getter for string representation of [WriterState]. +// +// Returns: +// - name: String representation of the state +func (s WriterState) String() string { + return writerStateNames[s] +} diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index a38ab45..5a299bd 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -61,6 +61,12 @@ type Writer interface { // - size: Bytes written // - err GetZstdOutputSize() (int, error) + + // Get the current state of the Writer. + // + // Returns: + // - state: Current state (Open, Closed, or Corrupted) + GetState() WriterState } // Writes log events to a IR Writer. From 4f8fb02ef999f5291c54e19df2b9ee0677029043 Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 2 Feb 2026 01:56:32 +0000 Subject: [PATCH 2/7] latest --- internal/irzstd/disk.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 0a97497..8d740fe 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -285,7 +285,6 @@ func (w *diskWriter) Close() error { return nil } - // Getter for state. // // Returns: From e6e60420c838bbd0e6019bf0395578d15f38d189 Mon Sep 17 00:00:00 2001 From: davemarco <83603688+davemarco@users.noreply.github.com> Date: Thu, 19 Feb 2026 22:41:49 -0500 Subject: [PATCH 3/7] Add irTotalBytes field to memoryWriter struct --- internal/irzstd/memory.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 7b46c05..4d02d57 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -18,6 +18,7 @@ type memoryWriter struct { irWriter *ir.Writer zstdWriter *zstd.Encoder state WriterState + irTotalBytes int } // Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is From 5f2423abbd8ef62673bc6f77ce9c6563a58685cc Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 19 Feb 2026 22:54:54 -0500 Subject: [PATCH 4/7] fix lint --- internal/irzstd/memory.go | 8 ++++---- internal/irzstd/writer.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 4d02d57..c221e1a 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -14,10 +14,10 @@ import ( // Converts log events into Zstd compressed IR. Log events are immediately converted to Zstd // compressed IR and stored in [memoryWriter.zstdBuffer]. type memoryWriter struct { - zstdBuffer *bytes.Buffer - irWriter *ir.Writer - zstdWriter *zstd.Encoder - state WriterState + zstdBuffer *bytes.Buffer + irWriter *ir.Writer + zstdWriter *zstd.Encoder + state WriterState irTotalBytes int } diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index d217cfa..3eadb37 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -61,7 +61,7 @@ type Writer interface { // Returns: // - state: Current state (Open, Closed, or Corrupted) GetState() WriterState - + // Checks if writer is empty. True if no events are buffered. // // Returns: From 29f62de3da739ad43bb2a04a8622451459a9e6ca Mon Sep 17 00:00:00 2001 From: davemarco <83603688+davemarco@users.noreply.github.com> Date: Thu, 26 Feb 2026 14:12:51 -0500 Subject: [PATCH 5/7] Update internal/irzstd/writer.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- internal/irzstd/writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index 9ebcbcb..d973e41 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -57,7 +57,7 @@ type Writer interface { // Get the current state of the Writer. // // Returns: - // - state: Current state (Open, Closed, or Corrupted) + // - state: Current state (Open, StreamsClosed, or Corrupted) GetState() WriterState // Checks if writer is empty. True if no events are buffered. From d49aa40f245015af900d2c8893b8c3cdfafe273b Mon Sep 17 00:00:00 2001 From: davemarco <83603688+davemarco@users.noreply.github.com> Date: Thu, 26 Feb 2026 14:15:41 -0500 Subject: [PATCH 6/7] Fix indentation in WriteIrZstd method --- internal/irzstd/disk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index a2db5fb..d46c1a1 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -146,7 +146,7 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { if w.state != Open { return 0, fmt.Errorf("cannot write: writer state is %s, expected %s", w.state, Open) - } + } if w.irWriter == nil { var err error From 6ad7e21c74cdd64b747acd05a3dab966b9f7cb3a Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 26 Feb 2026 14:24:24 -0500 Subject: [PATCH 7/7] lint --- internal/irzstd/disk.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index d46c1a1..92d6892 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -146,8 +146,8 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { if w.state != Open { return 0, fmt.Errorf("cannot write: writer state is %s, expected %s", w.state, Open) - } - + } + if w.irWriter == nil { var err error w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile)