Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions internal/irzstd/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,23 @@ type diskWriter struct {
//
// Returns:
// - diskWriter: Disk writer for Zstd compressed IR
// - err: Error creating new buffers, error opening Zstd/IR writers
// - err: Error creating new buffers, error opening Zstd writer
func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) {
irFile, zstdFile, err := newFileBuffers(irPath, zstdPath)
if err != nil {
return nil, err
}

irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, irFile)
zstdWriter, err := zstd.NewWriter(zstdFile)
if err != nil {
return nil, err
return nil, fmt.Errorf("error opening Zstd writer: %w", err)
}

diskWriter := diskWriter{
irPath: irPath,
irFile: irFile,
zstdPath: zstdPath,
zstdFile: zstdFile,
irWriter: irWriter,
zstdWriter: zstdWriter,
}

Expand Down Expand Up @@ -106,7 +105,6 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) {
irFile: irFile,
zstdPath: zstdPath,
zstdFile: zstdFile,
irWriter: nil,
zstdWriter: zstdWriter,
}

Expand All @@ -131,7 +129,10 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) {

// Converts log events to Zstd compressed IR and outputs to the Zstd file. IR is temporarily
// stored in the IR file until it surpasses [irSizeThreshold] with compression to Zstd pushed out
// to a later call. See [diskWriter] for more specific details on behaviour.
// to a later call. See [diskWriter] for more specific details on behaviour. The IR writer is lazily
// initialized on the first write. If initialized in [Reset], the preamble would make the IR file
// non-empty even though there are no logs. Non-empty IR files persist across recovery and could
// lead to empty files being uploaded to S3.
//
// Parameters:
// - logEvents: A slice of log events to be encoded
Expand All @@ -140,6 +141,14 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) {
// - numEvents: Number of log events successfully written to IR writer buffer
// - err: Error writing IR/Zstd, error flushing buffers
func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
if w.irWriter == nil {
var err error
w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile)
if err != nil {
return 0, fmt.Errorf("error creating IR writer: %w", err)
}
}

numBytes, numEvents, err := writeIr(w.irWriter, logEvents)
if err != nil {
return numEvents, err
Expand Down Expand Up @@ -197,26 +206,20 @@ func (w *diskWriter) CloseStreams() error {
return nil
}

// Reinitialize [diskWriter] after calling CloseStreams(). Resets individual IR and Zstd writers and
// associated buffers.
// Reinitialize [diskWriter] after calling CloseStreams(). Resets Zstd writer and associated
// buffer.
//
// Returns:
// - err: Error opening IR writer, error IR buffer not empty
// - err: Error IR buffer not empty
func (w *diskWriter) Reset() error {
var err error
w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile)
if err != nil {
return err
}

// Flush should be called prior to reset, so buffer should be empty. There may be a future
// use case to truncate a non-empty IR buffer; however, there is currently no use case
// so safer to throw an error.
if w.irTotalBytes != 0 {
return fmt.Errorf("error IR buffer is not empty")
}

_, err = w.zstdFile.Seek(0, io.SeekStart)
_, err := w.zstdFile.Seek(0, io.SeekStart)
if err != nil {
return err
}
Expand Down
28 changes: 0 additions & 28 deletions internal/irzstd/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"fmt"
"io"

"github.com/klauspost/compress/zstd"

"github.com/y-scope/clp-ffi-go/ffi"
"github.com/y-scope/clp-ffi-go/ir"
)
Expand Down Expand Up @@ -87,29 +85,3 @@ func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, int, error) {
}
return numBytes, numEvents, nil
}

// Opens a new [ir.Writer] and [zstd.Encoder].
//
// Parameters:
// - zstdOutput: Output destination for Zstd
// - irOutput: Output destination for IR
//
// Returns:
// - irWriter: Writer for CLP IR
// - zstdWriter: Writer for Zstd
// - err: Error opening IR/Zstd writer
func newIrZstdWriters(
zstdOutput io.Writer,
irOutput io.Writer,
) (*ir.Writer, *zstd.Encoder, error) {
zstdWriter, err := zstd.NewWriter(zstdOutput)
if err != nil {
return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err)
}

irWriter, err := ir.NewWriter[ir.FourByteEncoding](irOutput)
if err != nil {
return nil, nil, fmt.Errorf("error opening IR writer: %w", err)
}
return irWriter, zstdWriter, err
}