Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions internal/irzstd/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,6 @@ func (w *diskWriter) Close() error {
return nil
}

// Getter for useDiskBuffer.
//
// Returns:
// - useDiskBuffer: On/off for disk buffering
func (w *diskWriter) GetUseDiskBuffer() bool {
return true
}

// Getter for Zstd Output.
//
// Returns:
Expand All @@ -284,6 +276,21 @@ func (w *diskWriter) GetZstdOutputSize() (int, error) {
return w.getZstdFileSize()
}

// Empty reports whether the writer currently holds no buffered events. The writer is considered
// empty only when both the Zstd output file and the IR staging buffer contain zero bytes.
//
// Returns:
//   - empty: True if no events are buffered
//   - err: Error calling stat on the Zstd file
func (w *diskWriter) Empty() (bool, error) {
	info, err := w.zstdFile.Stat()
	if err != nil {
		return false, err
	}

	if info.Size() != 0 || w.irTotalBytes != 0 {
		return false, nil
	}
	return true, nil
}

// Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then
// truncated.
//
Expand Down
36 changes: 20 additions & 16 deletions internal/irzstd/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"github.com/y-scope/clp-ffi-go/ir"
)

// Converts log events into Zstd compressed IR. Log events provided to writer are immediately
// converted to Zstd compressed IR and stored in [memoryWriter.ZstdBuffer]. After the Zstd buffer
// receives logs, they are immediately sent to s3.
// Converts log events into Zstd compressed IR. Log events are immediately converted to Zstd
// compressed IR and stored in [memoryWriter.zstdBuffer].
type memoryWriter struct {
zstdBuffer *bytes.Buffer
irWriter *ir.Writer
zstdWriter *zstd.Encoder
zstdBuffer *bytes.Buffer
irWriter *ir.Writer
zstdWriter *zstd.Encoder
irTotalBytes int
}

// Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is
Expand Down Expand Up @@ -57,7 +57,8 @@ func NewMemoryWriter() (*memoryWriter, error) {
// - numEvents: Number of log events successfully written to IR writer buffer
// - err: Error writing IR/Zstd
func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
_, numEvents, err := writeIr(w.irWriter, logEvents)
numBytes, numEvents, err := writeIr(w.irWriter, logEvents)
w.irTotalBytes += numBytes
if err != nil {
return numEvents, err
}
Expand Down Expand Up @@ -87,6 +88,7 @@ func (w *memoryWriter) Reset() error {
var err error
w.zstdBuffer.Reset()
w.zstdWriter.Reset(w.zstdBuffer)
w.irTotalBytes = 0

w.irWriter, err = ir.NewWriter[ir.FourByteEncoding](w.zstdWriter)
if err != nil {
Expand All @@ -96,14 +98,6 @@ func (w *memoryWriter) Reset() error {
return nil
}

// Getter for useDiskBuffer.
//
// Returns:
// - useDiskBuffer: On/off for disk buffering
func (w *memoryWriter) GetUseDiskBuffer() bool {
return false
}

// Getter for Zstd Output.
//
// Returns:
Expand All @@ -113,7 +107,8 @@ func (w *memoryWriter) GetZstdOutput() io.Reader {
}

// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write.
// Instead, calling Len() on buffer.
// Instead, calling Len() on buffer. Size may slightly lag the real size since some data in the
// current block will be in the [zstd] encoder's internal buffer.
//
// Returns:
// - size: Bytes written
Expand All @@ -139,3 +134,12 @@ func (w *memoryWriter) Close() error {
}
return nil
}

// Empty reports whether the writer currently holds no buffered events. Tracked via the running
// count of IR bytes written since the last reset.
//
// Returns:
//   - empty: True if no events are buffered
//   - err: Always nil; present only to satisfy the Writer interface
func (w *memoryWriter) Empty() (bool, error) {
	isEmpty := w.irTotalBytes == 0
	return isEmpty, nil
}
13 changes: 7 additions & 6 deletions internal/irzstd/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ type Writer interface {
// - err
Reset() error

// Getter for useDiskBuffer.
//
// Returns:
// - useDiskBuffer: On/off for disk buffering
GetUseDiskBuffer() bool

// Getter for Zstd Output.
//
// Returns:
Expand All @@ -61,6 +55,13 @@ type Writer interface {
// - size: Bytes written
// - err
GetZstdOutputSize() (int, error)

// Checks if writer is empty. True if no events are buffered.
//
// Returns:
// - empty: Boolean value that is true if buffer is empty
// - err
Empty() (bool, error)
}

// Writes log events to a IR Writer.
Expand Down
24 changes: 13 additions & 11 deletions plugins/out_clp_s3/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,23 @@ More detailed information for specifying credentials from AWS can be found [here
| `s3_bucket_prefix` | Bucket prefix path | `logs/` |
| `role_arn` | ARN of an IAM role to assume | `None` |
| `id` | Name of output plugin | Random UUID |
| `use_disk_buffer` | Buffer logs on disk prior to sending to S3. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` |
| `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` |
| `upload_size_mb` | Set upload size in MB when disk store is enabled. Size refers to the compressed size. | `16` |
| `use_disk_buffer` | Buffer logs on disk. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` |
| `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` |
| `upload_size_mb` | Set upload size in MB. Size refers to the compressed size. | `16` |

#### Disk Buffering

The output plugin receives raw logs from Fluent Bit in small chunks. With `use_disk_buffer` set, the
output plugin will accumulate logs on disk until the upload size is reached. Buffering logs will
reduce the amount of S3 API requests and improve the compression ratio. However, the plugin will use
disk space and have higher memory requirements. The amount of system resources will be proportional
to the amount of Fluent Bit tags. With `use_disk_buffer` off, the plugin will immediately process
each chunk and send it to S3.
The output plugin receives raw logs from Fluent Bit in small chunks and accumulates them in a
compressed buffer until the upload size is reached before sending to S3.

Logs are stored on the disk as IR and Zstd compressed IR. If the plugin were to crash, stored logs
will be sent to S3 when Fluent Bit restarts. The upload index restarts on recovery.
With `use_disk_buffer` set, logs are stored on disk as IR and Zstd compressed IR. After either a
graceful shutdown or an abrupt crash, stored logs will be sent to S3 when Fluent Bit restarts. For an
abrupt crash, there is a very small chance of data corruption if the plugin crashes mid-write. The
upload index restarts on recovery.

With `use_disk_buffer` off, logs are stored in memory as Zstd compressed IR. On a graceful shutdown, the
plugin will attempt to upload any buffered data to S3 before Fluent Bit terminates it. On an abrupt
crash, in-memory data is lost.

### S3 Objects

Expand Down
59 changes: 59 additions & 0 deletions plugins/out_clp_s3/internal/exit/exit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Package exit provides functions for gracefully shutting down the plugin. Exit functions are only
// called when Fluent Bit receives a kill signal, not during an abrupt crash. The plugin is given
// limited time to clean up resources before Fluent Bit terminates it.
package exit

import (
"github.com/y-scope/fluent-bit-clp/internal/outctx"
)

// NoUpload gracefully exits the plugin by closing every event manager's writer without sending
// buffered data to S3. Buffered data on disk is recovered and uploaded on the next startup.
//
// Parameters:
//   - ctx: Plugin context
//
// Returns:
//   - err: Error closing a writer
func NoUpload(ctx *outctx.S3Context) error {
	for _, manager := range ctx.EventManagers {
		if err := manager.Writer.Close(); err != nil {
			return err
		}
		// Clear the reference so a later flush cannot touch a closed writer.
		manager.Writer = nil
	}

	return nil
}

// S3 gracefully exits the plugin by flushing buffered data to S3. Makes a best-effort attempt,
// however Fluent Bit may kill the plugin before the upload completes.
//
// Parameters:
//   - ctx: Plugin context
//
// Returns:
//   - err: Error checking buffer state, uploading to S3, or closing a writer
func S3(ctx *outctx.S3Context) error {
	for _, eventManager := range ctx.EventManagers {
		empty, err := eventManager.Writer.Empty()
		if err != nil {
			return err
		}
		// Only upload when there is buffered data; uploading nothing would create a useless
		// empty S3 object.
		if !empty {
			if err := eventManager.ToS3(ctx.Config, ctx.Uploader); err != nil {
				return err
			}
		}
		// Close the writer even when it was empty so encoder resources are always released,
		// matching NoUpload's cleanup. (Previously empty writers were skipped entirely and
		// never closed.)
		if err := eventManager.Writer.Close(); err != nil {
			return err
		}
		eventManager.Writer = nil
	}

	return nil
}
8 changes: 1 addition & 7 deletions plugins/out_clp_s3/internal/flush/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,7 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent,
}
}

// Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always
// uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater
// than upload size.
// Checks whether Zstd buffer size is greater than or equal to upload size.
//
// Parameters:
// - eventManager: Manager for Fluent Bit events with the same tag
Expand All @@ -123,10 +121,6 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent,
// - readyToUpload: Boolean if upload criteria met or not
// - err: Error getting Zstd buffer size
func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) {
if !eventManager.Writer.GetUseDiskBuffer() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one change changes buffer behaviour to not immediately upload

return true, nil
}

bufferSize, err := eventManager.Writer.GetZstdOutputSize()
if err != nil {
return false, fmt.Errorf("error could not get size of buffer: %w", err)
Expand Down
22 changes: 0 additions & 22 deletions plugins/out_clp_s3/internal/recovery/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,6 @@ import (
"github.com/y-scope/fluent-bit-clp/internal/outctx"
)

// GracefulExit closes all writers prior to exit without uploading. It is only invoked when Fluent
// Bit receives a kill signal, never during an abrupt crash. Since the plugin has limited time to
// clean up, buffered output is not sent to S3 here; it is sent on the next startup instead.
//
// Parameters:
//   - ctx: Plugin context
//
// Returns:
//   - err: Error closing a writer
func GracefulExit(ctx *outctx.S3Context) error {
	for _, manager := range ctx.EventManagers {
		if err := manager.Writer.Close(); err != nil {
			return err
		}
		manager.Writer = nil
	}

	return nil
}

// Sends existing disk buffers to S3.
//
// Parameters:
Expand Down
8 changes: 7 additions & 1 deletion plugins/out_clp_s3/out_clp_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/fluent/fluent-bit-go/output"

"github.com/y-scope/fluent-bit-clp/internal/outctx"
"github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/exit"
"github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/flush"
"github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/internal/recovery"
)
Expand Down Expand Up @@ -132,7 +133,12 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int {

log.Printf("[%s] Exit called for id: %s", s3PluginName, outCtx.Config.Id)

err := recovery.GracefulExit(outCtx)
var err error
if outCtx.Config.UseDiskBuffer {
err = exit.NoUpload(outCtx)
} else {
err = exit.S3(outCtx)
}
if err != nil {
log.Printf("Failed to exit gracefully")
}
Expand Down