diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 0d1cf59..4a49e43 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -40,6 +40,7 @@ type diskWriter struct { timezone string irTotalBytes int zstdWriter *zstd.Encoder + closed bool } // Opens a new [diskWriter] using files for IR and Zstd buffers. For use when use_disk_store @@ -203,6 +204,8 @@ func (w *diskWriter) CloseStreams() error { return err } + w.closed = true + return nil } @@ -237,6 +240,8 @@ func (w *diskWriter) Reset() error { w.zstdWriter.Reset(w.zstdFile) + w.closed = false + return nil } @@ -277,6 +282,14 @@ func (w *diskWriter) GetUseDiskBuffer() bool { return true } +// Getter for closed. +// +// Returns: +// - closed: Boolean that is true if IR and Zstd streams are closed. +func (w *diskWriter) GetClosed() bool { + return w.closed +} + // Getter for Zstd Output. // // Returns: @@ -302,6 +315,23 @@ func (w *diskWriter) GetZstdOutputSize() (int, error) { return zstdFileSize, err } +// Checks if writer is empty. True if no events are buffered. +// +// Returns: +// - empty: Boolean value that is true if buffer is empty +// - err: Error calling stat +func (w *diskWriter) CheckEmpty() (bool, error) { + zstdFileInfo, err := w.zstdFile.Stat() + if err != nil { + return false, err + } + // Not checking internal IR buffer since should it since should always be empty from + // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will + // be empty again when function terminates. + empty := (zstdFileInfo.Size() == 0) && (w.irTotalBytes == 0) + return empty, nil +} + // Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then // truncated. // diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 5d8816b..4f007bf 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -20,6 +20,7 @@ type memoryWriter struct { size int timezone string zstdWriter *zstd.Encoder + closed bool } // Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is @@ -65,6 +66,7 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { } _, err = w.irWriter.WriteTo(w.zstdWriter) + return numEvents, err } @@ -82,6 +84,9 @@ func (w *memoryWriter) CloseStreams() error { w.irWriter = nil err = w.zstdWriter.Close() + + w.closed = true + return err } @@ -99,6 +104,8 @@ func (w *memoryWriter) Reset() error { w.zstdBuffer.Reset() w.zstdWriter.Reset(w.zstdBuffer) + + w.closed = false return nil } @@ -110,6 +117,14 @@ func (w *memoryWriter) GetUseDiskBuffer() bool { return false } +// Getter for closed. +// +// Returns: +// - closed: Boolean that is true if IR and Zstd streams are closed. +func (w *memoryWriter) GetClosed() bool { + return w.closed +} + // Getter for Zstd Output. // // Returns: @@ -119,15 +134,30 @@ func (w *memoryWriter) GetZstdOutput() io.Reader { } // Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. -// Instead, calling Len() on buffer. +// Instead, calling Len() on buffer. Try to avoid calling this as will flush Zstd Writer +// potentially creating unnecessary frames. // // Returns: // - size: Bytes written // - err: nil error to comply with interface func (w *memoryWriter) GetZstdOutputSize() (int, error) { + w.zstdWriter.Flush() return w.zstdBuffer.Len(), nil } +// Checks if writer is empty. True if no events are buffered. Try to avoid calling this as will +// flush Zstd Writer potentially creating unnecessary frames. +// +// Returns: +// - empty: Boolean value that is true if buffer is empty +// - err: nil error to comply with interface +func (w *memoryWriter) CheckEmpty() (bool, error) { + w.zstdWriter.Flush() + + empty := w.zstdBuffer.Len() == 0 + return empty, nil +} + // Closes [memoryWriter]. Currently used during recovery only, and advise caution using elsewhere. // Using [ir.Writer.Serializer.Close] instead of [ir.Writer.Close] so EndofStream byte is not // added. It is preferable to add postamble on recovery so that IR is in the same state diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index eb43758..820c13a 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -37,6 +37,12 @@ type Writer interface { // - err Close() error + // Getter for closed. + // + // Returns: + // - closed: Boolean that is true if IR and Zstd streams are closed. + GetClosed() bool + // Reinitialize Writer after calling CloseStreams(). // // Returns: @@ -61,6 +67,13 @@ type Writer interface { // - size: Bytes written // - err GetZstdOutputSize() (int, error) + + // Checks if writer is empty. True if no events are buffered. + // + // Returns: + // - empty: Boolean value that is true if buffer is empty + // - err + CheckEmpty() (bool, error) } // Writes log events to a IR Writer. diff --git a/internal/outctx/config.go b/internal/outctx/config.go index d771405..aa188ee 100644 --- a/internal/outctx/config.go +++ b/internal/outctx/config.go @@ -6,6 +6,7 @@ import ( "reflect" "strconv" "strings" + "time" "unsafe" "github.com/go-playground/validator/v10" @@ -22,18 +23,19 @@ import ( // //nolint:revive type S3Config struct { - S3Region string `conf:"s3_region" validate:"required"` - S3Bucket string `conf:"s3_bucket" validate:"required"` - S3BucketPrefix string `conf:"s3_bucket_prefix" validate:"dirpath"` - RoleArn string `conf:"role_arn" validate:"omitempty,startswith=arn:aws:iam"` - Id string `conf:"id" validate:"required"` - UseSingleKey bool `conf:"use_single_key" validate:"-"` - AllowMissingKey bool `conf:"allow_missing_key" validate:"-"` - SingleKey string `conf:"single_key" validate:"required_if=use_single_key true"` - UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"` - DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"` - UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"` - TimeZone string `conf:"time_zone" validate:"timezone"` + S3Region string `conf:"s3_region" validate:"required"` + S3Bucket string `conf:"s3_bucket" validate:"required"` + S3BucketPrefix string `conf:"s3_bucket_prefix" validate:"dirpath"` + RoleArn string `conf:"role_arn" validate:"omitempty,startswith=arn:aws:iam"` + Id string `conf:"id" validate:"required"` + UseSingleKey bool `conf:"use_single_key" validate:"-"` + AllowMissingKey bool `conf:"allow_missing_key" validate:"-"` + SingleKey string `conf:"single_key" validate:"required_if=use_single_key true"` + UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"` + DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"` + Timeout time.Duration `conf:"timeout" validate:"-"` + UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"` + TimeZone string `conf:"time_zone" validate:"timezone"` } // Generates configuration struct containing user-defined settings. In addition, sets default values @@ -46,6 +48,8 @@ type S3Config struct { // - S3Config: Configuration based on fluent-bit.conf // - err: All validation errors in config wrapped, parse bool error func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { + defaultTimeout, _ := time.ParseDuration("15m") + // Define default values for settings. Setting defaults before validation simplifies validation // configuration, and ensures that default settings are also validated. config := S3Config{ @@ -59,6 +63,7 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { SingleKey: "log", UseDiskBuffer: true, DiskBufferPath: "tmp/out_clp_s3/", + Timeout: defaultTimeout, UploadSizeMb: 16, TimeZone: "America/Toronto", } @@ -76,6 +81,7 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { "single_key": &config.SingleKey, "use_disk_buffer": &config.UseDiskBuffer, "disk_buffer_path": &config.DiskBufferPath, + "timeout": &config.Timeout, "upload_size_mb": &config.UploadSizeMb, "time_zone": &config.TimeZone, } @@ -90,7 +96,7 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { continue } - // Type switch to type parse boolean strings into boolean type. This is necessary since + // Type switch to type parse interface into field type. This is necessary since // all values are provided as strings. switch configField := untypedField.(type) { case *string: @@ -102,6 +108,12 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { return nil, fmt.Errorf("error could not parse input %v into bool", userInput) } *configField = boolInput + case *time.Duration: + durationInput, err := time.ParseDuration(userInput) + if err != nil { + return nil, fmt.Errorf("error could not parse input %v into duration", userInput) + } + *configField = durationInput case *int: intInput, err := strconv.Atoi(userInput) if err != nil { diff --git a/internal/outctx/context.go b/internal/outctx/context.go index 6f3eb67..caa45aa 100644 --- a/internal/outctx/context.go +++ b/internal/outctx/context.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "log" "path/filepath" "unsafe" @@ -42,7 +43,7 @@ const ( type S3Context struct { Config S3Config Uploader *manager.Uploader - EventManagers map[string]*EventManager + EventManagers map[string]*S3EventManager } // Creates a new context. Loads configuration from user. Loads and tests aws credentials. @@ -108,7 +109,7 @@ func NewS3Context(plugin unsafe.Pointer) (*S3Context, error) { ctx := S3Context{ Config: *config, Uploader: uploader, - EventManagers: make(map[string]*EventManager), + EventManagers: make(map[string]*S3EventManager), } return &ctx, nil @@ -123,7 +124,7 @@ func NewS3Context(plugin unsafe.Pointer) (*S3Context, error) { // // Returns: // - err: Could not create buffers or tag -func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, error) { +func (ctx *S3Context) GetEventManager(tag string, size int) (*S3EventManager, error) { var err error eventManager, ok := ctx.EventManagers[tag] @@ -137,7 +138,7 @@ func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, erro return eventManager, nil } -// Recovers [EventManager] from previous execution using existing disk buffers. +// Recovers [S3EventManager] from previous execution using existing disk buffers. // // Parameters: // - tag: Fluent Bit tag @@ -149,7 +150,7 @@ func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, erro func (ctx *S3Context) RecoverEventManager( tag string, size int, -) (*EventManager, error) { +) (*S3EventManager, error) { irPath, zstdPath := ctx.GetBufferFilePaths(tag) writer, err := irzstd.RecoverWriter( ctx.Config.TimeZone, @@ -161,17 +162,22 @@ func (ctx *S3Context) RecoverEventManager( return nil, err } - eventManager := EventManager{ - Tag: tag, - Writer: writer, + eventManager := S3EventManager{ + Tag: tag, + Writer: writer, + UploadRequests: make(chan bool), } + log.Printf("Starting upload listener for event manager with tag %s", tag) + eventManager.WaitGroup.Add(1) + go eventManager.listen(ctx.Config, ctx.Uploader) + ctx.EventManagers[tag] = &eventManager return &eventManager, nil } -// Creates a new [EventManager] with a new [irzstd.Writer]. If UseDiskBuffer is set, buffers are +// Creates a new [S3EventManager] with a new [irzstd.Writer]. If UseDiskBuffer is set, buffers are // created on disk and are used to buffer Fluent Bit chunks. If UseDiskBuffer is off, buffer is // in memory and chunks are not buffered. // @@ -185,7 +191,7 @@ func (ctx *S3Context) RecoverEventManager( func (ctx *S3Context) newEventManager( tag string, size int, -) (*EventManager, error) { +) (*S3EventManager, error) { var err error var writer irzstd.Writer @@ -197,6 +203,7 @@ func (ctx *S3Context) newEventManager( irPath, zstdPath, ) + } else { writer, err = irzstd.NewMemoryWriter(ctx.Config.TimeZone, size) } @@ -205,11 +212,16 @@ func (ctx *S3Context) newEventManager( return nil, err } - eventManager := EventManager{ - Tag: tag, - Writer: writer, + eventManager := S3EventManager{ + Tag: tag, + Writer: writer, + UploadRequests: make(chan bool), } + log.Printf("Starting upload listener for event manager with tag %s", tag) + eventManager.WaitGroup.Add(1) + go eventManager.listen(ctx.Config, ctx.Uploader) + ctx.EventManagers[tag] = &eventManager return &eventManager, nil diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index 0ecf85a..a94a3bb 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -6,6 +6,7 @@ import ( "log" "net/url" "path/filepath" + "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -19,29 +20,155 @@ import ( const s3TagKey = "fluentBitTag" // Resources and metadata to process Fluent Bit events with the same tag. -type EventManager struct { - Tag string - Index int - Writer irzstd.Writer +type S3EventManager struct { + Tag string + Index int + Writer irzstd.Writer + Mutex sync.Mutex + WaitGroup sync.WaitGroup + UploadRequests chan bool + Listening bool } -// Sends Zstd buffer to s3 and reset writer and buffers for future uploads. Prior to upload, -// IR buffer is flushed and IR/Zstd streams are terminated. The [EventManager.Index] is incremented -// on successful upload. +// Ends listener goroutine. +func (m *S3EventManager) StopListening() { + + if !m.Listening { + return + } + + log.Printf("Stopping upload listener for event manager with tag %s", m.Tag) + + // Closing the channel sends terminate signal to goroutine. The WaitGroup + // will block until it actually terminates. + close(m.UploadRequests) + m.WaitGroup.Wait() + m.Listening = false +} + +// Starts upload listener which can receive signals on UploadRequests channel. This function should +// be called as a goroutine. Timeout is only triggered if use_disk_buffer is on. Function calls +// immortal functions and thus will not exit immediately. Instead, it will only exit if the +// uploadRequest channel is closed which will allow immortal functions to break out of infinite +// loop. When function does exit, it decrements a WaitGroup letting event manager know it has +// exited. WaitGroup allows graceful exit of listener when Fluent Bit receives a kill signal. On +// [recovery.GracefulExit], plugin will wait to exit until all listeners are closed. Without +// WaitGroup, OS may abruptly kill listen goroutine. // // Parameters: // - config: Plugin configuration // - uploader: S3 uploader manager +func (m *S3EventManager) listen(config S3Config, uploader *manager.Uploader) { + defer m.WaitGroup.Done() + + m.Listening = true + if m.Writer.GetUseDiskBuffer() { + m.diskUploadListener(config, uploader) + } else { + m.memoryUploadListener(config, uploader) + } +} + +// Immortal listener that uploads events to s3 when receives signal on UploadRequests channel or a +// timeout is hit. Listener will sleep when inactive. // -// Returns: -// - err: Error creating closing streams, error uploading to s3, error resetting writer -func (m *EventManager) ToS3(config S3Config, uploader *manager.Uploader) error { - err := m.Writer.CloseStreams() +// Parameters: +// - config: Plugin configuration +// - uploader: S3 uploader manager +func (m *S3EventManager) diskUploadListener(config S3Config, uploader *manager.Uploader) { + for { + select { + case _, more := <-m.UploadRequests: + // Exit if channel is closed + if !more { + return + } + log.Printf("Listener with tag %s received upload request on channel", m.Tag) + // Timeout will reset if signal sent on UploadRequest channel + case <-time.After(config.Timeout): + log.Printf("Timeout surpassed for listener with tag %s", m.Tag) + } + + m.diskUpload(config, uploader) + } +} + +// Immortal listener that uploads events to s3 when receives signal on UploadRequests channel. +// Listener will sleep when inactive. +// +// Parameters: +// - config: Plugin configuration +// - uploader: S3 uploader manager +func (m *S3EventManager) memoryUploadListener(config S3Config, uploader *manager.Uploader) { + for { + _, more := <-m.UploadRequests + // Exit if channel is closed + if !more { + return + } + log.Printf("Listener with tag %s received upload request on channel", m.Tag) + m.memoryUpload(config, uploader) + } +} + +// Uploads to s3 after acquiring lock and validating that buffer is not empty. Mutex prevents +// write while uploading. Must check that buffer is not empty as timeout can trigger on empty +// buffer and send empty file to s3. Panics or logs instead of returning error. +// +// Parameters: +// - config: Plugin configuration +// - uploader: S3 uploader manager +func (m *S3EventManager) diskUpload(config S3Config, uploader *manager.Uploader) { + m.Mutex.Lock() + defer m.Mutex.Unlock() + + empty, err := m.Writer.CheckEmpty() if err != nil { - return fmt.Errorf("error closing irzstd stream: %w", err) + panic(fmt.Errorf("failed to check if buffer is empty, %w", err)) + } + + if empty { + log.Printf("Did not uploads events with tag %s since buffer is empty", m.Tag) + return } - outputLocation, err := uploadToS3( + m.toS3(config, uploader) +} + +// See [diskUpload]; however, not necessary to check size of buffer since there +// is no timeout. MemoryUpload cannot be called with empty buffer. Panics or logs +// instead of returning error. +// +// Parameters: +// - config: Plugin configuration +// - uploader: S3 uploader manager +func (m *S3EventManager) memoryUpload(config S3Config, uploader *manager.Uploader) { + m.Mutex.Lock() + defer m.Mutex.Unlock() + + m.toS3(config, uploader) +} + +// Sends Zstd buffer to s3 and reset writer and buffers for future uploads. Prior to upload, IR +// buffer is flushed and IR/Zstd streams are terminated. The [S3EventManager.Index] is incremented +// on successful upload. Logs errors with s3 request, otherwise panics instead on error. Errors +// closing and resetting writer are difficult to recover from. +// +// Parameters: +// - config: Plugin configuration +// - uploader: S3 uploader manager +func (m *S3EventManager) toS3(config S3Config, uploader *manager.Uploader) { + // In normal operation, writer GetClosed() should always return false. i.e. writer is open and + // the stream should be closed. However, if a s3 request fails, it is already closed. + // Therefore, on retry we don't want to close again. + if !m.Writer.GetClosed() { + err := m.Writer.CloseStreams() + if err != nil { + panic(fmt.Errorf("error closing irzstd stream: %w", err)) + } + } + + outputLocation, err := s3Request( config.S3Bucket, config.S3BucketPrefix, m, @@ -49,8 +176,8 @@ func (m *EventManager) ToS3(config S3Config, uploader *manager.Uploader) error { uploader, ) if err != nil { - err = fmt.Errorf("failed to upload chunk to s3, %w", err) - return err + log.Print(fmt.Errorf("S3 request failed for event manager with tag %s: %w", m.Tag, err)) + return } m.Index += 1 @@ -59,10 +186,8 @@ func (m *EventManager) ToS3(config S3Config, uploader *manager.Uploader) error { err = m.Writer.Reset() if err != nil { - return err + panic(fmt.Errorf("error resetting irzstd stream: %w", err)) } - - return nil } // Uploads log events to s3. @@ -76,10 +201,10 @@ func (m *EventManager) ToS3(config S3Config, uploader *manager.Uploader) error { // // Returns: // - err: Error uploading, error unescaping string -func uploadToS3( +func s3Request( bucket string, bucketPrefix string, - eventManager *EventManager, + eventManager *S3EventManager, id string, uploader *manager.Uploader, ) (string, error) { diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index e074ed0..8e5f7c0 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -82,20 +82,21 @@ More detailed information for specifying credentials from AWS can be found [here ### Plugin configuration -| Key | Description | Default | -|---------------------|----------------------------------------------------------------------------------------------------------|-------------------| -| `s3_region` | The AWS region of your S3 bucket | `us-east-1` | -| `s3_bucket` | S3 bucket name. Just the name, no aws prefix neccesary. | `None` | -| `s3_bucket_prefix` | Bucket prefix path | `logs/` | -| `role_arn` | ARN of an IAM role to assume | `None` | -| `id` | Name of output plugin | Random UUID | -| `use_single_key` | Output single key from Fluent Bit record. See [Use Single Key](#use-single-key) for more info. | `TRUE` | -| `allow_missing_key` | Fallback to whole record if key is missing from log. If set to false, an error will be recorded instead. | `TRUE` | -| `single_key` | Value for single key | `log` | -| `use_disk_buffer` | Buffer logs on disk prior to sending to S3. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | -| `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` | -| `upload_size_mb` | Set upload size in MB when disk store is enabled. Size refers to the compressed size. | `16` | -| `time_zone` | Time zone of the log source, so that local times (non-unix timestamps) are handled correctly. | `America/Toronto` | +| Key | Description | Default | +|---------------------|-----------------------------------------------------------------------------------------------------------------|---------------------| +| `s3_region` | The AWS region of your S3 bucket | `us-east-1` | +| `s3_bucket` | S3 bucket name. Just the name, no aws prefix necessary. | `None` | +| `s3_bucket_prefix` | Bucket prefix path | `logs/` | +| `role_arn` | ARN of an IAM role to assume | `None` | +| `id` | Name of output plugin | Random UUID | +| `use_single_key` | Output single key from Fluent Bit record. See [Use Single Key](#use-single-key) for more info. | `TRUE` | +| `allow_missing_key` | Fallback to whole record if single key is missing from log. If set to false, an error will be recorded instead. | `TRUE` | +| `single_key` | Value for single key | `log` | +| `use_disk_buffer` | Buffer logs on disk prior to sending to S3. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | +| `disk_buffer_path` | Directory for disk buffer. Path should be unique for each output. | `tmp/out_clp_s3/1/` | +| `upload_size_mb` | Set upload size in MB when disk buffer is enabled. Size refers to the compressed size. | `16` | +| `timeout` | Upload timeout if upload size is not met. For use when disk buffer is enabled. Valid time units are s, m, h. | `15m` | +| `time_zone` | Time zone of the log source, so that local times (non-unix timestamps) are handled correctly. | `America/Toronto` | #### Use Single Key @@ -107,12 +108,12 @@ the record as JSON. #### Disk Buffering -The output plugin recieves raw logs from Fluent Bit in small chunks. With `use_disk_buffer` set, the -output plugin will accumulate logs on disk until the upload size is reached. Buffering logs will -reduce the amount of S3 API requests and improve the compression ratio. However, the plugin will use -disk space and have higher memory requirements. The amount of system resources will be proportional -to the amount of Fluent Bit tags. With `use_disk_buffer` off, the plugin will immediately process -each chunk and send it to S3. +The output plugin receives raw logs from Fluent Bit in small chunks. With `use_disk_buffer` set, the +output plugin will accumulate logs on disk until the upload size or timeout is reached. Buffering +logs will reduce the amount of S3 API requests and improve the compression ratio. However, the plugin +will use disk space and have higher memory requirements. The amount of system resources will be +proportional to the amount of Fluent Bit tags. With `use_disk_buffer` off, the plugin will immediately +process each chunk and send it to S3. Logs are stored on the disk as IR and Zstd compressed IR. If the plugin were to crash, stored logs will be sent to S3 when Fluent Bit restarts. The upload index restarts on recovery. diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 714be4f..050d3cf 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -44,34 +44,11 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i return output.FLB_RETRY, fmt.Errorf("error getting event manager: %w", err) } - numEvents, err := eventManager.Writer.WriteIrZstd(logEvents) + err = write(eventManager, logEvents, ctx.Config) if err != nil { - log.Printf( - "Wrote %d out of %d total log events for tag %s", - numEvents, - len(logEvents), - eventManager.Tag, - ) return output.FLB_ERROR, err } - uploadCriteriaMet, err := checkUploadCriteriaMet( - eventManager, - ctx.Config.UploadSizeMb, - ) - if err != nil { - return output.FLB_ERROR, fmt.Errorf("error checking upload criteria: %w", err) - } - - if !uploadCriteriaMet { - return output.FLB_OK, nil - } - - err = eventManager.ToS3(ctx.Config, ctx.Uploader) - if err != nil { - return output.FLB_ERROR, fmt.Errorf("error flushing Zstd buffer to s3: %w", err) - } - return output.FLB_OK, nil } @@ -104,7 +81,7 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, } event := ffi.LogEvent{ - LogMessage: msg, + LogMessage: addSpaceAndNewLine(msg), Timestamp: ffi.EpochTimeMs(timestamp.UnixMilli()), } logEvents = append(logEvents, event) @@ -175,6 +152,51 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { return stringMsg, nil } +// Writes logEvents to event manager buffer. If upload criteria is met, sends upload signal to +// upload request channel. Method acquires lock to prevent upload while writing. +// +// Parameters: +// - eventManager: Manager for Fluent Bit events with the same tag +// - logEvents: Slice of log events +// - config: Plugin configuration +// +// Returns: +// - err: Error writing log events, error checking upload criteria +func write( + eventManager *outctx.S3EventManager, + logEvents []ffi.LogEvent, + config outctx.S3Config, +) error { + eventManager.Mutex.Lock() + defer eventManager.Mutex.Unlock() + + numEvents, err := eventManager.Writer.WriteIrZstd(logEvents) + if err != nil { + log.Printf( + "Wrote %d out of %d total log events for tag %s", + numEvents, + len(logEvents), + eventManager.Tag, + ) + return fmt.Errorf("error writing log events: %w", err) + } + + uploadCriteriaMet, err := checkUploadCriteriaMet( + eventManager, + config.UploadSizeMb, + ) + if err != nil { + return fmt.Errorf("error checking upload criteria: %w", err) + } + + if uploadCriteriaMet { + log.Printf("Sending upload request to channel with tag %s", eventManager.Tag) + eventManager.UploadRequests <- true + } + + return nil +} + // Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always // uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater // than upload size. @@ -186,7 +208,7 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { // Returns: // - readyToUpload: Boolean if upload criteria met or not // - err: Error getting Zstd buffer size -func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) { +func checkUploadCriteriaMet(eventManager *outctx.S3EventManager, uploadSizeMb int) (bool, error) { if !eventManager.Writer.GetUseDiskBuffer() { return true, nil } @@ -210,3 +232,16 @@ func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) return false, nil } + +// Decompressed IR streams appear as one concatenated string. Adding a space separates the log +// from the timestamp. Adding a new line separates logs from each other. +// +// Parameters: +// - msg: Log event message +// +// Returns: +// - modifiedMsg: Message with space at beginning and newline at end +func addSpaceAndNewLine(msg string) string { + modifiedMsg := " " + msg + "\n" + return modifiedMsg +} diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index 2683506..e507cb5 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -25,6 +25,7 @@ import ( // - err: Error closing file func GracefulExit(ctx *outctx.S3Context) error { for _, eventManager := range ctx.EventManagers { + eventManager.StopListening() err := eventManager.Writer.Close() if err != nil { return err @@ -174,7 +175,7 @@ func checkFilesValid(irFiles map[string]fs.FileInfo, zstdFiles map[string]fs.Fil } // Flushes existing disk buffer to s3 on startup. Prior to sending, opens disk buffer files and -// creates new [outctx.EventManager] using existing buffer files. +// creates new [outctx.S3EventManager] using existing buffer files. // // Parameters: // - tag: Fluent Bit tag @@ -210,14 +211,9 @@ func flushExistingBuffer( if err != nil { return fmt.Errorf("error recovering event manager with tag: %w", err) } - log.Printf("Recovered disk buffers with tag %s", tag) - - err = eventManager.ToS3(ctx.Config, ctx.Uploader) - if err != nil { - return fmt.Errorf("error flushing Zstd to s3: %w", err) - } - + log.Printf("Sending upload request to channel with tag %s", eventManager.Tag) + eventManager.UploadRequests <- true return nil } diff --git a/plugins/out_clp_s3/out_clp_s3.go b/plugins/out_clp_s3/out_clp_s3.go index c0591df..758c5f4 100644 --- a/plugins/out_clp_s3/out_clp_s3.go +++ b/plugins/out_clp_s3/out_clp_s3.go @@ -10,6 +10,7 @@ package main import ( "C" + "fmt" "log" "unsafe" @@ -32,7 +33,10 @@ const s3PluginName = "out_clp_s3" // //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { - log.Printf("[%s] Register called", s3PluginName) + logPrefix := fmt.Sprintf("[%s] ", s3PluginName) + log.SetPrefix(logPrefix) + log.SetFlags(log.LstdFlags | log.Lmsgprefix) + log.Printf("Register called") return output.FLBPluginRegister(def, s3PluginName, "CLP s3 plugin") } @@ -51,7 +55,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { log.Fatalf("Failed to initialize plugin: %s", err) } - log.Printf("[%s] Init called for id: %s", s3PluginName, outCtx.Config.Id) + log.Printf("Init called for id: %s", outCtx.Config.Id) if outCtx.Config.UseDiskBuffer { err = recovery.RecoverBufferFiles(outCtx) @@ -89,8 +93,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int stringTag := C.GoString(tag) log.Printf( - "[%s] Flush called for id %s with tag %s and size %d", - s3PluginName, + "Flush called for id %s with tag %s and size %d", outCtx.Config.Id, stringTag, size, @@ -108,7 +111,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int //export FLBPluginExit func FLBPluginExit() int { - log.Printf("[%s] Exit called for unknown instance", s3PluginName) + log.Printf("Exit called for unknown instance") return output.FLB_OK } @@ -130,7 +133,7 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int { log.Fatal("Could not read context during flush") } - log.Printf("[%s] Exit called for id: %s", s3PluginName, outCtx.Config.Id) + log.Printf("Exit called for id: %s", outCtx.Config.Id) err := recovery.GracefulExit(outCtx) if err != nil { @@ -142,7 +145,7 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int { //export FLBPluginUnregister func FLBPluginUnregister(def unsafe.Pointer) { - log.Printf("[%s] Unregister called", s3PluginName) + log.Printf("Unregister called") output.FLBPluginUnregister(def) }