diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 43e2ecc..a31576d 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -52,16 +52,16 @@ type diskWriter struct { // // Returns: // - diskWriter: Disk writer for Zstd compressed IR -// - err: Error creating new buffers, error opening Zstd/IR writers +// - err: Error creating new buffers, error opening Zstd writer func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile, zstdFile, err := newFileBuffers(irPath, zstdPath) if err != nil { return nil, err } - irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, irFile) + zstdWriter, err := zstd.NewWriter(zstdFile) if err != nil { - return nil, err + return nil, fmt.Errorf("error opening Zstd writer: %w", err) } diskWriter := diskWriter{ @@ -69,7 +69,6 @@ func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile: irFile, zstdPath: zstdPath, zstdFile: zstdFile, - irWriter: irWriter, zstdWriter: zstdWriter, } @@ -106,7 +105,6 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile: irFile, zstdPath: zstdPath, zstdFile: zstdFile, - irWriter: nil, zstdWriter: zstdWriter, } @@ -131,7 +129,10 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { // Converts log events to Zstd compressed IR and outputs to the Zstd file. IR is temporarily // stored in the IR file until it surpasses [irSizeThreshold] with compression to Zstd pushed out -// to a later call. See [diskWriter] for more specific details on behaviour. +// to a later call. See [diskWriter] for more specific details on behaviour. The IR writer is lazily +// initialized on the first write. If initialized in [Reset], the preamble would make the IR file +// non-empty even though there are no logs. Non-empty IR files persist across recovery and could +// lead to empty files being uploaded to S3. // // Parameters: // - logEvents: A slice of log events to be encoded @@ -140,6 +141,14 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd, error flushing buffers func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { + if w.irWriter == nil { + var err error + w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile) + if err != nil { + return 0, fmt.Errorf("error creating IR writer: %w", err) + } + } + numBytes, numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { return numEvents, err @@ -197,18 +206,12 @@ func (w *diskWriter) CloseStreams() error { return nil } -// Reinitialize [diskWriter] after calling CloseStreams(). Resets individual IR and Zstd writers and -// associated buffers. +// Reinitialize [diskWriter] after calling CloseStreams(). Resets Zstd writer and associated +// buffer. // // Returns: -// - err: Error opening IR writer, error IR buffer not empty +// - err: Error IR buffer not empty func (w *diskWriter) Reset() error { - var err error - w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile) - if err != nil { - return err - } - // Flush should be called prior to reset, so buffer should be empty. There may be a future // use case to truncate a non-empty IR buffer; however, there is currently no use case // so safer to throw an error. @@ -216,7 +219,7 @@ func (w *diskWriter) Reset() error { return fmt.Errorf("error IR buffer is not empty") } - _, err = w.zstdFile.Seek(0, io.SeekStart) + _, err := w.zstdFile.Seek(0, io.SeekStart) if err != nil { return err } diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index a38ab45..2f2a2db 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -7,8 +7,6 @@ import ( "fmt" "io" - "github.com/klauspost/compress/zstd" - "github.com/y-scope/clp-ffi-go/ffi" "github.com/y-scope/clp-ffi-go/ir" ) @@ -87,29 +85,3 @@ func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, int, error) { } return numBytes, numEvents, nil } - -// Opens a new [ir.Writer] and [zstd.Encoder]. -// -// Parameters: -// - zstdOutput: Output destination for Zstd -// - irOutput: Output destination for IR -// -// Returns: -// - irWriter: Writer for CLP IR -// - zstdWriter: Writer for Zstd -// - err: Error opening IR/Zstd writer -func newIrZstdWriters( - zstdOutput io.Writer, - irOutput io.Writer, -) (*ir.Writer, *zstd.Encoder, error) { - zstdWriter, err := zstd.NewWriter(zstdOutput) - if err != nil { - return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) - } - - irWriter, err := ir.NewWriter[ir.FourByteEncoding](irOutput) - if err != nil { - return nil, nil, fmt.Errorf("error opening IR writer: %w", err) - } - return irWriter, zstdWriter, err -}