diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 43e2ecc..e4be566 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -260,14 +260,6 @@ func (w *diskWriter) Close() error { return nil } -// Getter for useDiskBuffer. -// -// Returns: -// - useDiskBuffer: On/off for disk buffering -func (w *diskWriter) GetUseDiskBuffer() bool { - return true -} - // Getter for Zstd Output. // // Returns: @@ -284,6 +276,21 @@ func (w *diskWriter) GetZstdOutputSize() (int, error) { return w.getZstdFileSize() } +// Checks if writer is empty. True if no events are buffered. +// +// Returns: +// - empty: Boolean value that is true if buffer is empty +// - err: Error calling stat +func (w *diskWriter) Empty() (bool, error) { + zstdFileInfo, err := w.zstdFile.Stat() + if err != nil { + return false, err + } + + empty := (zstdFileInfo.Size() == 0) && (w.irTotalBytes == 0) + return empty, nil +} + // Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then // truncated. // diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 6e86c7b..1f444fd 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -11,13 +11,13 @@ import ( "github.com/y-scope/clp-ffi-go/ir" ) -// Converts log events into Zstd compressed IR. Log events provided to writer are immediately -// converted to Zstd compressed IR and stored in [memoryWriter.ZstdBuffer]. After the Zstd buffer -// receives logs, they are immediately sent to s3. +// Converts log events into Zstd compressed IR. Log events are immediately converted to Zstd +// compressed IR and stored in [memoryWriter.zstdBuffer]. type memoryWriter struct { - zstdBuffer *bytes.Buffer - irWriter *ir.Writer - zstdWriter *zstd.Encoder + zstdBuffer *bytes.Buffer + irWriter *ir.Writer + zstdWriter *zstd.Encoder + irTotalBytes int } // Opens a new [memoryWriter] with a memory buffer for Zstd output. 
For use when use_disk_store is @@ -57,7 +57,8 @@ func NewMemoryWriter() (*memoryWriter, error) { // - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { - _, numEvents, err := writeIr(w.irWriter, logEvents) + numBytes, numEvents, err := writeIr(w.irWriter, logEvents) + w.irTotalBytes += numBytes if err != nil { return numEvents, err } @@ -87,6 +88,7 @@ func (w *memoryWriter) Reset() error { var err error w.zstdBuffer.Reset() w.zstdWriter.Reset(w.zstdBuffer) + w.irTotalBytes = 0 w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter) if err != nil { @@ -96,14 +98,6 @@ func (w *memoryWriter) Reset() error { return nil } -// Getter for useDiskBuffer. -// -// Returns: -// - useDiskBuffer: On/off for disk buffering -func (w *memoryWriter) GetUseDiskBuffer() bool { - return false -} - // Getter for Zstd Output. // // Returns: @@ -113,7 +107,8 @@ func (w *memoryWriter) GetZstdOutput() io.Reader { } // Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. -// Instead, calling Len() on buffer. +// Instead, calling Len() on buffer. Size may slightly lag the real size since some data in the +// current block will be in the [zstd] encoder's internal buffer. // // Returns: // - size: Bytes written @@ -139,3 +134,12 @@ func (w *memoryWriter) Close() error { } return nil } + +// Checks if writer is empty. True if no events are buffered. +// +// Returns: +// - empty: Boolean value that is true if buffer is empty +// - err: nil error to comply with interface +func (w *memoryWriter) Empty() (bool, error) { + return w.irTotalBytes == 0, nil +} diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index a38ab45..7b6632e 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -43,12 +43,6 @@ type Writer interface { // - err Reset() error - // Getter for useDiskBuffer. 
- // - // Returns: - // - useDiskBuffer: On/off for disk buffering - GetUseDiskBuffer() bool - // Getter for Zstd Output. // // Returns: @@ -61,6 +55,13 @@ type Writer interface { // - size: Bytes written // - err GetZstdOutputSize() (int, error) + + // Checks if writer is empty. True if no events are buffered. + // + // Returns: + // - empty: Boolean value that is true if buffer is empty + // - err + Empty() (bool, error) } // Writes log events to a IR Writer. diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index f3dbf59..63e4a7d 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -97,21 +97,23 @@ More detailed information for specifying credentials from AWS can be found [here | `s3_bucket_prefix` | Bucket prefix path | `logs/` | | `role_arn` | ARN of an IAM role to assume | `None` | | `id` | Name of output plugin | Random UUID | -| `use_disk_buffer` | Buffer logs on disk prior to sending to S3. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | -| `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` | -| `upload_size_mb` | Set upload size in MB when disk store is enabled. Size refers to the compressed size. | `16` | +| `use_disk_buffer` | Buffer logs on disk. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | +| `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` | +| `upload_size_mb` | Set upload size in MB. Size refers to the compressed size. | `16` | #### Disk Buffering -The output plugin recieves raw logs from Fluent Bit in small chunks. With `use_disk_buffer` set, the -output plugin will accumulate logs on disk until the upload size is reached. Buffering logs will -reduce the amount of S3 API requests and improve the compression ratio. However, the plugin will use -disk space and have higher memory requirements. The amount of system resources will be proportional -to the amount of Fluent Bit tags. 
With `use_disk_buffer` off, the plugin will immediately process -each chunk and send it to S3. +The output plugin receives raw logs from Fluent Bit in small chunks and accumulates them in a compressed +buffer until the upload size is reached, then sends them to S3. -Logs are stored on the disk as IR and Zstd compressed IR. If the plugin were to crash, stored logs -will be sent to S3 when Fluent Bit restarts. The upload index restarts on recovery. +With `use_disk_buffer` set, logs are stored on disk as IR and Zstd compressed IR. On a graceful shutdown +or abrupt crash, stored logs will be sent to S3 when Fluent Bit restarts. For an abrupt crash, there is +a very small chance of data corruption if the plugin crashed mid-write. The upload index restarts on +recovery. + +With `use_disk_buffer` off, logs are stored in memory as Zstd compressed IR. On a graceful shutdown, the +plugin will attempt to upload any buffered data to S3 before Fluent Bit terminates it. On an abrupt +crash, in-memory data is lost. ### S3 Objects diff --git a/plugins/out_clp_s3/internal/exit/exit.go b/plugins/out_clp_s3/internal/exit/exit.go new file mode 100644 index 0000000..4779104 --- /dev/null +++ b/plugins/out_clp_s3/internal/exit/exit.go @@ -0,0 +1,59 @@ +// Package exit provides functions for gracefully shutting down the plugin. Exit functions are only +// called when Fluent Bit receives a kill signal, not during an abrupt crash. The plugin is given +// limited time to clean up resources before Fluent Bit terminates it. + +package exit + +import ( + "github.com/y-scope/fluent-bit-clp/internal/outctx" +) + +// NoUpload gracefully exits the plugin by closing writers without uploading. 
+// +// Parameters: +// - ctx: Plugin context +// +// Returns: +// - err: Error closing file +func NoUpload(ctx *outctx.S3Context) error { + for _, eventManager := range ctx.EventManagers { + err := eventManager.Writer.Close() + if err != nil { + return err + } + eventManager.Writer = nil + } + + return nil +} + +// S3 gracefully exits the plugin by flushing buffered data to S3. Makes a best-effort attempt, +// however Fluent Bit may kill the plugin before the upload completes. +// +// Parameters: +// - ctx: Plugin context +// +// Returns: +// - err: Error closing file +func S3(ctx *outctx.S3Context) error { + for _, eventManager := range ctx.EventManagers { + empty, err := eventManager.Writer.Empty() + if err != nil { + return err + } + if empty { + continue + } + err = eventManager.ToS3(ctx.Config, ctx.Uploader) + if err != nil { + return err + } + err = eventManager.Writer.Close() + if err != nil { + return err + } + eventManager.Writer = nil + } + + return nil +} diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 72fc45f..df8d58c 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -111,9 +111,7 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, } } -// Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always -// uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater -// than upload size. +// Checks whether Zstd buffer size is greater than or equal to upload size. 
// // Parameters: // - eventManager: Manager for Fluent Bit events with the same tag @@ -123,10 +121,6 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, // - readyToUpload: Boolean if upload criteria met or not // - err: Error getting Zstd buffer size func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) { - if !eventManager.Writer.GetUseDiskBuffer() { - return true, nil - } - bufferSize, err := eventManager.Writer.GetZstdOutputSize() if err != nil { return false, fmt.Errorf("error could not get size of buffer: %w", err) diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index e6b09dc..c8a1507 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -13,28 +13,6 @@ import ( "github.com/y-scope/fluent-bit-clp/internal/outctx" ) -// If useDiskBuffer is set, close all files prior to exit. Graceful exit will only be called -// if Fluent Bit receives a kill signal and not during an abrupt crash. Plugin is only -// given a limited time to clean up resources, so output is not sent to s3. Instead -// they are sent during startup. -// -// Parameters: -// - ctx: Plugin context -// -// Returns: -// - err: Error closing file -func GracefulExit(ctx *outctx.S3Context) error { - for _, eventManager := range ctx.EventManagers { - err := eventManager.Writer.Close() - if err != nil { - return err - } - eventManager.Writer = nil - } - - return nil -} - // Sends existing disk buffers to S3. 
// // Parameters: diff --git a/plugins/out_clp_s3/out_clp_s3.go b/plugins/out_clp_s3/out_clp_s3.go index c0591df..ed6f3ef 100644 --- a/plugins/out_clp_s3/out_clp_s3.go +++ b/plugins/out_clp_s3/out_clp_s3.go @@ -16,6 +16,7 @@ import ( "github.com/fluent/fluent-bit-go/output" "github.com/y-scope/fluent-bit-clp/internal/outctx" + "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/exit" "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/flush" "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/recovery" ) @@ -132,7 +133,12 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int { log.Printf("[%s] Exit called for id: %s", s3PluginName, outCtx.Config.Id) - err := recovery.GracefulExit(outCtx) + var err error + if outCtx.Config.UseDiskBuffer { + err = exit.NoUpload(outCtx) + } else { + err = exit.S3(outCtx) + } if err != nil { log.Printf("Failed to exit gracefully") }