diff --git a/go.mod b/go.mod index 6236d60..6175554 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24 require ( github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c - github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb + github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855 ) require ( @@ -39,6 +39,8 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/crypto v0.19.0 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.17.0 // indirect diff --git a/go.sum b/go.sum index 8624661..cccd671 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb h1:MAuKBGpfQIIl63810kEYZxUv8tfpI9y0nZlyi7tS8A8= -github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb/go.mod h1:EkeM7lP5AWNRcmBWt3MvjAkRx7RT0gzisW4sh+SJYUw= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855 h1:yrwcVsQs6qpCiRpCHgOk7g+jo1hBVwT9MhJA1hEQLso= +github.com/y-scope/clp-ffi-go v0.0.9-beta.0.20250629182525-0dc22d574855/go.mod h1:EuJRZ9fcHuedhtPHsVCsB84isZkqVECbvfAfhj9JGFI= golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= diff --git a/internal/decoder/decoder.go b/internal/decoder/decoder.go index 9441cc0..1d86ccf 100644 --- a/internal/decoder/decoder.go +++ b/internal/decoder/decoder.go @@ -58,6 +58,7 @@ func New(data unsafe.Pointer, length int) *codec.Decoder { mh.RawToString = true mh.WriteExt = true mh.ErrorIfNoArrayExpand = true + mh.MapType = reflect.TypeOf(map[string]interface{}{}) // Set up custom extension for Fluent Bit timestamp format. mh.SetBytesExt(reflect.TypeOf(FlbTime{}), 0, &FlbTime{}) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 0d1cf59..43e2ecc 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -16,6 +16,9 @@ import ( // 2 MB threshold to buffer IR before compressing to Zstd. const irSizeThreshold = 2 << 20 +// IR end of stream byte used to terminate IR stream. +const irEndOfStreamByte = 0x0 + // Converts log events into Zstd compressed IR using "trash compactor" design. Log events are // converted to uncompressed IR and buffered into "bins". Uncompressed IR represents uncompressed // trash in "trash compactor". Once the bin is full, the bin is "compacted" into its own separate @@ -36,8 +39,6 @@ type diskWriter struct { irFile *os.File zstdFile *os.File irWriter *ir.Writer - size int - timezone string irTotalBytes int zstdWriter *zstd.Encoder } @@ -46,33 +47,24 @@ type diskWriter struct { // is on. // // Parameters: -// - timezone: Time zone of the log source -// - size: Byte length // - irPath: Path to IR disk buffer file // - zstdPath: Path to Zstd disk buffer file // // Returns: // - diskWriter: Disk writer for Zstd compressed IR // - err: Error creating new buffers, error opening Zstd/IR writers -func NewDiskWriter( - timezone string, - size int, - irPath string, - zstdPath string, -) (*diskWriter, error) { +func NewDiskWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile, zstdFile, err := newFileBuffers(irPath, zstdPath) if err != nil { return nil, err } - irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size) + irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, irFile) if err != nil { return nil, err } diskWriter := diskWriter{ - size: size, - timezone: timezone, irPath: irPath, irFile: irFile, zstdPath: zstdPath, @@ -84,43 +76,37 @@ func NewDiskWriter( return &diskWriter, nil } -// Recovers a [diskWriter] opening buffer files from a previous execution of output plugin. -// Recovery of files necessitates that use_disk_store is on. IR preamble is removed for -// recovered store. Avoid use with empty disk stores as there will be no preamble. +// Recovers a [diskWriter] by opening buffer files from a previous execution of the output plugin. +// Requires use_disk_store to be enabled. The recovered writer must be closed with [CloseStreams] +// before it can be used for future writes, since it does not initialize an IR writer. Returns an +// error if both disk buffers are empty, since the IR would not have a preamble and would be +// invalid. // // Parameters: -// - timezone: Time zone of the log source -// - size: Byte length // - irPath: Path to IR disk buffer file // - zstdPath: Path to Zstd disk buffer file // // Returns: // - diskWriter: Disk writer for Zstd compressed IR -// - err: Error opening buffers, error opening Zstd/IR writers, error getting file sizes -func RecoverWriter( - timezone string, - size int, - irPath string, - zstdPath string, -) (*diskWriter, error) { +// - err: Error opening buffers, error opening Zstd/IR writers, error getting file sizes, +// error empty buffers +func RecoverWriter(irPath string, zstdPath string) (*diskWriter, error) { irFile, zstdFile, err := openBufferFiles(irPath, zstdPath) if err != nil { return nil, fmt.Errorf("error opening files: %w", err) } - irWriter, zstdWriter, err := newIrZstdWriters(zstdFile, timezone, size) + zstdWriter, err := zstd.NewWriter(zstdFile) if err != nil { - return nil, err + return nil, fmt.Errorf("error opening Zstd writer: %w", err) } diskWriter := diskWriter{ - size: size, - timezone: timezone, irPath: irPath, irFile: irFile, zstdPath: zstdPath, zstdFile: zstdFile, - irWriter: irWriter, + irWriter: nil, zstdWriter: zstdWriter, } @@ -129,11 +115,16 @@ func RecoverWriter( return nil, fmt.Errorf("error getting size of IR file: %w", err) } - // During recovery, IR buffer may not be empty, so the size must be set. In addition, - // the non-empty disk buffers already have existing preamble so remove it. Disk buffer - // must have non-zero size or else would be deleted in recover. + zstdFileSize, err := diskWriter.getZstdFileSize() + if err != nil { + return nil, fmt.Errorf("error getting size of Zstd file: %w", err) + } + + if (irFileSize == 0) && (zstdFileSize == 0) { + return nil, fmt.Errorf("error both IR and Zstd buffers are empty") + } + diskWriter.irTotalBytes = irFileSize - irWriter.Reset() return &diskWriter, nil } @@ -149,12 +140,7 @@ func RecoverWriter( // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd, error flushing buffers func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { - numEvents, err := writeIr(w.irWriter, logEvents) - if err != nil { - return numEvents, err - } - - numBytes, err := w.irWriter.WriteTo(w.irFile) + numBytes, numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { return numEvents, err } @@ -173,9 +159,12 @@ func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { return numEvents, nil } -// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. -// The IR buffer is also flushed before ending stream. After calling close, -// [diskWriter] must be reset prior to calling write. +// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. The IR +// buffer is also flushed before ending stream. After calling close, [diskWriter] must be reset +// prior to calling write. For recovered [diskWriter], [ir.Writer] will be nil so closing the +// IR writer is skipped. The IR trailing byte is written directly to [zstdWriter] as an +// optimization to avoid an extra flush when the IR buffer is empty. [flushIrBuffer] exits early +// if the IR buffer is empty. // // Returns: // - err: Error flushing/closing buffers @@ -186,13 +175,15 @@ func (w *diskWriter) CloseStreams() error { return fmt.Errorf("error flushing IR buffer: %w", err) } - _, err = w.irWriter.CloseTo(w.zstdWriter) - if err != nil { - return err + if w.irWriter != nil { + err := w.irWriter.Serializer.Close() + if err != nil { + return fmt.Errorf("error could not close irWriter: %w", err) + } + w.irWriter = nil } - w.irWriter = nil - + w.zstdWriter.Write([]byte{irEndOfStreamByte}) err = w.zstdWriter.Close() if err != nil { return err @@ -213,7 +204,7 @@ func (w *diskWriter) CloseStreams() error { // - err: Error opening IR writer, error IR buffer not empty func (w *diskWriter) Reset() error { var err error - w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) + w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.irFile) if err != nil { return err } @@ -285,21 +276,12 @@ func (w *diskWriter) GetZstdOutput() io.Reader { return w.zstdFile } -// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. -// Therefore, cannot keep track of size with variable as implemented for IR with [IrTotalBytes]. -// Instead, must always use stat. +// Get size of Zstd output. // // Returns: -// - err: Error calling stat +// - err: Error getting size func (w *diskWriter) GetZstdOutputSize() (int, error) { - zstdFileInfo, err := w.zstdFile.Stat() - if err != nil { - return 0, err - } - - zstdFileSize := int(zstdFileInfo.Size()) - - return zstdFileSize, err + return w.getZstdFileSize() } // Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then @@ -451,3 +433,19 @@ func (w *diskWriter) getIrFileSize() (int, error) { irFileSize := int(irFileInfo.Size()) return irFileSize, err } + +// Get size of Zstd file. [zstd] does not provide the amount of bytes written with each write. +// Therefore, cannot keep track of size with variable as implemented for IR with [irTotalBytes]. +// Instead, must always use stat. +// +// Returns: +// - err: Error calling stat +func (w *diskWriter) getZstdFileSize() (int, error) { + zstdFileInfo, err := w.zstdFile.Stat() + if err != nil { + return 0, err + } + + zstdFileSize := int(zstdFileInfo.Size()) + return zstdFileSize, err +} diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 5d8816b..6e86c7b 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -17,31 +17,29 @@ import ( type memoryWriter struct { zstdBuffer *bytes.Buffer irWriter *ir.Writer - size int - timezone string zstdWriter *zstd.Encoder } // Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is // off. // -// Parameters: -// - timezone: Time zone of the log source -// - size: Byte length -// // Returns: // - memoryWriter: Memory writer for Zstd compressed IR // - err: Error opening Zstd/IR writers -func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { +func NewMemoryWriter() (*memoryWriter, error) { var zstdBuffer bytes.Buffer - irWriter, zstdWriter, err := newIrZstdWriters(&zstdBuffer, timezone, size) + + zstdWriter, err := zstd.NewWriter(&zstdBuffer) if err != nil { - return nil, err + return nil, fmt.Errorf("error opening Zstd writer: %w", err) + } + + irWriter, err := ir.NewWriter[ir.FourByteEncoding](zstdWriter) + if err != nil { + return nil, fmt.Errorf("error opening IR writer: %w", err) } memoryWriter := memoryWriter{ - size: size, - timezone: timezone, irWriter: irWriter, zstdWriter: zstdWriter, zstdBuffer: &zstdBuffer, @@ -59,12 +57,10 @@ func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { - numEvents, err := writeIr(w.irWriter, logEvents) + _, numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { return numEvents, err } - - _, err = w.irWriter.WriteTo(w.zstdWriter) return numEvents, err } @@ -74,15 +70,12 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { // Returns: // - err: Error closing buffers func (w *memoryWriter) CloseStreams() error { - _, err := w.irWriter.CloseTo(w.zstdWriter) - if err != nil { + if err := w.irWriter.Close(); err != nil { return err } - w.irWriter = nil - err = w.zstdWriter.Close() - return err + return w.zstdWriter.Close() } // Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers @@ -92,13 +85,14 @@ func (w *memoryWriter) CloseStreams() error { // - err: Error opening IR writer func (w *memoryWriter) Reset() error { var err error - w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) + w.zstdBuffer.Reset() + w.zstdWriter.Reset(w.zstdBuffer) + + w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter) if err != nil { return err } - w.zstdBuffer.Reset() - w.zstdWriter.Reset(w.zstdBuffer) return nil } diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index eb43758..a38ab45 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -70,47 +70,46 @@ type Writer interface { // - logEvents: A slice of log events to be encoded // // Returns: +// - numBytes: Total IR bytes written for the batch // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error if an event could not be written -func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, error) { +func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, int, error) { var numEvents int + var numBytes int for _, event := range logEvents { - _, err := irWriter.Write(event) + n, err := irWriter.WriteLogEvent(event) + numBytes += n if err != nil { err = fmt.Errorf("failed to encode event %v into ir: %w", event, err) - return numEvents, err + return numBytes, numEvents, err } numEvents += 1 } - return numEvents, nil + return numBytes, numEvents, nil } // Opens a new [ir.Writer] and [zstd.Encoder]. // // Parameters: -// - zstdOutput: Output location for Zstd -// - timezone: Time zone of the log source -// - size: Byte length +// - zstdOutput: Output destination for Zstd +// - irOutput: Output destination for IR // // Returns: // - irWriter: Writer for CLP IR -// - ZstdWriter: Writer for Zstd +// - zstdWriter: Writer for Zstd // - err: Error opening IR/Zstd writer func newIrZstdWriters( zstdOutput io.Writer, - timezone string, - size int, + irOutput io.Writer, ) (*ir.Writer, *zstd.Encoder, error) { - // IR buffer using bytes.Buffer internally, so it will dynamically grow if undersized. Using - // FourByteEncoding as default encoding. - irWriter, err := ir.NewWriterSize[ir.FourByteEncoding](size, timezone) + zstdWriter, err := zstd.NewWriter(zstdOutput) if err != nil { - return nil, nil, fmt.Errorf("error opening IR writer: %w", err) + return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) } - zstdWriter, err := zstd.NewWriter(zstdOutput) + irWriter, err := ir.NewWriter[ir.FourByteEncoding](irOutput) if err != nil { - return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) + return nil, nil, fmt.Errorf("error opening IR writer: %w", err) } return irWriter, zstdWriter, err } diff --git a/internal/outctx/config.go b/internal/outctx/config.go index d771405..67c328e 100644 --- a/internal/outctx/config.go +++ b/internal/outctx/config.go @@ -16,24 +16,20 @@ import ( // Holds settings for S3 CLP plugin from user-defined Fluent Bit configuration file. // The "conf" struct tags are the plugin options described to user in README, and allow user to see -// snake case "use_single_key" vs. camel case "SingleKey" in validation error messages. The +// snake case "use_disk_buffer" vs. camel case "UseDiskBuffer" in validation error messages. The // "validate" struct tags are rules to be consumed by [validator]. The functionality of each rule // can be found in docs for [validator]. // //nolint:revive type S3Config struct { - S3Region string `conf:"s3_region" validate:"required"` - S3Bucket string `conf:"s3_bucket" validate:"required"` - S3BucketPrefix string `conf:"s3_bucket_prefix" validate:"dirpath"` - RoleArn string `conf:"role_arn" validate:"omitempty,startswith=arn:aws:iam"` - Id string `conf:"id" validate:"required"` - UseSingleKey bool `conf:"use_single_key" validate:"-"` - AllowMissingKey bool `conf:"allow_missing_key" validate:"-"` - SingleKey string `conf:"single_key" validate:"required_if=use_single_key true"` - UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"` - DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"` - UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"` - TimeZone string `conf:"time_zone" validate:"timezone"` + S3Region string `conf:"s3_region" validate:"required"` + S3Bucket string `conf:"s3_bucket" validate:"required"` + S3BucketPrefix string `conf:"s3_bucket_prefix" validate:"dirpath"` + RoleArn string `conf:"role_arn" validate:"omitempty,startswith=arn:aws:iam"` + Id string `conf:"id" validate:"required"` + UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"` + DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"` + UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"` } // Generates configuration struct containing user-defined settings. In addition, sets default values @@ -51,33 +47,25 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { config := S3Config{ // Default Id is uuid to safeguard against s3 filename namespace collision. User may use // multiple collectors to send logs to same s3 path. Id is appended to s3 filename. - S3Region: "us-east-1", - S3BucketPrefix: "logs/", - Id: uuid.New().String(), - UseSingleKey: true, - AllowMissingKey: true, - SingleKey: "log", - UseDiskBuffer: true, - DiskBufferPath: "tmp/out_clp_s3/", - UploadSizeMb: 16, - TimeZone: "America/Toronto", + S3Region: "us-east-1", + S3BucketPrefix: "logs/", + Id: uuid.New().String(), + UseDiskBuffer: true, + DiskBufferPath: "tmp/out_clp_s3/", + UploadSizeMb: 16, } // Map used to loop over user inputs saving a [output.FLBPluginConfigKey] call for each key. // Potential to iterate over struct using reflect; however, better to avoid reflect package. pluginSettings := map[string]interface{}{ - "s3_region": &config.S3Region, - "s3_bucket": &config.S3Bucket, - "s3_bucket_prefix": &config.S3BucketPrefix, - "role_arn": &config.RoleArn, - "id": &config.Id, - "use_single_key": &config.UseSingleKey, - "allow_missing_key": &config.AllowMissingKey, - "single_key": &config.SingleKey, - "use_disk_buffer": &config.UseDiskBuffer, - "disk_buffer_path": &config.DiskBufferPath, - "upload_size_mb": &config.UploadSizeMb, - "time_zone": &config.TimeZone, + "s3_region": &config.S3Region, + "s3_bucket": &config.S3Bucket, + "s3_bucket_prefix": &config.S3BucketPrefix, + "role_arn": &config.RoleArn, + "id": &config.Id, + "use_disk_buffer": &config.UseDiskBuffer, + "disk_buffer_path": &config.DiskBufferPath, + "upload_size_mb": &config.UploadSizeMb, } for settingName, untypedField := range pluginSettings { diff --git a/internal/outctx/context.go b/internal/outctx/context.go index 6f3eb67..fa3aa9e 100644 --- a/internal/outctx/context.go +++ b/internal/outctx/context.go @@ -119,16 +119,15 @@ func NewS3Context(plugin unsafe.Pointer) (*S3Context, error) { // // Parameters: // - tag: Fluent Bit tag -// - size: Byte length // // Returns: // - err: Could not create buffers or tag -func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, error) { +func (ctx *S3Context) GetEventManager(tag string) (*EventManager, error) { var err error eventManager, ok := ctx.EventManagers[tag] if !ok { - eventManager, err = ctx.newEventManager(tag, size) + eventManager, err = ctx.newEventManager(tag) if err != nil { return nil, err } @@ -141,22 +140,13 @@ func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, erro // // Parameters: // - tag: Fluent Bit tag -// - size: Byte length // // Returns: // - eventManager: Manager for Fluent Bit events with the same tag // - err: Error creating new writer -func (ctx *S3Context) RecoverEventManager( - tag string, - size int, -) (*EventManager, error) { +func (ctx *S3Context) RecoverEventManager(tag string) (*EventManager, error) { irPath, zstdPath := ctx.GetBufferFilePaths(tag) - writer, err := irzstd.RecoverWriter( - ctx.Config.TimeZone, - size, - irPath, - zstdPath, - ) + writer, err := irzstd.RecoverWriter(irPath, zstdPath) if err != nil { return nil, err } @@ -177,28 +167,19 @@ func (ctx *S3Context) RecoverEventManager( // // Parameters: // - tag: Fluent Bit tag -// - size: Byte length // // Returns: // - eventManager: Manager for Fluent Bit events with the same tag // - err: Error creating new writer -func (ctx *S3Context) newEventManager( - tag string, - size int, -) (*EventManager, error) { +func (ctx *S3Context) newEventManager(tag string) (*EventManager, error) { var err error var writer irzstd.Writer if ctx.Config.UseDiskBuffer { irPath, zstdPath := ctx.GetBufferFilePaths(tag) - writer, err = irzstd.NewDiskWriter( - ctx.Config.TimeZone, - size, - irPath, - zstdPath, - ) + writer, err = irzstd.NewDiskWriter(irPath, zstdPath) } else { - writer, err = irzstd.NewMemoryWriter(ctx.Config.TimeZone, size) + writer, err = irzstd.NewMemoryWriter() } if err != nil { diff --git a/plugins/out_clp_s3/Dockerfile b/plugins/out_clp_s3/Dockerfile index 914916e..692392f 100644 --- a/plugins/out_clp_s3/Dockerfile +++ b/plugins/out_clp_s3/Dockerfile @@ -3,7 +3,7 @@ # Using bullseye tag to match debian version from Fluent Bit image [Fluent Bit Debian version]. # Matching debian versions prevents glibc compatibility issues. # [Fluent Bit Debian version]: https://github.com/fluent/fluent-bit/blob/master/dockerfiles/Dockerfile -FROM golang:1.22.3-bullseye as builder +FROM golang:1.24-bullseye AS builder # install task RUN sh -c "$(curl --location https://taskfile.dev/install.sh)" -- -d -b /bin @@ -21,7 +21,7 @@ WORKDIR /root/plugins/out_clp_s3 RUN task build -FROM fluent/fluent-bit:3.0.6 +FROM fluent/fluent-bit:4.2.2 # Copy plugin binary to Fluent Bit image. COPY --from=builder /root/plugins/out_clp_s3/out_clp_s3.so /fluent-bit/bin/ diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index e074ed0..f3dbf59 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -7,17 +7,25 @@ Fluent Bit output plugin that sends records in CLP's compressed IR format to AWS First, confirm your AWS credentials are properly setup, see [AWS credentials](#AWS-credentials) for information. -Next, change the output section [fluent-bit.conf](fluent-bit.conf) to suit your needs. -See [Plugin configuration](#plugin-configuration) for description of fields. +Next, change [fluent-bit.conf](fluent-bit.conf) to suit your needs. Note, if your logs are JSON, you should use the [Fluent Bit JSON parser][1] on your input. +See [Plugin configuration](#plugin-configuration) for description of output options. -See below for an example: +See below for input and output examples: - ``` +``` +[INPUT] + name tail + path /var/log/app.json + tag app.json + parser basic_json +``` + +``` [OUTPUT] name out_clp_s3 s3_bucket myBucket match * - ``` +``` Lastly start the plugin: @@ -40,7 +48,7 @@ Dummy logs will be written to your s3 bucket. #### Using local setup -Install [go][1] and [fluent-bit][2] +Install [go][2] and [fluent-bit][3] Download go dependencies ```shell @@ -78,7 +86,7 @@ Moreover, the plugin can assume a role by adding optional `role_arn` to role_arn arn:aws:iam::000000000000:role/accessToMyBucket ``` -More detailed information for specifying credentials from AWS can be found [here][3]. +More detailed information for specifying credentials from AWS can be found [here][4]. ### Plugin configuration @@ -89,21 +97,9 @@ More detailed information for specifying credentials from AWS can be found [here | `s3_bucket_prefix` | Bucket prefix path | `logs/` | | `role_arn` | ARN of an IAM role to assume | `None` | | `id` | Name of output plugin | Random UUID | -| `use_single_key` | Output single key from Fluent Bit record. See [Use Single Key](#use-single-key) for more info. | `TRUE` | -| `allow_missing_key` | Fallback to whole record if key is missing from log. If set to false, an error will be recorded instead. | `TRUE` | -| `single_key` | Value for single key | `log` | | `use_disk_buffer` | Buffer logs on disk prior to sending to S3. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | | `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` | | `upload_size_mb` | Set upload size in MB when disk store is enabled. Size refers to the compressed size. | `16` | -| `time_zone` | Time zone of the log source, so that local times (non-unix timestamps) are handled correctly. | `America/Toronto` | - -#### Use Single Key - -Output the value corresponding to this key, instead of the whole Fluent Bit record. It is -recommended to set this to true. A Fluent Bit record is a JSON-like object, and while CLP -can parse JSON into IR it is not recommended. Key is set with `single_key` and will typically be set -to `log`, the default Fluent Bit key for unparsed logs. If this is set to false, plugin will parse -the record as JSON. #### Disk Buffering @@ -126,6 +122,7 @@ Each upload will have a unique key in the following format: The index starts at 0 is incremented after each upload. The Fluent Bit tag is also attached to the object using the tag key `fluentBitTag`. -[1]: https://go.dev/doc/install -[2]: https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit -[3]: https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials +[1]: https://docs.fluentbit.io/manual/data-pipeline/parsers/json +[2]: https://go.dev/doc/install +[3]: https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit +[4]: https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials diff --git a/plugins/out_clp_s3/fluent-bit.conf b/plugins/out_clp_s3/fluent-bit.conf index 86b2bf0..2555b48 100644 --- a/plugins/out_clp_s3/fluent-bit.conf +++ b/plugins/out_clp_s3/fluent-bit.conf @@ -29,6 +29,8 @@ # specify an optional 'Plugins' configuration file to load external plugins. plugins_file /fluent-bit/etc/plugin-config.conf # plugins_file plugin-config.conf + parsers_file /fluent-bit/etc/parsers.conf + # parsers_file parsers.conf # HTTP Server # =========== @@ -44,6 +46,13 @@ # Read interval (sec) Default: 1 interval_sec 1 +# Example tail input with JSON parser +# [INPUT] +# name tail +# path /var/log/app.json +# tag app.json +# parser basic_json + [OUTPUT] name out_clp_s3 s3_bucket myBucket diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 714be4f..72fc45f 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "log" - "time" "unsafe" "github.com/fluent/fluent-bit-go/output" @@ -39,7 +38,7 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i return output.FLB_ERROR, err } - eventManager, err := ctx.GetEventManager(tag, size) + eventManager, err := ctx.GetEventManager(tag) if err != nil { return output.FLB_RETRY, fmt.Errorf("error getting event manager: %w", err) } @@ -91,90 +90,27 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, error) { var logEvents []ffi.LogEvent for { - ts, record, err := decoder.GetRecord(dec) + _, jsonRecord, err := decoder.GetRecord(dec) if err != nil { return logEvents, err } - timestamp := decodeTs(ts) - msg, err := getMessage(record, config) + var autoKvPairs map[string]any = make(map[string]any) + var userKvPairs map[string]any + err = json.Unmarshal(jsonRecord, &userKvPairs) if err != nil { - err = fmt.Errorf("failed to get message from record: %w", err) + err = fmt.Errorf("failed to unmarshal record: %w", err) return nil, err } event := ffi.LogEvent{ - LogMessage: msg, - Timestamp: ffi.EpochTimeMs(timestamp.UnixMilli()), + AutoKvPairs: autoKvPairs, + UserKvPairs: userKvPairs, } logEvents = append(logEvents, event) } } -// Decodes timestamp provided by Fluent Bit engine into time.Time. If timestamp cannot be -// decoded, returns system time. -// -// Parameters: -// - ts: Timestamp provided by Fluent Bit -// -// Returns: -// - timestamp: time.Time timestamp -func decodeTs(ts any) time.Time { - var timestamp time.Time - switch t := ts.(type) { - case decoder.FlbTime: - timestamp = t.Time - case uint64: - timestamp = time.Unix(int64(t), 0) - default: - log.Printf("time provided invalid, defaulting to now. Invalid type is %T", t) - timestamp = time.Now() - } - return timestamp -} - -// Retrieves message from a record object. The message can consist of the entire object or -// just a single key. For a single key, user should set use_single_key to true in fluent-bit.conf. -// In addition user, should set single_key to "log" which is default Fluent Bit key for unparsed -// messages; however, single_key can be set to another value. To prevent failure if the key is -// missing, user can specify allow_missing_key, and behaviour will fallback to the entire object. -// -// Parameters: -// - record: JSON record from Fluent Bit with variable amount of keys -// - config: Plugin configuration -// -// Returns: -// - stringMsg: Retrieved message -// - err: Key not found, json.Unmarshal error, string type assertion error -func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { - if !config.UseSingleKey { - return string(jsonRecord), nil - } - - var record map[string]any - err := json.Unmarshal(jsonRecord, &record) - if err != nil { - return "", fmt.Errorf("failed to unmarshal json record %v: %w", jsonRecord, err) - } - - singleKeyMsg, ok := record[config.SingleKey] - if !ok { - // If key not found in record, see if allow_missing_key=true. If missing key is - // allowed, then return entire record. - if config.AllowMissingKey { - return string(jsonRecord), nil - } - return "", fmt.Errorf("key %s not found in record %v", config.SingleKey, record) - } - - stringMsg, ok := singleKeyMsg.(string) - if !ok { - return "", fmt.Errorf("string type assertion for message failed %v", singleKeyMsg) - } - - return stringMsg, nil -} - // Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always // uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater // than upload size. diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index 2683506..e6b09dc 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -203,10 +203,7 @@ func flushExistingBuffer( return err } - eventManager, err := ctx.RecoverEventManager( - tag, - int(irFileSize), - ) + eventManager, err := ctx.RecoverEventManager(tag) if err != nil { return fmt.Errorf("error recovering event manager with tag: %w", err) } diff --git a/plugins/out_clp_s3/parsers.conf b/plugins/out_clp_s3/parsers.conf new file mode 100644 index 0000000..da5bc7e --- /dev/null +++ b/plugins/out_clp_s3/parsers.conf @@ -0,0 +1,3 @@ +[PARSER] + Name basic_json + Format json