Skip to content

Commit

Permalink
Add FileSinkCommitStrategy
Browse files Browse the repository at this point in the history
This hopefully clears up, at least for the use of the FileSinkWriteExt
trait the relationship between `auto_commit` and `roll_time`. There is
no default implementaion for this option, as all options have
implications that should be considered.
  • Loading branch information
michaeldjeffrey committed Aug 29, 2024
1 parent e6adffc commit 0faed88
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
7 changes: 7 additions & 0 deletions file_store/src/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,19 @@ pub struct FileSink<T> {
target_path: PathBuf,
tmp_path: PathBuf,
prefix: String,
/// Maximum file size in bytes. If a single write would cause this limit to
/// be exceeded, the `active_sink` is rolled.
max_size: usize,
/// Window within which writes can occur to `active_sink`. `roll_time` is
/// not checked during writing, so a file may contain items exceeding the
/// window of `roll_time`.
roll_time: Duration,

messages: MessageReceiver<T>,
file_upload: FileUpload,
staged_files: Vec<PathBuf>,
/// 'commit' the file to s3 automatically when either the `roll_time` is
/// surpassed, or `max_size` would be exceeded by an incoming message.
auto_commit: bool,

active_sink: Option<ActiveSink>,
Expand Down
39 changes: 30 additions & 9 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ use helium_proto::{

pub const DEFAULT_ROLL_TIME: Duration = Duration::from_secs(DEFAULT_SINK_ROLL_SECS);

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum FileSinkCommitStrategy {
/// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded.
/// Files will be collected into tmp storage on `DEFAULT_ROLL_TIME` basis.
Manual,
/// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded.
/// Files will be collected into tmp storage on provided `roll_time` basis.
ManualRollTime(Duration),
/// Files will be automatically uploaded when
/// [`FileSinkBuilder::max_size()`] is exceeded, or [`DEFAULT_ROLL_TIME`] has elapsed.
Automatic,
/// Files will be automatically uploaded when
/// [`FileSinkBuilder::max_size()`] is exceeded, or provided `roll_time` has elapsed.
AutomaticRollTime(Duration),
}

#[async_trait::async_trait]
pub trait FileSinkWriteExt
where
Expand All @@ -22,14 +38,10 @@ where
const FILE_PREFIX: &'static str;
const METRIC_SUFFIX: &'static str;

// The `auto_commit` option and `roll_time` option are incompatible with
// each other. It doesn't make sense to roll a file every so often _and_
// commit it every time something is written. If a roll_time is provided,
// `auto_commit` is set to false.
async fn file_sink(
target_path: &Path,
file_upload: FileUpload,
roll_time: Option<Duration>,
commit_strategy: FileSinkCommitStrategy,
metric_prefix: &str,
) -> Result<(FileSinkClient<Self>, FileSink<Self>)> {
let builder = FileSinkBuilder::new(
Expand All @@ -39,10 +51,19 @@ where
format!("{}_{}", metric_prefix, Self::METRIC_SUFFIX),
);

let builder = if let Some(duration) = roll_time {
builder.auto_commit(false).roll_time(duration)
} else {
builder.auto_commit(true)
let builder = match commit_strategy {
FileSinkCommitStrategy::Manual => {
builder.auto_commit(false).roll_time(DEFAULT_ROLL_TIME)
}
FileSinkCommitStrategy::ManualRollTime(roll_time) => {
builder.auto_commit(false).roll_time(roll_time)
}
FileSinkCommitStrategy::Automatic => {
builder.auto_commit(true).roll_time(DEFAULT_ROLL_TIME)
}
FileSinkCommitStrategy::AutomaticRollTime(roll_time) => {
builder.auto_commit(true).roll_time(roll_time)
}
};

let file_sink = builder.create().await?;
Expand Down
2 changes: 1 addition & 1 deletion file_store/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod msg_timestamp;
mod msg_verify;
mod report_id;

pub use file_sink_write::{FileSinkWriteExt, DEFAULT_ROLL_TIME};
pub use file_sink_write::{FileSinkCommitStrategy, FileSinkWriteExt, DEFAULT_ROLL_TIME};
pub use msg_bytes::MsgBytes;
pub use msg_decode::MsgDecode;
pub use msg_timestamp::{MsgTimestamp, TimestampDecode, TimestampEncode};
Expand Down

0 comments on commit 0faed88

Please sign in to comment.