Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.24

require (
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c
github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb
github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855
)

require (
Expand Down Expand Up @@ -39,6 +39,8 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sys v0.17.0 // indirect
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb h1:MAuKBGpfQIIl63810kEYZxUv8tfpI9y0nZlyi7tS8A8=
github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb/go.mod h1:EkeM7lP5AWNRcmBWt3MvjAkRx7RT0gzisW4sh+SJYUw=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855 h1:yrwcVsQs6qpCiRpCHgOk7g+jo1hBVwT9MhJA1hEQLso=
github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855/go.mod h1:EuJRZ9fcHuedhtPHsVCsB84isZkqVECbvfAfhj9JGFI=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
Expand Down
1 change: 1 addition & 0 deletions internal/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func New(data unsafe.Pointer, length int) *codec.Decoder {
mh.RawToString = true
mh.WriteExt = true
mh.ErrorIfNoArrayExpand = true
mh.MapType = reflect.TypeOf(map[string]interface{}{})

// Set up custom extension for Fluent Bit timestamp format.
mh.SetBytesExt(reflect.TypeOf(FlbTime{}), 0, &FlbTime{})
Expand Down
120 changes: 59 additions & 61 deletions internal/irzstd/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
const (
	// Threshold (2 MB) of buffered IR to reach before compressing it to Zstd.
	irSizeThreshold = 2 * 1024 * 1024

	// Byte used to terminate an IR stream (IR end of stream byte).
	irEndOfStreamByte = 0x00
)

// Converts log events into Zstd compressed IR using "trash compactor" design. Log events are
// converted to uncompressed IR and buffered into "bins". Uncompressed IR represents uncompressed
// trash in "trash compactor". Once the bin is full, the bin is "compacted" into its own separate
Expand All @@ -36,8 +39,6 @@ type diskWriter struct {
irFile *os.File
zstdFile *os.File
irWriter *ir.Writer
size int
timezone string
irTotalBytes int
zstdWriter *zstd.Encoder
}
Expand All @@ -46,33 +47,24 @@ type diskWriter struct {
// is on.
//
// Parameters:
// - timezone: Time zone of the log source
// - size: Byte length
// - irPath: Path to IR disk buffer file
// - zstdPath: Path to Zstd disk buffer file
//
// Returns:
// - diskWriter: Disk writer for Zstd compressed IR
// - err: Error creating new buffers, error opening Zstd/IR writers
func NewDiskWriter(
timezone string,
size int,
irPath string,
zstdPath string,
) (*diskWriter, error) {
func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) {
irFile, zstdFile, err := newFileBuffers(irPath, zstdPath)
if err != nil {
return nil, err
}

irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size)
irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, irFile)
if err != nil {
return nil, err
}

diskWriter := diskWriter{
size: size,
timezone: timezone,
irPath: irPath,
irFile: irFile,
zstdPath: zstdPath,
Expand All @@ -84,43 +76,37 @@ func NewDiskWriter(
return &diskWriter, nil
}

// Recovers a [diskWriter] opening buffer files from a previous execution of output plugin.
// Recovery of files necessitates that use_disk_store is on. IR preamble is removed for
// recovered store. Avoid use with empty disk stores as there will be no preamble.
// Recovers a [diskWriter] by opening buffer files from a previous execution of the output plugin.
// Requires use_disk_store to be enabled. The recovered writer must be closed with [CloseStreams]
// before it can be used for future writes, since it does not initialize an IR writer. Returns an
// error if both disk buffers are empty, since the IR would not have a preamble and would be
// invalid.
//
// Parameters:
// - timezone: Time zone of the log source
// - size: Byte length
// - irPath: Path to IR disk buffer file
// - zstdPath: Path to Zstd disk buffer file
//
// Returns:
// - diskWriter: Disk writer for Zstd compressed IR
// - err: Error opening buffers, error opening Zstd/IR writers, error getting file sizes
func RecoverWriter(
timezone string,
size int,
irPath string,
zstdPath string,
) (*diskWriter, error) {
// - err: Error opening buffers, error opening Zstd/IR writers, error getting file sizes,
// error empty buffers
func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) {
irFile, zstdFile, err := openBufferFiles(irPath, zstdPath)
if err != nil {
return nil, fmt.Errorf("error opening files: %w", err)
}

irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size)
zstdWriter, err := zstd.NewWriter(zstdFile)
if err != nil {
return nil, err
return nil, fmt.Errorf("error opening Zstd writer: %w", err)
}

diskWriter := diskWriter{
size: size,
timezone: timezone,
irPath: irPath,
irFile: irFile,
zstdPath: zstdPath,
zstdFile: zstdFile,
irWriter: irWriter,
irWriter: nil,
zstdWriter: zstdWriter,
}

Expand All @@ -129,11 +115,16 @@ func RecoverWriter(
return nil, fmt.Errorf("error getting size of IR file: %w", err)
}

// During recovery, IR buffer may not be empty, so the size must be set. In addition,
// the non-empty disk buffers already have existing preamble so remove it. Disk buffer
// must have non-zero size or else would be deleted in recover.
zstdFileSize, err := diskWriter.getZstdFileSize()
if err != nil {
return nil, fmt.Errorf("error getting size of Zstd file: %w", err)
}

if (irFileSize == 0) && (zstdFileSize == 0) {
return nil, fmt.Errorf("error both IR and Zstd buffers are empty")
}

diskWriter.irTotalBytes = irFileSize
irWriter.Reset()

return &diskWriter, nil
}
Expand All @@ -149,12 +140,7 @@ func RecoverWriter(
// - numEvents: Number of log events successfully written to IR writer buffer
// - err: Error writing IR/Zstd, error flushing buffers
func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
numEvents, err := writeIr(w.irWriter, logEvents)
if err != nil {
return numEvents, err
}

numBytes, err := w.irWriter.WriteTo(w.irFile)
numBytes, numEvents, err := writeIr(w.irWriter, logEvents)
if err != nil {
return numEvents, err
}
Expand All @@ -173,9 +159,12 @@ func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
return numEvents, nil
}

// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding.
// The IR buffer is also flushed before ending stream. After calling close,
// [diskWriter] must be reset prior to calling write.
// Closes the IR stream and Zstd frame, adding the trailing byte(s) required for IR/Zstd decoding.
// The IR buffer is flushed before the stream is ended. After calling close, [diskWriter] must be
// reset prior to calling write. For a recovered [diskWriter], [ir.Writer] will be nil, so closing
// the IR writer is skipped. The IR trailing byte is written directly to [zstdWriter] as an
// optimization to avoid an extra flush when the IR buffer is empty; [flushIrBuffer] exits early
// if the IR buffer is empty.
//
// Returns:
// - err: Error flushing/closing buffers
Expand All @@ -186,13 +175,15 @@ func (w *diskWriter) CloseStreams() error {
return fmt.Errorf("error flushing IR buffer: %w", err)
}

_, err = w.irWriter.CloseTo(w.zstdWriter)
if err != nil {
return err
if w.irWriter != nil {
err := w.irWriter.Serializer.Close()
if err != nil {
return fmt.Errorf("error could not close irWriter: %w", err)
}
w.irWriter = nil
}

w.irWriter = nil

w.zstdWriter.Write([]byte{irEndOfStreamByte})
err = w.zstdWriter.Close()
if err != nil {
return err
Expand All @@ -213,7 +204,7 @@ func (w *diskWriter) CloseStreams() error {
// - err: Error opening IR writer, error IR buffer not empty
func (w *diskWriter) Reset() error {
var err error
w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone)
w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile)
if err != nil {
return err
}
Expand Down Expand Up @@ -285,21 +276,12 @@ func (w *diskWriter) GetZstdOutput() io.Reader {
return w.zstdFile
}

// Get size of Zstd output.
//
// Returns:
// - size: Size of the Zstd file in bytes
// - err: Error getting size
func (w *diskWriter) GetZstdOutputSize() (int, error) {
	// The size is obtained through the shared stat helper so that the exported accessor and
	// internal callers (e.g. recovery) always agree on how the size is measured.
	return w.getZstdFileSize()
}

// Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then
Expand Down Expand Up @@ -451,3 +433,19 @@ func (w *diskWriter) getIrFileSize() (int, error) {
irFileSize := int(irFileInfo.Size())
return irFileSize, err
}

// Reports the current size of the Zstd file by calling stat on it. [zstd] does not provide the
// number of bytes written by each write, so the size cannot be tracked in a variable the way
// [irTotalBytes] tracks IR; stat must be used every time.
//
// Returns:
// - size: Size of the Zstd file in bytes
// - err: Error calling stat
func (w *diskWriter) getZstdFileSize() (int, error) {
	info, statErr := w.zstdFile.Stat()
	if statErr != nil {
		return 0, statErr
	}

	return int(info.Size()), nil
}
38 changes: 16 additions & 22 deletions internal/irzstd/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,29 @@ import (
type memoryWriter struct {
// Memory buffer that receives the Zstd output; the Zstd writer is opened on it and it is
// cleared in Reset.
zstdBuffer *bytes.Buffer
// IR writer for log events. Set to nil by CloseStreams and recreated by Reset.
irWriter *ir.Writer
// Byte length passed to the IR writer when it is created with ir.NewWriterSize.
// NOTE(review): this view also shows constructors that no longer take a size — confirm the
// field is still needed.
size int
// Time zone of the log source, passed to the IR writer alongside size.
// NOTE(review): same caveat as size — confirm the field is still needed.
timezone string
// Zstd encoder writing compressed output into zstdBuffer.
zstdWriter *zstd.Encoder
}

// Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is
// off.
//
// Parameters:
// - timezone: Time zone of the log source
// - size: Byte length
//
// Returns:
// - memoryWriter: Memory writer for Zstd compressed IR
// - err: Error opening Zstd/IR writers
func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) {
func NewMemoryWriter() (*memoryWriter, error) {
var zstdBuffer bytes.Buffer
irWriter, zstdWriter, err := newIrZstdWriters(&zstdBuffer, timezone, size)

zstdWriter, err := zstd.NewWriter(&zstdBuffer)
if err != nil {
return nil, err
return nil, fmt.Errorf("error opening Zstd writer: %w", err)
}

irWriter, err := ir.NewWriter[ir.FourByteEncoding](zstdWriter)
if err != nil {
return nil, fmt.Errorf("error opening IR writer: %w", err)
}

memoryWriter := memoryWriter{
size: size,
timezone: timezone,
irWriter: irWriter,
zstdWriter: zstdWriter,
zstdBuffer: &zstdBuffer,
Expand All @@ -59,12 +57,10 @@ func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) {
// - numEvents: Number of log events successfully written to IR writer buffer
// - err: Error writing IR/Zstd
func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
numEvents, err := writeIr(w.irWriter, logEvents)
_, numEvents, err := writeIr(w.irWriter, logEvents)
if err != nil {
return numEvents, err
}

_, err = w.irWriter.WriteTo(w.zstdWriter)
return numEvents, err
}

Expand All @@ -74,15 +70,12 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
// Returns:
// - err: Error closing buffers
func (w *memoryWriter) CloseStreams() error {
_, err := w.irWriter.CloseTo(w.zstdWriter)
if err != nil {
if err := w.irWriter.Close(); err != nil {
return err
}

w.irWriter = nil

err = w.zstdWriter.Close()
return err
return w.zstdWriter.Close()
}

// Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers
Expand All @@ -92,13 +85,14 @@ func (w *memoryWriter) CloseStreams() error {
// - err: Error opening IR writer
func (w *memoryWriter) Reset() error {
var err error
w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone)
w.zstdBuffer.Reset()
w.zstdWriter.Reset(w.zstdBuffer)

w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter)
if err != nil {
return err
}

w.zstdBuffer.Reset()
w.zstdWriter.Reset(w.zstdBuffer)
return nil
}

Expand Down
Loading