From fff3c95284a4f2fb7cfe3d21ac156d4aed159abf Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 16 Jan 2026 15:08:03 -0500 Subject: [PATCH 01/31] latest --- go.mod | 4 +- go.sum | 8 ++- internal/irzstd/disk.go | 34 ++++++------ internal/irzstd/memory.go | 31 +++++------ internal/irzstd/writer.go | 28 +++++----- plugins/out_clp_s3/internal/flush/flush.go | 61 ++++------------------ 6 files changed, 67 insertions(+), 99 deletions(-) diff --git a/go.mod b/go.mod index b2c50f9..f774fb0 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22.3 require ( github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c - github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb + github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855 ) require ( @@ -39,6 +39,8 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/crypto v0.19.0 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.17.0 // indirect diff --git a/go.sum b/go.sum index 8624661..cccd671 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb h1:MAuKBGpfQIIl63810kEYZxUv8tfpI9y0nZlyi7tS8A8= -github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb/go.mod h1:EkeM7lP5AWNRcmBWt3MvjAkRx7RT0gzisW4sh+SJYUw= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855 h1:yrwcVsQs6qpCiRpCHgOk7g+jo1hBVwT9MhJA1hEQLso= +github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855/go.mod h1:EuJRZ9fcHuedhtPHsVCsB84isZkqVECbvfAfhj9JGFI= golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 0d1cf59..3031b65 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -65,7 +65,7 @@ func NewDiskWriter( return nil, err } - irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size) + irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, irFile) if err != nil { return nil, err } @@ -108,9 +108,9 @@ func RecoverWriter( return nil, fmt.Errorf("error opening files: %w", err) } - irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size) + zstdWriter, err := zstd.NewWriter(zstdFile) if err != nil { - return nil, err + return nil, fmt.Errorf("error opening Zstd writer: %w", err) } diskWriter := diskWriter{ @@ -120,7 +120,7 @@ func RecoverWriter( irFile: irFile, zstdPath: zstdPath, zstdFile: zstdFile, - irWriter: irWriter, + irWriter: nil, zstdWriter: zstdWriter, } @@ -133,7 +133,6 @@ func RecoverWriter( // the non-empty disk buffers already have existing preamble so remove it. Disk buffer // must have non-zero size or else would be deleted in recover. diskWriter.irTotalBytes = irFileSize - irWriter.Reset() return &diskWriter, nil } @@ -149,12 +148,7 @@ func RecoverWriter( // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd, error flushing buffers func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { - numEvents, err := writeIr(w.irWriter, logEvents) - if err != nil { - return numEvents, err - } - - numBytes, err := w.irWriter.WriteTo(w.irFile) + numBytes, numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { return numEvents, err } @@ -186,13 +180,17 @@ func (w *diskWriter) CloseStreams() error { return fmt.Errorf("error flushing IR buffer: %w", err) } - _, err = w.irWriter.CloseTo(w.zstdWriter) - if err != nil { - return err - } - - w.irWriter = nil + // may be nil if recovered. + if (w.irWriter == nil) { + err := w.irWriter.Serializer.Close() + if err != nil { + return fmt.Errorf("error could not close irWriter: %w", err) + } + w.irWriter = nil + } + // ir termination byte + w.zstdWriter.Write([]byte{0x0}) err = w.zstdWriter.Close() if err != nil { return err @@ -213,7 +211,7 @@ func (w *diskWriter) CloseStreams() error { // - err: Error opening IR writer, error IR buffer not empty func (w *diskWriter) Reset() error { var err error - w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) + w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile) if err != nil { return err } diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 5d8816b..b64a88d 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -6,7 +6,6 @@ import ( "io" "github.com/klauspost/compress/zstd" - "github.com/y-scope/clp-ffi-go/ffi" "github.com/y-scope/clp-ffi-go/ir" ) @@ -34,15 +33,21 @@ type memoryWriter struct { // - err: Error opening Zstd/IR writers func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { var zstdBuffer bytes.Buffer - irWriter, zstdWriter, err := newIrZstdWriters(&zstdBuffer, timezone, size) + + zstdWriter, err := zstd.NewWriter(&zstdBuffer) if err != nil { - return nil, err + return nil, fmt.Errorf("error opening Zstd writer: %w", err) + } + + irWriter, err := ir.NewWriter[ir.FourByteEncoding](zstdWriter) + if err != nil { + return nil, fmt.Errorf("error opening IR writer: %w", err) } memoryWriter := memoryWriter{ size: size, timezone: timezone, - irWriter: irWriter, + irWriter: irWriter, zstdWriter: zstdWriter, zstdBuffer: &zstdBuffer, } @@ -59,12 +64,10 @@ func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { - numEvents, err := writeIr(w.irWriter, logEvents) + _, numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { return numEvents, err } - - _, err = w.irWriter.WriteTo(w.zstdWriter) return numEvents, err } @@ -74,15 +77,12 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { // Returns: // - err: Error closing buffers func (w *memoryWriter) CloseStreams() error { - _, err := w.irWriter.CloseTo(w.zstdWriter) - if err != nil { + if err := w.irWriter.Close(); err != nil { return err } - w.irWriter = nil - err = w.zstdWriter.Close() - return err + return w.zstdWriter.Close() } // Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers @@ -92,13 +92,14 @@ func (w *memoryWriter) CloseStreams() error { // - err: Error opening IR writer func (w *memoryWriter) Reset() error { var err error - w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) + w.zstdBuffer.Reset() + w.zstdWriter.Reset(w.zstdBuffer) + + w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter) if err != nil { return err } - w.zstdBuffer.Reset() - w.zstdWriter.Reset(w.zstdBuffer) return nil } diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index eb43758..862a91d 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -70,19 +70,22 @@ type Writer interface { // - logEvents: A slice of log events to be encoded // // Returns: +// - numBytes: Total IR bytes written for the batch // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error if an event could not be written -func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, error) { +func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, int, error) { var numEvents int + var numBytes int for _, event := range logEvents { - _, err := irWriter.Write(event) + n, err := irWriter.WriteLogEvent(event) + numBytes += n if err != nil { err = fmt.Errorf("failed to encode event %v into ir: %w", event, err) - return numEvents, err + return numBytes, numEvents, err } numEvents += 1 } - return numEvents, nil + return numBytes, numEvents, nil } // Opens a new [ir.Writer] and [zstd.Encoder]. @@ -98,19 +101,20 @@ func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, error) { // - err: Error opening IR/Zstd writer func newIrZstdWriters( zstdOutput io.Writer, - timezone string, - size int, + irOutput io.Writer, ) (*ir.Writer, *zstd.Encoder, error) { - // IR buffer using bytes.Buffer internally, so it will dynamically grow if undersized. Using - // FourByteEncoding as default encoding. - irWriter, err := ir.NewWriterSize[ir.FourByteEncoding](size, timezone) + zstdWriter, err := zstd.NewWriter(zstdOutput) if err != nil { - return nil, nil, fmt.Errorf("error opening IR writer: %w", err) + return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) } - zstdWriter, err := zstd.NewWriter(zstdOutput) + if irOutput == nil { + irOutput = zstdWriter + } + + irWriter, err := ir.NewWriter[ir.FourByteEncoding](irOutput) if err != nil { - return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) + return nil, nil, fmt.Errorf("error opening IR writer: %w", err) } return irWriter, zstdWriter, err } diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 714be4f..e000d02 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -4,7 +4,6 @@ package flush import ( - "C" "encoding/json" "fmt" "io" @@ -91,21 +90,23 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, error) { var logEvents []ffi.LogEvent for { - ts, record, err := decoder.GetRecord(dec) + // We are discarding fluent bit timestamp since users likely have their own timestamp. It may make sense to + // add it as an auto generated field as an optional setting. For most users it will + // just decrease compression ratio and not be used. + _, jsonRecord, err := decoder.GetRecord(dec) if err != nil { return logEvents, err } - timestamp := decodeTs(ts) - msg, err := getMessage(record, config) + var autoKvPairs map[string]any + var userKvPairs map[string]any + err = json.Unmarshal(jsonRecord, &userKvPairs) if err != nil { - err = fmt.Errorf("failed to get message from record: %w", err) - return nil, err + return nil, fmt.Errorf("failed to unmarshal json record %v: %w", jsonRecord, err) } - event := ffi.LogEvent{ - LogMessage: msg, - Timestamp: ffi.EpochTimeMs(timestamp.UnixMilli()), + AutoKvPairs: autoKvPairs, + UserKvPairs: userKvPairs, } logEvents = append(logEvents, event) } @@ -133,48 +134,6 @@ func decodeTs(ts any) time.Time { return timestamp } -// Retrieves message from a record object. The message can consist of the entire object or -// just a single key. For a single key, user should set use_single_key to true in fluent-bit.conf. -// In addition user, should set single_key to "log" which is default Fluent Bit key for unparsed -// messages; however, single_key can be set to another value. To prevent failure if the key is -// missing, user can specify allow_missing_key, and behaviour will fallback to the entire object. -// -// Parameters: -// - record: JSON record from Fluent Bit with variable amount of keys -// - config: Plugin configuration -// -// Returns: -// - stringMsg: Retrieved message -// - err: Key not found, json.Unmarshal error, string type assertion error -func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { - if !config.UseSingleKey { - return string(jsonRecord), nil - } - - var record map[string]any - err := json.Unmarshal(jsonRecord, &record) - if err != nil { - return "", fmt.Errorf("failed to unmarshal json record %v: %w", jsonRecord, err) - } - - singleKeyMsg, ok := record[config.SingleKey] - if !ok { - // If key not found in record, see if allow_missing_key=true. If missing key is - // allowed, then return entire record. - if config.AllowMissingKey { - return string(jsonRecord), nil - } - return "", fmt.Errorf("key %s not found in record %v", config.SingleKey, record) - } - - stringMsg, ok := singleKeyMsg.(string) - if !ok { - return "", fmt.Errorf("string type assertion for message failed %v", singleKeyMsg) - } - - return stringMsg, nil -} - // Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always // uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater // than upload size. From 9abc3bfede497b08766f5e4c1c9327365d432480 Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 10:33:48 -0500 Subject: [PATCH 02/31] latest --- internal/irzstd/disk.go | 11 +++++++---- internal/irzstd/memory.go | 3 ++- internal/irzstd/writer.go | 4 ---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 3031b65..bb37eda 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -85,8 +85,9 @@ func NewDiskWriter( } // Recovers a [diskWriter] opening buffer files from a previous execution of output plugin. -// Recovery of files necessitates that use_disk_store is on. IR preamble is removed for -// recovered store. Avoid use with empty disk stores as there will be no preamble. +// Recovery of files necessitates that use_disk_store is on. Recovered store must be closed +// with close streams() before can be used for future writting since it does not init with +// an IR writer. Avoid use with empty disk stores. // // Parameters: // - timezone: Time zone of the log source @@ -120,6 +121,7 @@ func RecoverWriter( irFile: irFile, zstdPath: zstdPath, zstdFile: zstdFile, + // Recovered streams should already have IR, so irWriter is nil until flushed/reset. irWriter: nil, zstdWriter: zstdWriter, } @@ -180,7 +182,7 @@ func (w *diskWriter) CloseStreams() error { return fmt.Errorf("error flushing IR buffer: %w", err) } - // may be nil if recovered. + // irWriter will be nil for recoveredWriter until flushed/reset at least once. if (w.irWriter == nil) { err := w.irWriter.Serializer.Close() if err != nil { @@ -189,7 +191,8 @@ func (w *diskWriter) CloseStreams() error { w.irWriter = nil } - // ir termination byte + // Add IR postamble byte manually. We cannot use [ir.Writer.Close] since it add + // postable to the irFile, which was already flushed, and not the zstdFile. w.zstdWriter.Write([]byte{0x0}) err = w.zstdWriter.Close() if err != nil { diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index b64a88d..3c805a7 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -6,6 +6,7 @@ import ( "io" "github.com/klauspost/compress/zstd" + "github.com/y-scope/clp-ffi-go/ffi" "github.com/y-scope/clp-ffi-go/ir" ) @@ -47,7 +48,7 @@ func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { memoryWriter := memoryWriter{ size: size, timezone: timezone, - irWriter: irWriter, + irWriter: irWriter, zstdWriter: zstdWriter, zstdBuffer: &zstdBuffer, } diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index 862a91d..f97255e 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -108,10 +108,6 @@ func newIrZstdWriters( return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) } - if irOutput == nil { - irOutput = zstdWriter - } - irWriter, err := ir.NewWriter[ir.FourByteEncoding](irOutput) if err != nil { return nil, nil, fmt.Errorf("error opening IR writer: %w", err) From b7300e80cad032423c29535050aa507b70b700ed Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 13:06:28 -0500 Subject: [PATCH 03/31] latest --- .golangci.yml | 110 +++++++++++---------- go.mod | 6 +- go.sum | 8 +- internal/irzstd/disk.go | 41 ++++---- internal/irzstd/memory.go | 28 +++--- internal/irzstd/writer.go | 24 ++--- plugins/out_clp_s3/internal/flush/flush.go | 61 ++++++++++-- 7 files changed, 160 insertions(+), 118 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index a353528..ddbc506 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,54 +1,64 @@ -output: - sort-results: true - +version: "2" +linters: + default: none + enable: + - gocritic + - govet + - ineffassign + - nakedret + - revive + - staticcheck + - unused + settings: + gocritic: + disabled-checks: + - dupSubExpr + nakedret: + max-func-lines: 0 + revive: + rules: + - name: line-length-limit + arguments: + - 100 + severity: warning + disabled: false + staticcheck: + checks: + - all + - -ST1020 + - -ST1021 + - -ST1022 + - -ST1023 + initialisms: + - "" + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + rules: + - linters: + - revive + source: '^\s*// (?:\[.+\]: )?https?://.+' + paths: + - third_party$ + - builtin$ + - examples$ issues: max-issues-per-linter: 0 max-same-issues: 0 - exclude-rules: - - linters: - - revive - source: "^\\s*// (?:\\[.+\\]: )?https?://.+" - -linters: - disable-all: true +formatters: enable: - - "gocritic" - - "goimports" - - "govet" - - "ineffassign" - - "nakedret" - - "revive" - - "staticcheck" - - "stylecheck" - - "unused" - -linters-settings: - gocritic: - disabled-checks: - # Appears to cause a false positive for Cgo calls - - "dupSubExpr" - goimports: - # Put imports beginning with prefix after 3rd-party packages. - local-prefixes: "github.com/y-scope/fluent-bit-clp" - nakedret: - # Completely disallow naked returns - max-func-lines: 0 - revive: - rules: - - name: "line-length-limit" - severity: "warning" - disabled: false - arguments: [100] - staticcheck: - checks: - - "all" - stylecheck: - checks: - - "all" - # Documentation guidelines that we don't follow (non-default) - - "-ST1020" - - "-ST1021" - - "-ST1022" - # Redundant variable declaration (non-default) - - "-ST1023" - initialisms: [""] + - goimports + settings: + goimports: + local-prefixes: + - github.com/y-scope/fluent-bit-clp + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/go.mod b/go.mod index f774fb0..0eee096 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,10 @@ module github.com/y-scope/fluent-bit-clp -go 1.22.3 +go 1.25.6 require ( github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c - github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855 + github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb ) require ( @@ -39,8 +39,6 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect - github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect - github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/crypto v0.19.0 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.17.0 // indirect diff --git a/go.sum b/go.sum index cccd671..8624661 100644 --- a/go.sum +++ b/go.sum @@ -69,12 +69,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= -github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= -github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= -github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= -github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855 h1:yrwcVsQs6qpCiRpCHgOk7g+jo1hBVwT9MhJA1hEQLso= -github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855/go.mod h1:EuJRZ9fcHuedhtPHsVCsB84isZkqVECbvfAfhj9JGFI= +github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb h1:MAuKBGpfQIIl63810kEYZxUv8tfpI9y0nZlyi7tS8A8= +github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb/go.mod h1:EkeM7lP5AWNRcmBWt3MvjAkRx7RT0gzisW4sh+SJYUw= golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index bb37eda..0d1cf59 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -65,7 +65,7 @@ func NewDiskWriter( return nil, err } - irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, irFile) + irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size) if err != nil { return nil, err } @@ -85,9 +85,8 @@ func NewDiskWriter( } // Recovers a [diskWriter] opening buffer files from a previous execution of output plugin. -// Recovery of files necessitates that use_disk_store is on. Recovered store must be closed -// with close streams() before can be used for future writting since it does not init with -// an IR writer. Avoid use with empty disk stores. +// Recovery of files necessitates that use_disk_store is on. IR preamble is removed for +// recovered store. Avoid use with empty disk stores as there will be no preamble. // // Parameters: // - timezone: Time zone of the log source @@ -109,9 +108,9 @@ func RecoverWriter( return nil, fmt.Errorf("error opening files: %w", err) } - zstdWriter, err := zstd.NewWriter(zstdFile) + irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size) if err != nil { - return nil, fmt.Errorf("error opening Zstd writer: %w", err) + return nil, err } diskWriter := diskWriter{ @@ -121,8 +120,7 @@ func RecoverWriter( irFile: irFile, zstdPath: zstdPath, zstdFile: zstdFile, - // Recovered streams should already have IR, so irWriter is nil until flushed/reset. - irWriter: nil, + irWriter: irWriter, zstdWriter: zstdWriter, } @@ -135,6 +133,7 @@ func RecoverWriter( // the non-empty disk buffers already have existing preamble so remove it. Disk buffer // must have non-zero size or else would be deleted in recover. diskWriter.irTotalBytes = irFileSize + irWriter.Reset() return &diskWriter, nil } @@ -150,7 +149,12 @@ func RecoverWriter( // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd, error flushing buffers func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { - numBytes, numEvents, err := writeIr(w.irWriter, logEvents) + numEvents, err := writeIr(w.irWriter, logEvents) + if err != nil { + return numEvents, err + } + + numBytes, err := w.irWriter.WriteTo(w.irFile) if err != nil { return numEvents, err } @@ -182,18 +186,13 @@ func (w *diskWriter) CloseStreams() error { return fmt.Errorf("error flushing IR buffer: %w", err) } - // irWriter will be nil for recoveredWriter until flushed/reset at least once. - if (w.irWriter == nil) { - err := w.irWriter.Serializer.Close() - if err != nil { - return fmt.Errorf("error could not close irWriter: %w", err) - } - w.irWriter = nil - } + _, err = w.irWriter.CloseTo(w.zstdWriter) + if err != nil { + return err + } + + w.irWriter = nil - // Add IR postamble byte manually. We cannot use [ir.Writer.Close] since it add - // postable to the irFile, which was already flushed, and not the zstdFile. - w.zstdWriter.Write([]byte{0x0}) err = w.zstdWriter.Close() if err != nil { return err @@ -214,7 +213,7 @@ func (w *diskWriter) CloseStreams() error { // - err: Error opening IR writer, error IR buffer not empty func (w *diskWriter) Reset() error { var err error - w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile) + w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) if err != nil { return err } diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 3c805a7..5d8816b 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -34,15 +34,9 @@ type memoryWriter struct { // - err: Error opening Zstd/IR writers func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { var zstdBuffer bytes.Buffer - - zstdWriter, err := zstd.NewWriter(&zstdBuffer) - if err != nil { - return nil, fmt.Errorf("error opening Zstd writer: %w", err) - } - - irWriter, err := ir.NewWriter[ir.FourByteEncoding](zstdWriter) + irWriter, zstdWriter, err := newIrZstdWriters(&zstdBuffer, timezone, size) if err != nil { - return nil, fmt.Errorf("error opening IR writer: %w", err) + return nil, err } memoryWriter := memoryWriter{ @@ -65,10 +59,12 @@ func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { - _, numEvents, err := writeIr(w.irWriter, logEvents) + numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { return numEvents, err } + + _, err = w.irWriter.WriteTo(w.zstdWriter) return numEvents, err } @@ -78,12 +74,15 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { // Returns: // - err: Error closing buffers func (w *memoryWriter) CloseStreams() error { - if err := w.irWriter.Close(); err != nil { + _, err := w.irWriter.CloseTo(w.zstdWriter) + if err != nil { return err } + w.irWriter = nil - return w.zstdWriter.Close() + err = w.zstdWriter.Close() + return err } // Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers @@ -93,14 +92,13 @@ func (w *memoryWriter) CloseStreams() error { // - err: Error opening IR writer func (w *memoryWriter) Reset() error { var err error - w.zstdBuffer.Reset() - w.zstdWriter.Reset(w.zstdBuffer) - - w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter) + w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) if err != nil { return err } + w.zstdBuffer.Reset() + w.zstdWriter.Reset(w.zstdBuffer) return nil } diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index f97255e..eb43758 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -70,22 +70,19 @@ type Writer interface { // - logEvents: A slice of log events to be encoded // // Returns: -// - numBytes: Total IR bytes written for the batch // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error if an event could not be written -func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, int, error) { +func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, error) { var numEvents int - var numBytes int for _, event := range logEvents { - n, err := irWriter.WriteLogEvent(event) - numBytes += n + _, err := irWriter.Write(event) if err != nil { err = fmt.Errorf("failed to encode event %v into ir: %w", event, err) - return numBytes, numEvents, err + return numEvents, err } numEvents += 1 } - return numBytes, numEvents, nil + return numEvents, nil } // Opens a new [ir.Writer] and [zstd.Encoder]. @@ -101,16 +98,19 @@ func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, int, error) { // - err: Error opening IR/Zstd writer func newIrZstdWriters( zstdOutput io.Writer, - irOutput io.Writer, + timezone string, + size int, ) (*ir.Writer, *zstd.Encoder, error) { - zstdWriter, err := zstd.NewWriter(zstdOutput) + // IR buffer using bytes.Buffer internally, so it will dynamically grow if undersized. Using + // FourByteEncoding as default encoding. + irWriter, err := ir.NewWriterSize[ir.FourByteEncoding](size, timezone) if err != nil { - return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) + return nil, nil, fmt.Errorf("error opening IR writer: %w", err) } - irWriter, err := ir.NewWriter[ir.FourByteEncoding](irOutput) + zstdWriter, err := zstd.NewWriter(zstdOutput) if err != nil { - return nil, nil, fmt.Errorf("error opening IR writer: %w", err) + return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) } return irWriter, zstdWriter, err } diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index e000d02..714be4f 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -4,6 +4,7 @@ package flush import ( + "C" "encoding/json" "fmt" "io" @@ -90,23 +91,21 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, error) { var logEvents []ffi.LogEvent for { - // We are discarding fluent bit timestamp since users likely have their own timestamp. It may make sense to - // add it as an auto generated field as an optional setting. For most users it will - // just decrease compression ratio and not be used. - _, jsonRecord, err := decoder.GetRecord(dec) + ts, record, err := decoder.GetRecord(dec) if err != nil { return logEvents, err } - var autoKvPairs map[string]any - var userKvPairs map[string]any - err = json.Unmarshal(jsonRecord, &userKvPairs) + timestamp := decodeTs(ts) + msg, err := getMessage(record, config) if err != nil { - return nil, fmt.Errorf("failed to unmarshal json record %v: %w", jsonRecord, err) + err = fmt.Errorf("failed to get message from record: %w", err) + return nil, err } + event := ffi.LogEvent{ - AutoKvPairs: autoKvPairs, - UserKvPairs: userKvPairs, + LogMessage: msg, + Timestamp: ffi.EpochTimeMs(timestamp.UnixMilli()), } logEvents = append(logEvents, event) } @@ -134,6 +133,48 @@ func decodeTs(ts any) time.Time { return timestamp } +// Retrieves message from a record object. The message can consist of the entire object or +// just a single key. For a single key, user should set use_single_key to true in fluent-bit.conf. +// In addition user, should set single_key to "log" which is default Fluent Bit key for unparsed +// messages; however, single_key can be set to another value. To prevent failure if the key is +// missing, user can specify allow_missing_key, and behaviour will fallback to the entire object. +// +// Parameters: +// - record: JSON record from Fluent Bit with variable amount of keys +// - config: Plugin configuration +// +// Returns: +// - stringMsg: Retrieved message +// - err: Key not found, json.Unmarshal error, string type assertion error +func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { + if !config.UseSingleKey { + return string(jsonRecord), nil + } + + var record map[string]any + err := json.Unmarshal(jsonRecord, &record) + if err != nil { + return "", fmt.Errorf("failed to unmarshal json record %v: %w", jsonRecord, err) + } + + singleKeyMsg, ok := record[config.SingleKey] + if !ok { + // If key not found in record, see if allow_missing_key=true. If missing key is + // allowed, then return entire record. + if config.AllowMissingKey { + return string(jsonRecord), nil + } + return "", fmt.Errorf("key %s not found in record %v", config.SingleKey, record) + } + + stringMsg, ok := singleKeyMsg.(string) + if !ok { + return "", fmt.Errorf("string type assertion for message failed %v", singleKeyMsg) + } + + return stringMsg, nil +} + // Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always // uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater // than upload size. From 960258d44d89c2b78b0de31843136838012172a6 Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 18:18:02 +0000 Subject: [PATCH 04/31] latest --- .github/workflows/lint.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index d5a8e4e..582f6b0 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -9,7 +9,7 @@ jobs: go-lint: strategy: matrix: - go: ["1.22"] + go: ["1.25.6"] os: ["ubuntu-latest", "macos-latest"] runs-on: "${{ matrix.os }}" steps: @@ -21,4 +21,4 @@ jobs: - uses: "golangci/golangci-lint-action@v6" with: - version: "v1.59" + version: "v9.00" From bf688b0b317c0a689a223cb626e9bb0231b9f33f Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 18:20:05 +0000 Subject: [PATCH 05/31] latest --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 32856b7..5c45e4b 100644 --- a/README.md +++ b/README.md @@ -56,8 +56,8 @@ another output. 1. Install golangci-lint: ```shell -curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | \ - sh -s -- -b $(go env GOPATH)/bin v1.59.0 +curl -sSfL https://golangci-lint.run/install.sh | \ + sh -s -- -b $(go env GOPATH)/bin v2.8.0 ``` 2. Run with `golangci-lint run` From 22e1166c12a13755511f521142be6bb00e6d759c Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 18:22:06 +0000 Subject: [PATCH 06/31] latest --- .github/workflows/lint.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 582f6b0..da61625 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -13,12 +13,12 @@ jobs: os: ["ubuntu-latest", "macos-latest"] runs-on: "${{ matrix.os }}" steps: - - uses: "actions/checkout@v4" + - uses: "actions/checkout@v6" - - uses: "actions/setup-go@v5" + - uses: "actions/setup-go@v6" with: go-version: "${{ matrix.go }}" - - uses: "golangci/golangci-lint-action@v6" + - uses: "golangci/golangci-lint-action@v9" with: - version: "v9.00" + version: "v2.8" From 2a1bea79f4e6ab538a188ac1ab17e0bb6a732570 Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 18:51:48 +0000 Subject: [PATCH 07/31] update linter --- .github/workflows/lint.yml | 10 ++-- .golangci.yml | 110 ++++++++++++++++++++----------------- README.md | 4 +- 3 files changed, 67 insertions(+), 57 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index d5a8e4e..da61625 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -9,16 +9,16 @@ jobs: go-lint: strategy: matrix: - go: ["1.22"] + go: ["1.25.6"] os: ["ubuntu-latest", "macos-latest"] runs-on: "${{ matrix.os }}" steps: - - uses: "actions/checkout@v4" + - uses: "actions/checkout@v6" - - uses: "actions/setup-go@v5" + - uses: "actions/setup-go@v6" with: go-version: "${{ matrix.go }}" - - uses: "golangci/golangci-lint-action@v6" + - uses: "golangci/golangci-lint-action@v9" with: - version: "v1.59" + version: "v2.8" diff --git a/.golangci.yml b/.golangci.yml index a353528..ddbc506 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,54 +1,64 @@ -output: - sort-results: true - +version: "2" +linters: + default: none + enable: + - gocritic + - govet + - ineffassign + - nakedret + - revive + - staticcheck + - unused + settings: + gocritic: + disabled-checks: + - dupSubExpr + nakedret: + max-func-lines: 0 + revive: + rules: + - name: line-length-limit + arguments: + - 100 + severity: warning + disabled: false + staticcheck: + checks: + - all + - -ST1020 + - -ST1021 + - -ST1022 + - -ST1023 + initialisms: + - "" + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + rules: + - linters: + - revive + source: '^\s*// (?:\[.+\]: )?https?://.+' + paths: + - third_party$ + - builtin$ + - examples$ issues: max-issues-per-linter: 0 max-same-issues: 0 - exclude-rules: - - linters: - - revive - source: "^\\s*// (?:\\[.+\\]: )?https?://.+" - -linters: - disable-all: true +formatters: enable: - - "gocritic" - - "goimports" - - "govet" - - "ineffassign" - - "nakedret" - - "revive" - - "staticcheck" - - "stylecheck" - - "unused" - -linters-settings: - gocritic: - disabled-checks: - # Appears to cause a false positive for Cgo calls - - "dupSubExpr" - goimports: - # Put imports beginning with prefix after 3rd-party packages. - local-prefixes: "github.com/y-scope/fluent-bit-clp" - nakedret: - # Completely disallow naked returns - max-func-lines: 0 - revive: - rules: - - name: "line-length-limit" - severity: "warning" - disabled: false - arguments: [100] - staticcheck: - checks: - - "all" - stylecheck: - checks: - - "all" - # Documentation guidelines that we don't follow (non-default) - - "-ST1020" - - "-ST1021" - - "-ST1022" - # Redundant variable declaration (non-default) - - "-ST1023" - initialisms: [""] + - goimports + settings: + goimports: + local-prefixes: + - github.com/y-scope/fluent-bit-clp + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/README.md b/README.md index 32856b7..5c45e4b 100644 --- a/README.md +++ b/README.md @@ -56,8 +56,8 @@ another output. 1. Install golangci-lint: ```shell -curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | \ - sh -s -- -b $(go env GOPATH)/bin v1.59.0 +curl -sSfL https://golangci-lint.run/install.sh | \ + sh -s -- -b $(go env GOPATH)/bin v2.8.0 ``` 2. Run with `golangci-lint run` From 73e035a32e31d64d5427f16dbf4c2eac7e04f31b Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 19:09:05 +0000 Subject: [PATCH 08/31] latest --- go.mod | 2 +- internal/irzstd/disk.go | 16 +++++------ plugins/out_clp_s3/internal/flush/flush.go | 31 +++------------------- 3 files changed, 13 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index f774fb0..4bbb5ab 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/y-scope/fluent-bit-clp -go 1.22.3 +go 1.25.6 require ( github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index bb37eda..f42287d 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -115,12 +115,12 @@ func RecoverWriter( } diskWriter := diskWriter{ - size: size, - timezone: timezone, - irPath: irPath, - irFile: irFile, - zstdPath: zstdPath, - zstdFile: zstdFile, + size: size, + timezone: timezone, + irPath: irPath, + irFile: irFile, + zstdPath: zstdPath, + zstdFile: zstdFile, // Recovered streams should already have IR, so irWriter is nil until flushed/reset. irWriter: nil, zstdWriter: zstdWriter, @@ -183,13 +183,13 @@ func (w *diskWriter) CloseStreams() error { } // irWriter will be nil for recoveredWriter until flushed/reset at least once. - if (w.irWriter == nil) { + if w.irWriter == nil { err := w.irWriter.Serializer.Close() if err != nil { return fmt.Errorf("error could not close irWriter: %w", err) } w.irWriter = nil - } + } // Add IR postamble byte manually. We cannot use [ir.Writer.Close] since it add // postable to the irFile, which was already flushed, and not the zstdFile. diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index e000d02..13e92ae 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "log" - "time" "unsafe" "github.com/fluent/fluent-bit-go/output" @@ -90,9 +89,9 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, error) { var logEvents []ffi.LogEvent for { - // We are discarding fluent bit timestamp since users likely have their own timestamp. It may make sense to - // add it as an auto generated field as an optional setting. For most users it will - // just decrease compression ratio and not be used. + // Discarding Fluent Bit timestamp since log records typically contain their own timestamp. + // TODO: Consider adding an option to include the Fluent Bit timestamp as an auto-generated + // key. For most users it would hurt compression and go unused. _, jsonRecord, err := decoder.GetRecord(dec) if err != nil { return logEvents, err @@ -106,34 +105,12 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, } event := ffi.LogEvent{ AutoKvPairs: autoKvPairs, - UserKvPairs: userKvPairs, + UserKvPairs: userKvPairs, } logEvents = append(logEvents, event) } } -// Decodes timestamp provided by Fluent Bit engine into time.Time. If timestamp cannot be -// decoded, returns system time. -// -// Parameters: -// - ts: Timestamp provided by Fluent Bit -// -// Returns: -// - timestamp: time.Time timestamp -func decodeTs(ts any) time.Time { - var timestamp time.Time - switch t := ts.(type) { - case decoder.FlbTime: - timestamp = t.Time - case uint64: - timestamp = time.Unix(int64(t), 0) - default: - log.Printf("time provided invalid, defaulting to now. Invalid type is %T", t) - timestamp = time.Now() - } - return timestamp -} - // Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always // uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater // than upload size. From a7fa236ac44c313a179801ecb2e5a834524f50b7 Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 19:25:40 +0000 Subject: [PATCH 09/31] latest --- internal/irzstd/disk.go | 32 ++++-------------- internal/irzstd/memory.go | 10 +----- internal/outctx/context.go | 33 ++++--------------- plugins/out_clp_s3/internal/flush/flush.go | 2 +- .../out_clp_s3/internal/recovery/recovery.go | 5 +-- 5 files changed, 16 insertions(+), 66 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index f42287d..73bf929 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -36,8 +36,6 @@ type diskWriter struct { irFile *os.File zstdFile *os.File irWriter *ir.Writer - size int - timezone string irTotalBytes int zstdWriter *zstd.Encoder } @@ -46,20 +44,13 @@ type diskWriter struct { // is on. // // Parameters: -// - timezone: Time zone of the log source -// - size: Byte length // - irPath: Path to IR disk buffer file // - zstdPath: Path to Zstd disk buffer file // // Returns: // - diskWriter: Disk writer for Zstd compressed IR // - err: Error creating new buffers, error opening Zstd/IR writers -func NewDiskWriter( - timezone string, - size int, - irPath string, - zstdPath string, -) (*diskWriter, error) { +func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile, zstdFile, err := newFileBuffers(irPath, zstdPath) if err != nil { return nil, err @@ -71,8 +62,6 @@ func NewDiskWriter( } diskWriter := diskWriter{ - size: size, - timezone: timezone, irPath: irPath, irFile: irFile, zstdPath: zstdPath, @@ -84,26 +73,19 @@ func NewDiskWriter( return &diskWriter, nil } -// Recovers a [diskWriter] opening buffer files from a previous execution of output plugin. -// Recovery of files necessitates that use_disk_store is on. Recovered store must be closed -// with close streams() before can be used for future writting since it does not init with -// an IR writer. Avoid use with empty disk stores. +// Recovers a [diskWriter] by opening buffer files from a previous execution of the output plugin. +// Requires use_disk_store to be enabled. The recovered writer must be closed with closeStreams() +// before it can be used for future writes, since it does not initialize with an IR writer. Avoid +// use with empty disk buffers. // // Parameters: -// - timezone: Time zone of the log source -// - size: Byte length // - irPath: Path to IR disk buffer file // - zstdPath: Path to Zstd disk buffer file // // Returns: // - diskWriter: Disk writer for Zstd compressed IR // - err: Error opening buffers, error opening Zstd/IR writers, error getting file sizes -func RecoverWriter( - timezone string, - size int, - irPath string, - zstdPath string, -) (*diskWriter, error) { +func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile, zstdFile, err := openBufferFiles(irPath, zstdPath) if err != nil { return nil, fmt.Errorf("error opening files: %w", err) @@ -115,8 +97,6 @@ func RecoverWriter( } diskWriter := diskWriter{ - size: size, - timezone: timezone, irPath: irPath, irFile: irFile, zstdPath: zstdPath, diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 3c805a7..6e86c7b 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -17,22 +17,16 @@ import ( type memoryWriter struct { zstdBuffer *bytes.Buffer irWriter *ir.Writer - size int - timezone string zstdWriter *zstd.Encoder } // Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is // off. // -// Parameters: -// - timezone: Time zone of the log source -// - size: Byte length -// // Returns: // - memoryWriter: Memory writer for Zstd compressed IR // - err: Error opening Zstd/IR writers -func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { +func NewMemoryWriter() (*memoryWriter, error) { var zstdBuffer bytes.Buffer zstdWriter, err := zstd.NewWriter(&zstdBuffer) @@ -46,8 +40,6 @@ func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { } memoryWriter := memoryWriter{ - size: size, - timezone: timezone, irWriter: irWriter, zstdWriter: zstdWriter, zstdBuffer: &zstdBuffer, diff --git a/internal/outctx/context.go b/internal/outctx/context.go index 6f3eb67..fa3aa9e 100644 --- a/internal/outctx/context.go +++ b/internal/outctx/context.go @@ -119,16 +119,15 @@ func NewS3Context(plugin unsafe.Pointer) (*S3Context, error) { // // Parameters: // - tag: Fluent Bit tag -// - size: Byte length // // Returns: // - err: Could not create buffers or tag -func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, error) { +func (ctx *S3Context) GetEventManager(tag string) (*EventManager, error) { var err error eventManager, ok := ctx.EventManagers[tag] if !ok { - eventManager, err = ctx.newEventManager(tag, size) + eventManager, err = ctx.newEventManager(tag) if err != nil { return nil, err } @@ -141,22 +140,13 @@ func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, erro // // Parameters: // - tag: Fluent Bit tag -// - size: Byte length // // Returns: // - eventManager: Manager for Fluent Bit events with the same tag // - err: Error creating new writer -func (ctx *S3Context) RecoverEventManager( - tag string, - size int, -) (*EventManager, error) { +func (ctx *S3Context) RecoverEventManager(tag string) (*EventManager, error) { irPath, zstdPath := ctx.GetBufferFilePaths(tag) - writer, err := irzstd.RecoverWriter( - ctx.Config.TimeZone, - size, - irPath, - zstdPath, - ) + writer, err := irzstd.RecoverWriter(irPath, zstdPath) if err != nil { return nil, err } @@ -177,28 +167,19 @@ func (ctx *S3Context) RecoverEventManager( // // Parameters: // - tag: Fluent Bit tag -// - size: Byte length // // Returns: // - eventManager: Manager for Fluent Bit events with the same tag // - err: Error creating new writer -func (ctx *S3Context) newEventManager( - tag string, - size int, -) (*EventManager, error) { +func (ctx *S3Context) newEventManager(tag string) (*EventManager, error) { var err error var writer irzstd.Writer if ctx.Config.UseDiskBuffer { irPath, zstdPath := ctx.GetBufferFilePaths(tag) - writer, err = irzstd.NewDiskWriter( - ctx.Config.TimeZone, - size, - irPath, - zstdPath, - ) + writer, err = irzstd.NewDiskWriter(irPath, zstdPath) } else { - writer, err = irzstd.NewMemoryWriter(ctx.Config.TimeZone, size) + writer, err = irzstd.NewMemoryWriter() } if err != nil { diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 13e92ae..ddafaa1 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -37,7 +37,7 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i return output.FLB_ERROR, err } - eventManager, err := ctx.GetEventManager(tag, size) + eventManager, err := ctx.GetEventManager(tag) if err != nil { return output.FLB_RETRY, fmt.Errorf("error getting event manager: %w", err) } diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index 2683506..e6b09dc 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -203,10 +203,7 @@ func flushExistingBuffer( return err } - eventManager, err := ctx.RecoverEventManager( - tag, - int(irFileSize), - ) + eventManager, err := ctx.RecoverEventManager(tag) if err != nil { return fmt.Errorf("error recovering event manager with tag: %w", err) } From 446ffc89df858b81c6bf78245b592b5d908fda00 Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 15:38:32 -0500 Subject: [PATCH 10/31] latest --- internal/irzstd/disk.go | 6 ++-- internal/irzstd/writer.go | 7 ++--- internal/outctx/config.go | 58 ++++++++++++++---------------------- plugins/out_clp_s3/README.md | 12 -------- 4 files changed, 29 insertions(+), 54 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 73bf929..80822f8 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -101,8 +101,8 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile: irFile, zstdPath: zstdPath, zstdFile: zstdFile, - // Recovered streams should already have IR, so irWriter is nil until flushed/reset. - irWriter: nil, + // Recovered IR must be flushed before writing new IR. irWriter is initialized in Reset(). + irWriter: nil, zstdWriter: zstdWriter, } @@ -162,7 +162,7 @@ func (w *diskWriter) CloseStreams() error { return fmt.Errorf("error flushing IR buffer: %w", err) } - // irWriter will be nil for recoveredWriter until flushed/reset at least once. + // [ir.Writer] will be nil for recovered [diskWriter] until reset. if w.irWriter == nil { err := w.irWriter.Serializer.Close() if err != nil { diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index f97255e..a38ab45 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -91,13 +91,12 @@ func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, int, error) { // Opens a new [ir.Writer] and [zstd.Encoder]. // // Parameters: -// - zstdOutput: Output location for Zstd -// - timezone: Time zone of the log source -// - size: Byte length +// - zstdOutput: Output destination for Zstd +// - irOutput: Output destination for IR // // Returns: // - irWriter: Writer for CLP IR -// - ZstdWriter: Writer for Zstd +// - zstdWriter: Writer for Zstd // - err: Error opening IR/Zstd writer func newIrZstdWriters( zstdOutput io.Writer, diff --git a/internal/outctx/config.go b/internal/outctx/config.go index d771405..67c328e 100644 --- a/internal/outctx/config.go +++ b/internal/outctx/config.go @@ -16,24 +16,20 @@ import ( // Holds settings for S3 CLP plugin from user-defined Fluent Bit configuration file. // The "conf" struct tags are the plugin options described to user in README, and allow user to see -// snake case "use_single_key" vs. camel case "SingleKey" in validation error messages. The +// snake case "use_disk_buffer" vs. camel case "UseDiskBuffer" in validation error messages. The // "validate" struct tags are rules to be consumed by [validator]. The functionality of each rule // can be found in docs for [validator]. // //nolint:revive type S3Config struct { - S3Region string `conf:"s3_region" validate:"required"` - S3Bucket string `conf:"s3_bucket" validate:"required"` - S3BucketPrefix string `conf:"s3_bucket_prefix" validate:"dirpath"` - RoleArn string `conf:"role_arn" validate:"omitempty,startswith=arn:aws:iam"` - Id string `conf:"id" validate:"required"` - UseSingleKey bool `conf:"use_single_key" validate:"-"` - AllowMissingKey bool `conf:"allow_missing_key" validate:"-"` - SingleKey string `conf:"single_key" validate:"required_if=use_single_key true"` - UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"` - DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"` - UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"` - TimeZone string `conf:"time_zone" validate:"timezone"` + S3Region string `conf:"s3_region" validate:"required"` + S3Bucket string `conf:"s3_bucket" validate:"required"` + S3BucketPrefix string `conf:"s3_bucket_prefix" validate:"dirpath"` + RoleArn string `conf:"role_arn" validate:"omitempty,startswith=arn:aws:iam"` + Id string `conf:"id" validate:"required"` + UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"` + DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"` + UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"` } // Generates configuration struct containing user-defined settings. In addition, sets default values @@ -51,33 +47,25 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { config := S3Config{ // Default Id is uuid to safeguard against s3 filename namespace collision. User may use // multiple collectors to send logs to same s3 path. Id is appended to s3 filename. - S3Region: "us-east-1", - S3BucketPrefix: "logs/", - Id: uuid.New().String(), - UseSingleKey: true, - AllowMissingKey: true, - SingleKey: "log", - UseDiskBuffer: true, - DiskBufferPath: "tmp/out_clp_s3/", - UploadSizeMb: 16, - TimeZone: "America/Toronto", + S3Region: "us-east-1", + S3BucketPrefix: "logs/", + Id: uuid.New().String(), + UseDiskBuffer: true, + DiskBufferPath: "tmp/out_clp_s3/", + UploadSizeMb: 16, } // Map used to loop over user inputs saving a [output.FLBPluginConfigKey] call for each key. // Potential to iterate over struct using reflect; however, better to avoid reflect package. pluginSettings := map[string]interface{}{ - "s3_region": &config.S3Region, - "s3_bucket": &config.S3Bucket, - "s3_bucket_prefix": &config.S3BucketPrefix, - "role_arn": &config.RoleArn, - "id": &config.Id, - "use_single_key": &config.UseSingleKey, - "allow_missing_key": &config.AllowMissingKey, - "single_key": &config.SingleKey, - "use_disk_buffer": &config.UseDiskBuffer, - "disk_buffer_path": &config.DiskBufferPath, - "upload_size_mb": &config.UploadSizeMb, - "time_zone": &config.TimeZone, + "s3_region": &config.S3Region, + "s3_bucket": &config.S3Bucket, + "s3_bucket_prefix": &config.S3BucketPrefix, + "role_arn": &config.RoleArn, + "id": &config.Id, + "use_disk_buffer": &config.UseDiskBuffer, + "disk_buffer_path": &config.DiskBufferPath, + "upload_size_mb": &config.UploadSizeMb, } for settingName, untypedField := range pluginSettings { diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index e074ed0..8b1f405 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -89,21 +89,9 @@ More detailed information for specifying credentials from AWS can be found [here | `s3_bucket_prefix` | Bucket prefix path | `logs/` | | `role_arn` | ARN of an IAM role to assume | `None` | | `id` | Name of output plugin | Random UUID | -| `use_single_key` | Output single key from Fluent Bit record. See [Use Single Key](#use-single-key) for more info. | `TRUE` | -| `allow_missing_key` | Fallback to whole record if key is missing from log. If set to false, an error will be recorded instead. | `TRUE` | -| `single_key` | Value for single key | `log` | | `use_disk_buffer` | Buffer logs on disk prior to sending to S3. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | | `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` | | `upload_size_mb` | Set upload size in MB when disk store is enabled. Size refers to the compressed size. | `16` | -| `time_zone` | Time zone of the log source, so that local times (non-unix timestamps) are handled correctly. | `America/Toronto` | - -#### Use Single Key - -Output the value corresponding to this key, instead of the whole Fluent Bit record. It is -recommended to set this to true. A Fluent Bit record is a JSON-like object, and while CLP -can parse JSON into IR it is not recommended. Key is set with `single_key` and will typically be set -to `log`, the default Fluent Bit key for unparsed logs. If this is set to false, plugin will parse -the record as JSON. #### Disk Buffering From f0c6d4dcd4cc4dcc13c721bba1525d85d218124e Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 21:27:49 +0000 Subject: [PATCH 11/31] latest --- .golangci.yml | 10 ---------- go.mod | 2 +- internal/irzstd/disk.go | 4 ++-- plugins/out_clp_s3/Dockerfile | 6 +++--- plugins/out_clp_s3/internal/flush/flush.go | 2 +- 5 files changed, 7 insertions(+), 17 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index ddbc506..055e7bd 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -42,10 +42,6 @@ linters: - linters: - revive source: '^\s*// (?:\[.+\]: )?https?://.+' - paths: - - third_party$ - - builtin$ - - examples$ issues: max-issues-per-linter: 0 max-same-issues: 0 @@ -56,9 +52,3 @@ formatters: goimports: local-prefixes: - github.com/y-scope/fluent-bit-clp - exclusions: - generated: lax - paths: - - third_party$ - - builtin$ - - examples$ diff --git a/go.mod b/go.mod index 4bbb5ab..6175554 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/y-scope/fluent-bit-clp -go 1.25.6 +go 1.24 require ( github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 80822f8..c33d6a2 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -102,7 +102,7 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { zstdPath: zstdPath, zstdFile: zstdFile, // Recovered IR must be flushed before writing new IR. irWriter is initialized in Reset(). - irWriter: nil, + irWriter: nil, zstdWriter: zstdWriter, } @@ -163,7 +163,7 @@ func (w *diskWriter) CloseStreams() error { } // [ir.Writer] will be nil for recovered [diskWriter] until reset. - if w.irWriter == nil { + if w.irWriter != nil { err := w.irWriter.Serializer.Close() if err != nil { return fmt.Errorf("error could not close irWriter: %w", err) diff --git a/plugins/out_clp_s3/Dockerfile b/plugins/out_clp_s3/Dockerfile index 914916e..55cc682 100644 --- a/plugins/out_clp_s3/Dockerfile +++ b/plugins/out_clp_s3/Dockerfile @@ -3,7 +3,7 @@ # Using bullseye tag to match debian version from Fluent Bit image [Fluent Bit Debian version]. # Matching debian versions prevents glibc compatibility issues. # [Fluent Bit Debian version]: https://github.com/fluent/fluent-bit/blob/master/dockerfiles/Dockerfile -FROM golang:1.22.3-bullseye as builder +FROM golang:1.24-bullseye AS builder # install task RUN sh -c "$(curl --location https://taskfile.dev/install.sh)" -- -d -b /bin @@ -21,9 +21,9 @@ WORKDIR /root/plugins/out_clp_s3 RUN task build -FROM fluent/fluent-bit:3.0.6 +FROM fluent/fluent-bit:4.2.2 -# Copy plugin binary to Fluent Bit image. +# Copy plugin binary to Fluent Bit image. COPY --from=builder /root/plugins/out_clp_s3/out_clp_s3.so /fluent-bit/bin/ COPY --from=builder /root/plugins/out_clp_s3/*.conf /fluent-bit/etc/ diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index ddafaa1..28c1d45 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -97,7 +97,7 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, return logEvents, err } - var autoKvPairs map[string]any + var autoKvPairs map[string]any = make(map[string]any) var userKvPairs map[string]any err = json.Unmarshal(jsonRecord, &userKvPairs) if err != nil { From 2d8a8939ce819dbcede377b967622fb85d82a910 Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 21:30:24 +0000 Subject: [PATCH 12/31] latest --- .github/workflows/lint.yml | 2 +- .golangci.yml | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index da61625..6e8642d 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -9,7 +9,7 @@ jobs: go-lint: strategy: matrix: - go: ["1.25.6"] + go: ["1.24"] os: ["ubuntu-latest", "macos-latest"] runs-on: "${{ matrix.os }}" steps: diff --git a/.golangci.yml b/.golangci.yml index ddbc506..055e7bd 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -42,10 +42,6 @@ linters: - linters: - revive source: '^\s*// (?:\[.+\]: )?https?://.+' - paths: - - third_party$ - - builtin$ - - examples$ issues: max-issues-per-linter: 0 max-same-issues: 0 @@ -56,9 +52,3 @@ formatters: goimports: local-prefixes: - github.com/y-scope/fluent-bit-clp - exclusions: - generated: lax - paths: - - third_party$ - - builtin$ - - examples$ From be48488ad3ad1de05bee06ab8da0d371aa2b5559 Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 19 Jan 2026 21:31:59 +0000 Subject: [PATCH 13/31] latest --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 0eee096..6236d60 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/y-scope/fluent-bit-clp -go 1.25.6 +go 1.24 require ( github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c From f51eeeba6f1e8e34267317c1fbdeca39d1c5410f Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 23 Jan 2026 11:54:24 -0500 Subject: [PATCH 14/31] fix reverted changes in merge --- go.mod | 4 +++- go.sum | 8 ++++++-- internal/irzstd/disk.go | 19 ++++++++----------- internal/irzstd/memory.go | 30 ++++++++++++++++-------------- internal/irzstd/writer.go | 24 ++++++++++++------------ plugins/out_clp_s3/fluent-bit.conf | 17 +++++++++-------- 6 files changed, 54 insertions(+), 48 deletions(-) diff --git a/go.mod b/go.mod index 6236d60..6175554 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24 require ( github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c - github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb + github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855 ) require ( @@ -39,6 +39,8 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/crypto v0.19.0 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.17.0 // indirect diff --git a/go.sum b/go.sum index 8624661..cccd671 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb h1:MAuKBGpfQIIl63810kEYZxUv8tfpI9y0nZlyi7tS8A8= -github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb/go.mod h1:EkeM7lP5AWNRcmBWt3MvjAkRx7RT0gzisW4sh+SJYUw= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855 h1:yrwcVsQs6qpCiRpCHgOk7g+jo1hBVwT9MhJA1hEQLso= +github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855/go.mod h1:EuJRZ9fcHuedhtPHsVCsB84isZkqVECbvfAfhj9JGFI= golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index b023065..c33d6a2 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -56,7 +56,7 @@ func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) { return nil, err } - irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size) + irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, irFile) if err != nil { return nil, err } @@ -91,9 +91,9 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { return nil, fmt.Errorf("error opening files: %w", err) } - irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size) + zstdWriter, err := zstd.NewWriter(zstdFile) if err != nil { - return nil, err + return nil, fmt.Errorf("error opening Zstd writer: %w", err) } diskWriter := diskWriter{ @@ -115,7 +115,6 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { // the non-empty disk buffers already have existing preamble so remove it. Disk buffer // must have non-zero size or else would be deleted in recover. diskWriter.irTotalBytes = irFileSize - irWriter.Reset() return &diskWriter, nil } @@ -131,12 +130,7 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd, error flushing buffers func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { - numEvents, err := writeIr(w.irWriter, logEvents) - if err != nil { - return numEvents, err - } - - numBytes, err := w.irWriter.WriteTo(w.irFile) + numBytes, numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { return numEvents, err } @@ -177,6 +171,9 @@ func (w *diskWriter) CloseStreams() error { w.irWriter = nil } + // Add IR postamble byte manually. We cannot use [ir.Writer.Close] since it add + // postable to the irFile, which was already flushed, and not the zstdFile. + w.zstdWriter.Write([]byte{0x0}) err = w.zstdWriter.Close() if err != nil { return err @@ -197,7 +194,7 @@ func (w *diskWriter) CloseStreams() error { // - err: Error opening IR writer, error IR buffer not empty func (w *diskWriter) Reset() error { var err error - w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) + w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile) if err != nil { return err } diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 24b8f13..059490e 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -28,9 +28,15 @@ type memoryWriter struct { // - err: Error opening Zstd/IR writers func NewMemoryWriter() (*memoryWriter, error) { var zstdBuffer bytes.Buffer - irWriter, zstdWriter, err := newIrZstdWriters(&zstdBuffer, timezone, size) + + zstdWriter, err := zstd.NewWriter(&zstdBuffer) + if err != nil { + return nil, fmt.Errorf("error opening Zstd writer: %w", err) + } + + irWriter, err := ir.NewWriter[ir.FourByteEncoding](zstdWriter) if err != nil { - return nil, err + return nil, fmt.Errorf("error opening IR writer: %w", err) } memoryWriter := memoryWriter{ @@ -51,12 +57,10 @@ func NewMemoryWriter() (*memoryWriter, error) { // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { - numEvents, err := writeIr(w.irWriter, logEvents) + _, numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { return numEvents, err } - - _, err = w.irWriter.WriteTo(w.zstdWriter) return numEvents, err } @@ -66,15 +70,12 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { // Returns: // - err: Error closing buffers func (w *memoryWriter) CloseStreams() error { - _, err := w.irWriter.CloseTo(w.zstdWriter) - if err != nil { + if err := w.irWriter.Close(); err != nil { return err } - w.irWriter = nil - err = w.zstdWriter.Close() - return err + return w.zstdWriter.Close() } // Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers @@ -84,13 +85,14 @@ func (w *memoryWriter) CloseStreams() error { // - err: Error opening IR writer func (w *memoryWriter) Reset() error { var err error - w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) + w.zstdBuffer.Reset() + w.zstdWriter.Reset(w.zstdBuffer) + + w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter) if err != nil { return err } - w.zstdBuffer.Reset() - w.zstdWriter.Reset(w.zstdBuffer) return nil } @@ -136,4 +138,4 @@ func (w *memoryWriter) Close() error { } } return nil -} +} \ No newline at end of file diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index f46d24a..a38ab45 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -70,19 +70,22 @@ type Writer interface { // - logEvents: A slice of log events to be encoded // // Returns: +// - numBytes: Total IR bytes written for the batch // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error if an event could not be written -func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, error) { +func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, int, error) { var numEvents int + var numBytes int for _, event := range logEvents { - _, err := irWriter.Write(event) + n, err := irWriter.WriteLogEvent(event) + numBytes += n if err != nil { err = fmt.Errorf("failed to encode event %v into ir: %w", event, err) - return numEvents, err + return numBytes, numEvents, err } numEvents += 1 } - return numEvents, nil + return numBytes, numEvents, nil } // Opens a new [ir.Writer] and [zstd.Encoder]. @@ -97,19 +100,16 @@ func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, error) { // - err: Error opening IR/Zstd writer func newIrZstdWriters( zstdOutput io.Writer, - timezone string, - size int, + irOutput io.Writer, ) (*ir.Writer, *zstd.Encoder, error) { - // IR buffer using bytes.Buffer internally, so it will dynamically grow if undersized. Using - // FourByteEncoding as default encoding. - irWriter, err := ir.NewWriterSize[ir.FourByteEncoding](size, timezone) + zstdWriter, err := zstd.NewWriter(zstdOutput) if err != nil { - return nil, nil, fmt.Errorf("error opening IR writer: %w", err) + return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) } - zstdWriter, err := zstd.NewWriter(zstdOutput) + irWriter, err := ir.NewWriter[ir.FourByteEncoding](irOutput) if err != nil { - return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) + return nil, nil, fmt.Errorf("error opening IR writer: %w", err) } return irWriter, zstdWriter, err } diff --git a/plugins/out_clp_s3/fluent-bit.conf b/plugins/out_clp_s3/fluent-bit.conf index 86b2bf0..d3fa903 100644 --- a/plugins/out_clp_s3/fluent-bit.conf +++ b/plugins/out_clp_s3/fluent-bit.conf @@ -27,8 +27,8 @@ # Plugins File # ============ # specify an optional 'Plugins' configuration file to load external plugins. - plugins_file /fluent-bit/etc/plugin-config.conf - # plugins_file plugin-config.conf + # plugins_file /fluent-bit/etc/plugin-config.conf + plugins_file plugin-config.conf # HTTP Server # =========== @@ -38,14 +38,15 @@ http_port 2020 [INPUT] - name cpu - tag cpu.local - - # Read interval (sec) Default: 1 - interval_sec 1 + name tail + tag testlogs.local + path /home/davidmawk/fluent-bits/testlogs.json + read_from_head On [OUTPUT] name out_clp_s3 - s3_bucket myBucket + s3_bucket us-west-2-yscope-testbucket + s3_region us-west-2 + s3_bucket_prefix fluent-bit/ #role_arn arn:aws:iam::000000000000:role/accessToMyBucket match * From a53e99a54bc12ff262bbe8e6bd0037f90aeba7ba Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 23 Jan 2026 13:57:53 -0500 Subject: [PATCH 15/31] latest --- internal/decoder/decoder.go | 1 + plugins/out_clp_s3/README.md | 17 +++++++++++++---- plugins/out_clp_s3/fluent-bit.conf | 25 ++++++++++++++++--------- plugins/out_clp_s3/parsers.conf | 3 +++ 4 files changed, 33 insertions(+), 13 deletions(-) create mode 100644 plugins/out_clp_s3/parsers.conf diff --git a/internal/decoder/decoder.go b/internal/decoder/decoder.go index 9441cc0..1d86ccf 100644 --- a/internal/decoder/decoder.go +++ b/internal/decoder/decoder.go @@ -58,6 +58,7 @@ func New(data unsafe.Pointer, length int) *codec.Decoder { mh.RawToString = true mh.WriteExt = true mh.ErrorIfNoArrayExpand = true + mh.MapType = reflect.TypeOf(map[string]interface{}{}) // Set up custom extension for Fluent Bit timestamp format. mh.SetBytesExt(reflect.TypeOf(FlbTime{}), 0, &FlbTime{}) diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index 8b1f405..ecd6bcb 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -8,16 +8,25 @@ First, confirm your AWS credentials are properly setup, see [AWS credentials](#A information. Next, change the output section [fluent-bit.conf](fluent-bit.conf) to suit your needs. -See [Plugin configuration](#plugin-configuration) for description of fields. +If your logs are JSON, you should use a JSON parser on your input. +See [Plugin configuration](#plugin-configuration) for description of output options. -See below for an example: +See below for input and output examples: - ``` +``` +[INPUT] + name tail + path /var/log/app.json + tag app.json + parser basic_json +``` + +``` [OUTPUT] name out_clp_s3 s3_bucket myBucket match * - ``` +``` Lastly start the plugin: diff --git a/plugins/out_clp_s3/fluent-bit.conf b/plugins/out_clp_s3/fluent-bit.conf index d3fa903..340b35a 100644 --- a/plugins/out_clp_s3/fluent-bit.conf +++ b/plugins/out_clp_s3/fluent-bit.conf @@ -27,8 +27,9 @@ # Plugins File # ============ # specify an optional 'Plugins' configuration file to load external plugins. - # plugins_file /fluent-bit/etc/plugin-config.conf - plugins_file plugin-config.conf + plugins_file /fluent-bit/etc/plugin-config.conf + # plugins_file plugin-config.conf + parsers_file /fluent-bit/etc/parsers.conf # HTTP Server # =========== @@ -38,15 +39,21 @@ http_port 2020 [INPUT] - name tail - tag testlogs.local - path /home/davidmawk/fluent-bits/testlogs.json - read_from_head On + name cpu + tag cpu.local + + # Read interval (sec) Default: 1 + interval_sec 1 + +# Example tail input with JSON parser +# [INPUT] +# name tail +# path /var/log/app.json +# tag app.json +# parser json_basic [OUTPUT] name out_clp_s3 - s3_bucket us-west-2-yscope-testbucket - s3_region us-west-2 - s3_bucket_prefix fluent-bit/ + s3_bucket myBucket #role_arn arn:aws:iam::000000000000:role/accessToMyBucket match * diff --git a/plugins/out_clp_s3/parsers.conf b/plugins/out_clp_s3/parsers.conf new file mode 100644 index 0000000..da5bc7e --- /dev/null +++ b/plugins/out_clp_s3/parsers.conf @@ -0,0 +1,3 @@ +[PARSER] + Name basic_json + Format json From 8b1e4c0c5d013787b16361f25d04ceb6c99cda2b Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 23 Jan 2026 14:12:32 -0500 Subject: [PATCH 16/31] latest --- internal/irzstd/disk.go | 2 +- internal/irzstd/memory.go | 2 +- plugins/out_clp_s3/Dockerfile | 2 +- plugins/out_clp_s3/README.md | 3 +-- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index c33d6a2..bf0ab38 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -101,7 +101,7 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile: irFile, zstdPath: zstdPath, zstdFile: zstdFile, - // Recovered IR must be flushed before writing new IR. irWriter is initialized in Reset(). + // Recovered IR must be flushed before writing new IR. irWriter: nil, zstdWriter: zstdWriter, } diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 059490e..6e86c7b 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -138,4 +138,4 @@ func (w *memoryWriter) Close() error { } } return nil -} \ No newline at end of file +} diff --git a/plugins/out_clp_s3/Dockerfile b/plugins/out_clp_s3/Dockerfile index 55cc682..692392f 100644 --- a/plugins/out_clp_s3/Dockerfile +++ b/plugins/out_clp_s3/Dockerfile @@ -23,7 +23,7 @@ RUN task build FROM fluent/fluent-bit:4.2.2 -# Copy plugin binary to Fluent Bit image. +# Copy plugin binary to Fluent Bit image. COPY --from=builder /root/plugins/out_clp_s3/out_clp_s3.so /fluent-bit/bin/ COPY --from=builder /root/plugins/out_clp_s3/*.conf /fluent-bit/etc/ diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index ecd6bcb..d58b0ea 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -7,8 +7,7 @@ Fluent Bit output plugin that sends records in CLP's compressed IR format to AWS First, confirm your AWS credentials are properly setup, see [AWS credentials](#AWS-credentials) for information. -Next, change the output section [fluent-bit.conf](fluent-bit.conf) to suit your needs. -If your logs are JSON, you should use a JSON parser on your input. +Next, change [fluent-bit.conf](fluent-bit.conf) to suit your needs. Note, if your logs are JSON, you should use a JSON parser on your input. See [Plugin configuration](#plugin-configuration) for description of output options. See below for input and output examples: From 7ed89e89c0d4aeecb8972f431f8b09b3661581d9 Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 23 Jan 2026 14:18:53 -0500 Subject: [PATCH 17/31] latest --- plugins/out_clp_s3/README.md | 13 +++++++------ plugins/out_clp_s3/fluent-bit.conf | 3 ++- plugins/out_clp_s3/internal/flush/flush.go | 4 +--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index d58b0ea..f3dbf59 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -7,7 +7,7 @@ Fluent Bit output plugin that sends records in CLP's compressed IR format to AWS First, confirm your AWS credentials are properly setup, see [AWS credentials](#AWS-credentials) for information. -Next, change [fluent-bit.conf](fluent-bit.conf) to suit your needs. Note, if your logs are JSON, you should use a JSON parser on your input. +Next, change [fluent-bit.conf](fluent-bit.conf) to suit your needs. Note, if your logs are JSON, you should use the [Fluent Bit JSON parser][1] on your input. See [Plugin configuration](#plugin-configuration) for description of output options. See below for input and output examples: @@ -48,7 +48,7 @@ Dummy logs will be written to your s3 bucket. #### Using local setup -Install [go][1] and [fluent-bit][2] +Install [go][2] and [fluent-bit][3] Download go dependencies ```shell @@ -86,7 +86,7 @@ Moreover, the plugin can assume a role by adding optional `role_arn` to role_arn arn:aws:iam::000000000000:role/accessToMyBucket ``` -More detailed information for specifying credentials from AWS can be found [here][3]. +More detailed information for specifying credentials from AWS can be found [here][4]. ### Plugin configuration @@ -122,6 +122,7 @@ Each upload will have a unique key in the following format: The index starts at 0 is incremented after each upload. The Fluent Bit tag is also attached to the object using the tag key `fluentBitTag`. -[1]: https://go.dev/doc/install -[2]: https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit -[3]: https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials +[1]: https://docs.fluentbit.io/manual/data-pipeline/parsers/json +[2]: https://go.dev/doc/install +[3]: https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit +[4]: https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials diff --git a/plugins/out_clp_s3/fluent-bit.conf b/plugins/out_clp_s3/fluent-bit.conf index 340b35a..2555b48 100644 --- a/plugins/out_clp_s3/fluent-bit.conf +++ b/plugins/out_clp_s3/fluent-bit.conf @@ -30,6 +30,7 @@ plugins_file /fluent-bit/etc/plugin-config.conf # plugins_file plugin-config.conf parsers_file /fluent-bit/etc/parsers.conf + # parsers_file parsers.conf # HTTP Server # =========== @@ -50,7 +51,7 @@ # name tail # path /var/log/app.json # tag app.json -# parser json_basic +# parser basic_json [OUTPUT] name out_clp_s3 diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 3b04136..9fc8230 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -90,9 +90,7 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, error) { var logEvents []ffi.LogEvent for { - // Discarding Fluent Bit timestamp since log records typically contain their own timestamp. - // TODO: Consider adding an option to include the Fluent Bit timestamp as an auto-generated - // key. For most users it would hurt compression and go unused. + // TODO: Add an option to include the Fluent Bit timestamp as an auto-generated key. _, jsonRecord, err := decoder.GetRecord(dec) if err != nil { return logEvents, err From 23b12be05fd689d1c2b36974035ca22d02ffa00f Mon Sep 17 00:00:00 2001 From: marco Date: Wed, 28 Jan 2026 11:31:51 -0500 Subject: [PATCH 18/31] david review --- internal/irzstd/disk.go | 22 +++++++++++----------- plugins/out_clp_s3/internal/flush/flush.go | 3 +-- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index bf0ab38..b2e0899 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -16,6 +16,9 @@ import ( // 2 MB threshold to buffer IR before compressing to Zstd. const irSizeThreshold = 2 << 20 +// IR end of stream byte used to terminate IR stream. +const irEndOfStreamByte = 0x0 + // Converts log events into Zstd compressed IR using "trash compactor" design. Log events are // converted to uncompressed IR and buffered into "bins". Uncompressed IR represents uncompressed // trash in "trash compactor". Once the bin is full, the bin is "compacted" into its own separate @@ -74,9 +77,9 @@ func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) { } // Recovers a [diskWriter] by opening buffer files from a previous execution of the output plugin. -// Requires use_disk_store to be enabled. The recovered writer must be closed with closeStreams() -// before it can be used for future writes, since it does not initialize with an IR writer. Avoid -// use with empty disk buffers. +// Requires use_disk_store to be enabled. The recovered writer must be closed with [CloseStreams] +// before it can be used for future writes, since it does not initialize with an IR writer. If both +// disk buffers are empty, the writer will not have any preamble and the IR will be invalid. // // Parameters: // - irPath: Path to IR disk buffer file @@ -101,7 +104,6 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile: irFile, zstdPath: zstdPath, zstdFile: zstdFile, - // Recovered IR must be flushed before writing new IR. irWriter: nil, zstdWriter: zstdWriter, } @@ -149,9 +151,10 @@ func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { return numEvents, nil } -// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. -// The IR buffer is also flushed before ending stream. After calling close, -// [diskWriter] must be reset prior to calling write. +// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. The IR +// buffer is also flushed before ending stream. After calling close, [diskWriter] must be reset +// prior to calling write. For recovered [diskWriter], [ir.Writer] will be nil, so the IR postamble +// byte is written manually to the Zstd buffer. // // Returns: // - err: Error flushing/closing buffers @@ -162,7 +165,6 @@ func (w *diskWriter) CloseStreams() error { return fmt.Errorf("error flushing IR buffer: %w", err) } - // [ir.Writer] will be nil for recovered [diskWriter] until reset. if w.irWriter != nil { err := w.irWriter.Serializer.Close() if err != nil { @@ -171,9 +173,7 @@ func (w *diskWriter) CloseStreams() error { w.irWriter = nil } - // Add IR postamble byte manually. We cannot use [ir.Writer.Close] since it add - // postable to the irFile, which was already flushed, and not the zstdFile. - w.zstdWriter.Write([]byte{0x0}) + w.zstdWriter.Write([]byte{irEndOfStreamByte}) err = w.zstdWriter.Close() if err != nil { return err diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 9fc8230..72fc45f 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -90,7 +90,6 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, error) { var logEvents []ffi.LogEvent for { - // TODO: Add an option to include the Fluent Bit timestamp as an auto-generated key. _, jsonRecord, err := decoder.GetRecord(dec) if err != nil { return logEvents, err @@ -100,7 +99,7 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, var userKvPairs map[string]any err = json.Unmarshal(jsonRecord, &userKvPairs) if err != nil { - err = fmt.Errorf("failed to get message from record: %w", err) + err = fmt.Errorf("failed to unmarshal record: %w", err) return nil, err } From 4d85d1cd1c5c78247c82e920d01a62b195e7e45b Mon Sep 17 00:00:00 2001 From: marco Date: Wed, 28 Jan 2026 11:37:32 -0500 Subject: [PATCH 19/31] fix lint --- internal/irzstd/disk.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index b2e0899..fa9bd45 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -100,10 +100,10 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { } diskWriter := diskWriter{ - irPath: irPath, - irFile: irFile, - zstdPath: zstdPath, - zstdFile: zstdFile, + irPath: irPath, + irFile: irFile, + zstdPath: zstdPath, + zstdFile: zstdFile, irWriter: nil, zstdWriter: zstdWriter, } From 62c96d7c0499ec35dec7d471547dc5f01ef8b08e Mon Sep 17 00:00:00 2001 From: marco Date: Wed, 28 Jan 2026 22:28:23 -0500 Subject: [PATCH 20/31] david review --- internal/irzstd/disk.go | 52 +++++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index fa9bd45..73498d7 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -78,8 +78,9 @@ func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) { // Recovers a [diskWriter] by opening buffer files from a previous execution of the output plugin. // Requires use_disk_store to be enabled. The recovered writer must be closed with [CloseStreams] -// before it can be used for future writes, since it does not initialize with an IR writer. If both -// disk buffers are empty, the writer will not have any preamble and the IR will be invalid. +// before it can be used for future writes, since it does not initialize with an IR writer. Returns +// an error if both disk buffers are empty, since the IR would not have a preamble and would be +// invalid. // // Parameters: // - irPath: Path to IR disk buffer file @@ -87,7 +88,8 @@ func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) { // // Returns: // - diskWriter: Disk writer for Zstd compressed IR -// - err: Error opening buffers, error opening Zstd/IR writers, error getting file sizes +// - err: Error opening buffers, error opening Zstd/IR writers, error getting file sizes, +// error empty buffers func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile, zstdFile, err := openBufferFiles(irPath, zstdPath) if err != nil { @@ -113,9 +115,16 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { return nil, fmt.Errorf("error getting size of IR file: %w", err) } - // During recovery, IR buffer may not be empty, so the size must be set. In addition, - // the non-empty disk buffers already have existing preamble so remove it. Disk buffer - // must have non-zero size or else would be deleted in recover. + zstdFileSize, err := diskWriter.getZstdFileSize() + if err != nil { + return nil, fmt.Errorf("error getting size of Zstd file: %w", err) + } + + if (irFileSize == 0) && (zstdFileSize == 0) { + return nil, fmt.Errorf("error both IR and Zstd buffers are empty") + } + + // During recovery, IR buffer may not be empty, so the size must be set. diskWriter.irTotalBytes = irFileSize return &diskWriter, nil @@ -266,21 +275,12 @@ func (w *diskWriter) GetZstdOutput() io.Reader { return w.zstdFile } -// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. -// Therefore, cannot keep track of size with variable as implemented for IR with [IrTotalBytes]. -// Instead, must always use stat. +// Get size of Zstd output. // // Returns: -// - err: Error calling stat +// - err: Error getting size func (w *diskWriter) GetZstdOutputSize() (int, error) { - zstdFileInfo, err := w.zstdFile.Stat() - if err != nil { - return 0, err - } - - zstdFileSize := int(zstdFileInfo.Size()) - - return zstdFileSize, err + return w.getZstdFileSize() } // Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then @@ -432,3 +432,19 @@ func (w *diskWriter) getIrFileSize() (int, error) { irFileSize := int(irFileInfo.Size()) return irFileSize, err } + +// Get size of Zstd file. [zstd] does not provide the amount of bytes written with each write. +// Therefore, cannot keep track of size with variable as implemented for IR with [irTotalBytes]. +// Instead, must always use stat. +// +// Returns: +// - err: Error calling stat +func (w *diskWriter) getZstdFileSize() (int, error) { + zstdFileInfo, err := w.zstdFile.Stat() + if err != nil { + return 0, err + } + + zstdFileSize := int(zstdFileInfo.Size()) + return zstdFileSize, err +} From 93411c27fa6b67a5a8be4ebbba90e3634df06090 Mon Sep 17 00:00:00 2001 From: davemarco <83603688+davemarco@users.noreply.github.com> Date: Thu, 29 Jan 2026 11:46:46 -0500 Subject: [PATCH 21/31] Update internal/irzstd/disk.go Co-authored-by: davidlion --- internal/irzstd/disk.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 73498d7..c0cc35a 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -78,8 +78,8 @@ func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) { // Recovers a [diskWriter] by opening buffer files from a previous execution of the output plugin. // Requires use_disk_store to be enabled. The recovered writer must be closed with [CloseStreams] -// before it can be used for future writes, since it does not initialize with an IR writer. Returns -// an error if both disk buffers are empty, since the IR would not have a preamble and would be +// before it can be used for future writes, since it does not initialize an IR writer. Returns an +// error if both disk buffers are empty, since the IR would not have a preamble and would be // invalid. // // Parameters: From fc1ec4cea85c65d154a04fb0f3919e6548c4cb56 Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 29 Jan 2026 16:49:40 +0000 Subject: [PATCH 22/31] david review --- internal/irzstd/disk.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index c0cc35a..c244788 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -124,7 +124,6 @@ func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { return nil, fmt.Errorf("error both IR and Zstd buffers are empty") } - // During recovery, IR buffer may not be empty, so the size must be set. diskWriter.irTotalBytes = irFileSize return &diskWriter, nil From dbd9d64dbb56105e0f55540db46f141e5c2883b0 Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 29 Jan 2026 13:20:38 -0500 Subject: [PATCH 23/31] david review --- internal/irzstd/disk.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index c244788..43e2ecc 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -161,8 +161,10 @@ func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { // Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. The IR // buffer is also flushed before ending stream. After calling close, [diskWriter] must be reset -// prior to calling write. For recovered [diskWriter], [ir.Writer] will be nil, so the IR postamble -// byte is written manually to the Zstd buffer. +// prior to calling write. For recovered [diskWriter], [ir.Writer] will be nil so closing the +// IR writer is skipped. The IR trailing byte is written directly to [zstdWriter] as an +// optimization to avoid an extra flush when the IR buffer is empty. [flushIrBuffer] exits early +// if the IR buffer is empty. // // Returns: // - err: Error flushing/closing buffers From f818e2872bb2901cc52934247eeb7369fc198f1b Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 29 Jan 2026 15:23:19 -0500 Subject: [PATCH 24/31] latest --- internal/irzstd/disk.go | 8 -------- internal/irzstd/memory.go | 15 ++++----------- internal/irzstd/writer.go | 6 ------ plugins/out_clp_s3/internal/flush/flush.go | 9 ++------- plugins/out_clp_s3/internal/recovery/recovery.go | 6 ++---- 5 files changed, 8 insertions(+), 36 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 43e2ecc..326441b 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -260,14 +260,6 @@ func (w *diskWriter) Close() error { return nil } -// Getter for useDiskBuffer. -// -// Returns: -// - useDiskBuffer: On/off for disk buffering -func (w *diskWriter) GetUseDiskBuffer() bool { - return true -} - // Getter for Zstd Output. // // Returns: diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 6e86c7b..a61c9dd 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -11,9 +11,10 @@ import ( "github.com/y-scope/clp-ffi-go/ir" ) -// Converts log events into Zstd compressed IR. Log events provided to writer are immediately -// converted to Zstd compressed IR and stored in [memoryWriter.ZstdBuffer]. After the Zstd buffer -// receives logs, they are immediately sent to s3. +// Converts log events into Zstd compressed IR. Log events are immediately converted to Zstd +// compressed IR and stored in [memoryWriter.zstdBuffer]. Data is buffered in memory until the +// upload size threshold is reached, then sent to S3. Unlike [diskWriter], there is no crash +// recovery since buffers are in memory. type memoryWriter struct { zstdBuffer *bytes.Buffer irWriter *ir.Writer @@ -96,14 +97,6 @@ func (w *memoryWriter) Reset() error { return nil } -// Getter for useDiskBuffer. -// -// Returns: -// - useDiskBuffer: On/off for disk buffering -func (w *memoryWriter) GetUseDiskBuffer() bool { - return false -} - // Getter for Zstd Output. // // Returns: diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index a38ab45..8f9fd3f 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -43,12 +43,6 @@ type Writer interface { // - err Reset() error - // Getter for useDiskBuffer. - // - // Returns: - // - useDiskBuffer: On/off for disk buffering - GetUseDiskBuffer() bool - // Getter for Zstd Output. // // Returns: diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 72fc45f..2d7628f 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -111,9 +111,8 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, } } -// Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always -// uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater -// than upload size. +// Checks if criteria are met to upload to s3. Checks if Zstd buffer size is greater than or equal +// to upload size. // // Parameters: // - eventManager: Manager for Fluent Bit events with the same tag @@ -123,10 +122,6 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, // - readyToUpload: Boolean if upload criteria met or not // - err: Error getting Zstd buffer size func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) { - if !eventManager.Writer.GetUseDiskBuffer() { - return true, nil - } - bufferSize, err := eventManager.Writer.GetZstdOutputSize() if err != nil { return false, fmt.Errorf("error could not get size of buffer: %w", err) diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index e6b09dc..74794ef 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -13,10 +13,8 @@ import ( "github.com/y-scope/fluent-bit-clp/internal/outctx" ) -// If useDiskBuffer is set, close all files prior to exit. Graceful exit will only be called -// if Fluent Bit receives a kill signal and not during an abrupt crash. Plugin is only -// given a limited time to clean up resources, so output is not sent to s3. Instead -// they are sent during startup. +// Gracefully exits the plugin. For disk buffering, files are closed and data is recovered on +// next startup. For memory buffering, any buffered data is lost. // // Parameters: // - ctx: Plugin context From 6ec3667ca3d71f5919a2d7638eb729dc03c9106d Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 30 Jan 2026 18:38:51 +0000 Subject: [PATCH 25/31] latest --- internal/irzstd/disk.go | 17 +++++++++ internal/irzstd/memory.go | 13 +++++++ internal/irzstd/writer.go | 7 ++++ .../out_clp_s3/internal/recovery/recovery.go | 36 +++++++++++++++++-- plugins/out_clp_s3/out_clp_s3.go | 7 +++- 5 files changed, 76 insertions(+), 4 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 326441b..7a10d46 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -441,3 +441,20 @@ func (w *diskWriter) getZstdFileSize() (int, error) { zstdFileSize := int(zstdFileInfo.Size()) return zstdFileSize, err } + +// Checks if writer is empty. True if no events are buffered. +// +// Returns: +// - empty: Boolean value that is true if buffer is empty +// - err: Error calling stat +func (w *diskWriter) CheckEmpty() (bool, error) { + zstdFileInfo, err := w.zstdFile.Stat() + if err != nil { + return false, err + } + // Not checking internal IR buffer since should it since should always be empty from + // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will + // be empty again when function terminates. + empty := (zstdFileInfo.Size() == 0) && (w.irTotalBytes == 0) + return empty, nil +} diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index a61c9dd..9684258 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -132,3 +132,16 @@ func (w *memoryWriter) Close() error { } return nil } + +// Checks if writer is empty. True if no events are buffered. Try to avoid calling this as will +// flush Zstd Writer potentially creating unnecessary frames. +// +// Returns: +// - empty: Boolean value that is true if buffer is empty +// - err: nil error to comply with interface +func (w *memoryWriter) CheckEmpty() (bool, error) { + w.zstdWriter.Flush() + + empty := w.zstdBuffer.Len() == 0 + return empty, nil +} diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index 8f9fd3f..c5ed220 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -55,6 +55,13 @@ type Writer interface { // - size: Bytes written // - err GetZstdOutputSize() (int, error) + + // Checks if writer is empty. True if no events are buffered. + // + // Returns: + // - empty: Boolean value that is true if buffer is empty + // - err + CheckEmpty() (bool, error) } // Writes log events to a IR Writer. diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index 74794ef..dcf3dd5 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -13,15 +13,14 @@ import ( "github.com/y-scope/fluent-bit-clp/internal/outctx" ) -// Gracefully exits the plugin. For disk buffering, files are closed and data is recovered on -// next startup. For memory buffering, any buffered data is lost. +// Gracefully exits the plugin by closing files. Data is recovered on next startup. // // Parameters: // - ctx: Plugin context // // Returns: // - err: Error closing file -func GracefulExit(ctx *outctx.S3Context) error { +func GracefulExitFs(ctx *outctx.S3Context) error { for _, eventManager := range ctx.EventManagers { err := eventManager.Writer.Close() if err != nil { @@ -33,6 +32,37 @@ func GracefulExit(ctx *outctx.S3Context) error { return nil } +// Gracefully exits the plugin by flushing buffered data to S3. Makes a best-effort attempt, +// however Fluent Bit may kill the plugin before the upload completes, resulting in data loss. +// +// Parameters: +// - ctx: Plugin context +// +// Returns: +// - err: Error closing file +func GracefulExitS3(ctx *outctx.S3Context) error { + for _, eventManager := range ctx.EventManagers { + empty, err := eventManager.Writer.CheckEmpty() + if err != nil { + return err + } + if empty { + continue + } + err = eventManager.ToS3(ctx.Config, ctx.Uploader) + if err != nil { + return err + } + err = eventManager.Writer.Close() + if err != nil { + return err + } + eventManager.Writer = nil + } + + return nil +} + // Sends existing disk buffers to S3. // // Parameters: diff --git a/plugins/out_clp_s3/out_clp_s3.go b/plugins/out_clp_s3/out_clp_s3.go index c0591df..e1f5774 100644 --- a/plugins/out_clp_s3/out_clp_s3.go +++ b/plugins/out_clp_s3/out_clp_s3.go @@ -132,7 +132,12 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int { log.Printf("[%s] Exit called for id: %s", s3PluginName, outCtx.Config.Id) - err := recovery.GracefulExit(outCtx) + var err error + if outCtx.Config.UseDiskBuffer { + err = recovery.GracefulExitFs(outCtx) + } else { + err = recovery.GracefulExitS3(outCtx) + } if err != nil { log.Printf("Failed to exit gracefully") } From a42dfcae59fbac7d347c39f49a63a3a5a88a50a3 Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 30 Jan 2026 19:25:32 +0000 Subject: [PATCH 26/31] latest --- internal/irzstd/disk.go | 32 +++++----- internal/irzstd/memory.go | 7 +-- plugins/out_clp_s3/internal/exit/exit.go | 59 +++++++++++++++++++ .../out_clp_s3/internal/recovery/recovery.go | 54 +---------------- plugins/out_clp_s3/out_clp_s3.go | 5 +- 5 files changed, 82 insertions(+), 75 deletions(-) create mode 100644 plugins/out_clp_s3/internal/exit/exit.go diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 7a10d46..8faf8c4 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -276,6 +276,21 @@ func (w *diskWriter) GetZstdOutputSize() (int, error) { return w.getZstdFileSize() } +// Checks if writer is empty. True if no events are buffered. +// +// Returns: +// - empty: Boolean value that is true if buffer is empty +// - err: Error calling stat +func (w *diskWriter) CheckEmpty() (bool, error) { + zstdFileInfo, err := w.zstdFile.Stat() + if err != nil { + return false, err + } + + empty := (zstdFileInfo.Size() == 0) && (w.irTotalBytes == 0) + return empty, nil +} + // Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then // truncated. // @@ -441,20 +456,3 @@ func (w *diskWriter) getZstdFileSize() (int, error) { zstdFileSize := int(zstdFileInfo.Size()) return zstdFileSize, err } - -// Checks if writer is empty. True if no events are buffered. -// -// Returns: -// - empty: Boolean value that is true if buffer is empty -// - err: Error calling stat -func (w *diskWriter) CheckEmpty() (bool, error) { - zstdFileInfo, err := w.zstdFile.Stat() - if err != nil { - return false, err - } - // Not checking internal IR buffer since should it since should always be empty from - // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will - // be empty again when function terminates. - empty := (zstdFileInfo.Size() == 0) && (w.irTotalBytes == 0) - return empty, nil -} diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 9684258..c4c2ddd 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -12,9 +12,7 @@ import ( ) // Converts log events into Zstd compressed IR. Log events are immediately converted to Zstd -// compressed IR and stored in [memoryWriter.zstdBuffer]. Data is buffered in memory until the -// upload size threshold is reached, then sent to S3. Unlike [diskWriter], there is no crash -// recovery since buffers are in memory. +// compressed IR and stored in [memoryWriter.zstdBuffer]. type memoryWriter struct { zstdBuffer *bytes.Buffer irWriter *ir.Writer @@ -106,7 +104,8 @@ func (w *memoryWriter) GetZstdOutput() io.Reader { } // Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. -// Instead, calling Len() on buffer. +// Instead, calling Len() on buffer. Size may slightly lag the real size since some data in the +// current block will be in the [zstd] encoder's internal buffer. // // Returns: // - size: Bytes written diff --git a/plugins/out_clp_s3/internal/exit/exit.go b/plugins/out_clp_s3/internal/exit/exit.go new file mode 100644 index 0000000..43f89e2 --- /dev/null +++ b/plugins/out_clp_s3/internal/exit/exit.go @@ -0,0 +1,59 @@ +// Package exit provides functions for gracefully shutting down the plugin. Exit functions are only +// called when Fluent Bit receives a kill signal, not during an abrupt crash. The plugin is given +// limited time to clean up resources before Fluent Bit terminates it. + +package exit + +import ( + "github.com/y-scope/fluent-bit-clp/internal/outctx" +) + +// Fs gracefully exits the plugin by closing files. +// +// Parameters: +// - ctx: Plugin context +// +// Returns: +// - err: Error closing file +func Fs(ctx *outctx.S3Context) error { + for _, eventManager := range ctx.EventManagers { + err := eventManager.Writer.Close() + if err != nil { + return err + } + eventManager.Writer = nil + } + + return nil +} + +// S3 gracefully exits the plugin by flushing buffered data to S3. Makes a best-effort attempt, +// however Fluent Bit may kill the plugin before the upload completes, resulting in data loss. +// +// Parameters: +// - ctx: Plugin context +// +// Returns: +// - err: Error closing file +func S3(ctx *outctx.S3Context) error { + for _, eventManager := range ctx.EventManagers { + empty, err := eventManager.Writer.CheckEmpty() + if err != nil { + return err + } + if empty { + continue + } + err = eventManager.ToS3(ctx.Config, ctx.Uploader) + if err != nil { + return err + } + err = eventManager.Writer.Close() + if err != nil { + return err + } + eventManager.Writer = nil + } + + return nil +} \ No newline at end of file diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index dcf3dd5..e6bd8c7 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -1,4 +1,4 @@ -// Package provides ability to recover disk buffer on startup and send to s3. +// Package recovery provides ability to recover disk buffer on startup and send to S3. package recovery @@ -13,57 +13,7 @@ import ( "github.com/y-scope/fluent-bit-clp/internal/outctx" ) -// Gracefully exits the plugin by closing files. Data is recovered on next startup. -// -// Parameters: -// - ctx: Plugin context -// -// Returns: -// - err: Error closing file -func GracefulExitFs(ctx *outctx.S3Context) error { - for _, eventManager := range ctx.EventManagers { - err := eventManager.Writer.Close() - if err != nil { - return err - } - eventManager.Writer = nil - } - - return nil -} - -// Gracefully exits the plugin by flushing buffered data to S3. Makes a best-effort attempt, -// however Fluent Bit may kill the plugin before the upload completes, resulting in data loss. -// -// Parameters: -// - ctx: Plugin context -// -// Returns: -// - err: Error closing file -func GracefulExitS3(ctx *outctx.S3Context) error { - for _, eventManager := range ctx.EventManagers { - empty, err := eventManager.Writer.CheckEmpty() - if err != nil { - return err - } - if empty { - continue - } - err = eventManager.ToS3(ctx.Config, ctx.Uploader) - if err != nil { - return err - } - err = eventManager.Writer.Close() - if err != nil { - return err - } - eventManager.Writer = nil - } - - return nil -} - -// Sends existing disk buffers to S3. +// RecoverBufferFiles sends existing disk buffers to S3. // // Parameters: // - ctx: Plugin context diff --git a/plugins/out_clp_s3/out_clp_s3.go b/plugins/out_clp_s3/out_clp_s3.go index e1f5774..3e3f439 100644 --- a/plugins/out_clp_s3/out_clp_s3.go +++ b/plugins/out_clp_s3/out_clp_s3.go @@ -17,6 +17,7 @@ import ( "github.com/y-scope/fluent-bit-clp/internal/outctx" "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/flush" + "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/exit" "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/recovery" ) @@ -134,9 +135,9 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int { var err error if outCtx.Config.UseDiskBuffer { - err = recovery.GracefulExitFs(outCtx) + err = exit.Fs(outCtx) } else { - err = recovery.GracefulExitS3(outCtx) + err = exit.S3(outCtx) } if err != nil { log.Printf("Failed to exit gracefully") From 8eff7e4232cfd9bf880758acad761902b143a200 Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 30 Jan 2026 19:27:21 +0000 Subject: [PATCH 27/31] latest --- plugins/out_clp_s3/internal/exit/exit.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/out_clp_s3/internal/exit/exit.go b/plugins/out_clp_s3/internal/exit/exit.go index 43f89e2..82c0388 100644 --- a/plugins/out_clp_s3/internal/exit/exit.go +++ b/plugins/out_clp_s3/internal/exit/exit.go @@ -28,7 +28,7 @@ func Fs(ctx *outctx.S3Context) error { } // S3 gracefully exits the plugin by flushing buffered data to S3. Makes a best-effort attempt, -// however Fluent Bit may kill the plugin before the upload completes, resulting in data loss. +// however Fluent Bit may kill the plugin before the upload completes. // // Parameters: // - ctx: Plugin context @@ -56,4 +56,4 @@ func S3(ctx *outctx.S3Context) error { } return nil -} \ No newline at end of file +} From a3e31a5344a394f0c7ee20bda80097cfc191cfe2 Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 30 Jan 2026 19:30:36 +0000 Subject: [PATCH 28/31] latest --- plugins/out_clp_s3/internal/flush/flush.go | 3 +-- plugins/out_clp_s3/internal/recovery/recovery.go | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 2d7628f..df8d58c 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -111,8 +111,7 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, } } -// Checks if criteria are met to upload to s3. Checks if Zstd buffer size is greater than or equal -// to upload size. +// Checks whether Zstd buffer size is greater than or equal to upload size. // // Parameters: // - eventManager: Manager for Fluent Bit events with the same tag diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index e6bd8c7..c8a1507 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -1,4 +1,4 @@ -// Package recovery provides ability to recover disk buffer on startup and send to S3. +// Package provides ability to recover disk buffer on startup and send to s3. package recovery @@ -13,7 +13,7 @@ import ( "github.com/y-scope/fluent-bit-clp/internal/outctx" ) -// RecoverBufferFiles sends existing disk buffers to S3. +// Sends existing disk buffers to S3. // // Parameters: // - ctx: Plugin context From 7d601651ca7c408f2e03457fdd0e4638e36b4aa1 Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 30 Jan 2026 14:46:56 -0500 Subject: [PATCH 29/31] lint --- plugins/out_clp_s3/out_clp_s3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/out_clp_s3/out_clp_s3.go b/plugins/out_clp_s3/out_clp_s3.go index 3e3f439..f894ad4 100644 --- a/plugins/out_clp_s3/out_clp_s3.go +++ b/plugins/out_clp_s3/out_clp_s3.go @@ -16,8 +16,8 @@ import ( "github.com/fluent/fluent-bit-go/output" "github.com/y-scope/fluent-bit-clp/internal/outctx" - "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/flush" "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/exit" + "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/flush" "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/recovery" ) From da2a955a090b8db53b2e247fd72c5c6ee1dc1a27 Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 19 Feb 2026 12:45:37 -0500 Subject: [PATCH 30/31] latest --- internal/irzstd/disk.go | 2 +- internal/irzstd/memory.go | 21 ++++++++++----------- internal/irzstd/writer.go | 2 +- plugins/out_clp_s3/README.md | 22 ++++++++++++---------- plugins/out_clp_s3/internal/exit/exit.go | 6 +++--- plugins/out_clp_s3/out_clp_s3.go | 2 +- 6 files changed, 28 insertions(+), 27 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 8faf8c4..e4be566 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -281,7 +281,7 @@ func (w *diskWriter) GetZstdOutputSize() (int, error) { // Returns: // - empty: Boolean value that is true if buffer is empty // - err: Error calling stat -func (w *diskWriter) CheckEmpty() (bool, error) { +func (w *diskWriter) Empty() (bool, error) { zstdFileInfo, err := w.zstdFile.Stat() if err != nil { return false, err diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index c4c2ddd..1f444fd 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -14,9 +14,10 @@ import ( // Converts log events into Zstd compressed IR. Log events are immediately converted to Zstd // compressed IR and stored in [memoryWriter.zstdBuffer]. type memoryWriter struct { - zstdBuffer *bytes.Buffer - irWriter *ir.Writer - zstdWriter *zstd.Encoder + zstdBuffer *bytes.Buffer + irWriter *ir.Writer + zstdWriter *zstd.Encoder + irTotalBytes int } // Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is @@ -56,7 +57,8 @@ func NewMemoryWriter() (*memoryWriter, error) { // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { - _, numEvents, err := writeIr(w.irWriter, logEvents) + numBytes, numEvents, err := writeIr(w.irWriter, logEvents) + w.irTotalBytes += numBytes if err != nil { return numEvents, err } @@ -86,6 +88,7 @@ func (w *memoryWriter) Reset() error { var err error w.zstdBuffer.Reset() w.zstdWriter.Reset(w.zstdBuffer) + w.irTotalBytes = 0 w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter) if err != nil { @@ -132,15 +135,11 @@ func (w *memoryWriter) Close() error { return nil } -// Checks if writer is empty. True if no events are buffered. Try to avoid calling this as will -// flush Zstd Writer potentially creating unnecessary frames. +// Checks if writer is empty. True if no events are buffered. // // Returns: // - empty: Boolean value that is true if buffer is empty // - err: nil error to comply with interface -func (w *memoryWriter) CheckEmpty() (bool, error) { - w.zstdWriter.Flush() - - empty := w.zstdBuffer.Len() == 0 - return empty, nil +func (w *memoryWriter) Empty() (bool, error) { + return w.irTotalBytes == 0, nil } diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index c5ed220..7b6632e 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -61,7 +61,7 @@ type Writer interface { // Returns: // - empty: Boolean value that is true if buffer is empty // - err - CheckEmpty() (bool, error) + Empty() (bool, error) } // Writes log events to a IR Writer. diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index f3dbf59..9e2c247 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -97,21 +97,23 @@ More detailed information for specifying credentials from AWS can be found [here | `s3_bucket_prefix` | Bucket prefix path | `logs/` | | `role_arn` | ARN of an IAM role to assume | `None` | | `id` | Name of output plugin | Random UUID | -| `use_disk_buffer` | Buffer logs on disk prior to sending to S3. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | +| `use_disk_buffer` | Buffer logs on disk. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | | `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` | -| `upload_size_mb` | Set upload size in MB when disk store is enabled. Size refers to the compressed size. | `16` | +| `upload_size_mb` | Set upload size in MB. Size refers to the compressed size. | `16` | #### Disk Buffering -The output plugin recieves raw logs from Fluent Bit in small chunks. With `use_disk_buffer` set, the -output plugin will accumulate logs on disk until the upload size is reached. Buffering logs will -reduce the amount of S3 API requests and improve the compression ratio. However, the plugin will use -disk space and have higher memory requirements. The amount of system resources will be proportional -to the amount of Fluent Bit tags. With `use_disk_buffer` off, the plugin will immediately process -each chunk and send it to S3. +The output plugin recieves raw logs from Fluent Bit in small chunks and accumulates them in a compressed +buffer until the upload size is reached before sending to S3. -Logs are stored on the disk as IR and Zstd compressed IR. If the plugin were to crash, stored logs -will be sent to S3 when Fluent Bit restarts. The upload index restarts on recovery. +With `use_disk_buffer` set, logs are stored on disk as IR and Zstd compressed IR. On a graceful shutdown +or abrupt crash, stored logs will be sent to S3 when Fluent Bit restarts. For an abrupt crash, there is +a very small chance of data corruption if the plugin crashed mid write. The upload index restarts on +recovery. + +With `use_disk_buffer` off, logs are stored in memory as Zstd compressed IR. On a graceful shutdown, the +plugin will attempt to upload any buffered data to S3 before Fluent Bit terminates it. On an abrupt +crash, in-memory data is lost. ### S3 Objects diff --git a/plugins/out_clp_s3/internal/exit/exit.go b/plugins/out_clp_s3/internal/exit/exit.go index 82c0388..4779104 100644 --- a/plugins/out_clp_s3/internal/exit/exit.go +++ b/plugins/out_clp_s3/internal/exit/exit.go @@ -8,14 +8,14 @@ import ( "github.com/y-scope/fluent-bit-clp/internal/outctx" ) -// Fs gracefully exits the plugin by closing files. +// NoUpload gracefully exits the plugin by closing writers without uploading. // // Parameters: // - ctx: Plugin context // // Returns: // - err: Error closing file -func Fs(ctx *outctx.S3Context) error { +func NoUpload(ctx *outctx.S3Context) error { for _, eventManager := range ctx.EventManagers { err := eventManager.Writer.Close() if err != nil { @@ -37,7 +37,7 @@ func Fs(ctx *outctx.S3Context) error { // - err: Error closing file func S3(ctx *outctx.S3Context) error { for _, eventManager := range ctx.EventManagers { - empty, err := eventManager.Writer.CheckEmpty() + empty, err := eventManager.Writer.Empty() if err != nil { return err } diff --git a/plugins/out_clp_s3/out_clp_s3.go b/plugins/out_clp_s3/out_clp_s3.go index f894ad4..ed6f3ef 100644 --- a/plugins/out_clp_s3/out_clp_s3.go +++ b/plugins/out_clp_s3/out_clp_s3.go @@ -135,7 +135,7 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int { var err error if outCtx.Config.UseDiskBuffer { - err = exit.Fs(outCtx) + err = exit.NoUpload(outCtx) } else { err = exit.S3(outCtx) } From 9499c9c0ccfa42f3a26009384d8ef576ef8b208d Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 19 Feb 2026 12:56:36 -0500 Subject: [PATCH 31/31] latest --- plugins/out_clp_s3/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index 9e2c247..63e4a7d 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -97,9 +97,9 @@ More detailed information for specifying credentials from AWS can be found [here | `s3_bucket_prefix` | Bucket prefix path | `logs/` | | `role_arn` | ARN of an IAM role to assume | `None` | | `id` | Name of output plugin | Random UUID | -| `use_disk_buffer` | Buffer logs on disk. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | -| `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` | -| `upload_size_mb` | Set upload size in MB. Size refers to the compressed size. | `16` | +| `use_disk_buffer` | Buffer logs on disk. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | +| `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` | +| `upload_size_mb` | Set upload size in MB. Size refers to the compressed size. | `16` | #### Disk Buffering