From 0faed884e57c40b6bc19bd04355fecfc725fc60f Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 29 Aug 2024 14:22:45 -0700 Subject: [PATCH] Add FileSinkCommitStrategy This hopefully clears up, at least for the use of the FileSinkWriteExt trait the relationship between `auto_commit` and `roll_time`. There is no default implementaion for this option, as all options have implications that should be considered. --- file_store/src/file_sink.rs | 7 +++++ file_store/src/traits/file_sink_write.rs | 39 ++++++++++++++++++------ file_store/src/traits/mod.rs | 2 +- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 81548daa1..771754e90 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -253,12 +253,19 @@ pub struct FileSink { target_path: PathBuf, tmp_path: PathBuf, prefix: String, + /// Maximum file size in bytes. If a single write would cause this limit to + /// be exceeded, the `active_sink` is rolled. max_size: usize, + /// Window within which writes can occur to `active_sink`. `roll_time` is + /// not checked during writing, so a file may contain items exceeding the + /// window of `roll_time`. roll_time: Duration, messages: MessageReceiver, file_upload: FileUpload, staged_files: Vec, + /// 'commit' the file to s3 automatically when either the `roll_time` is + /// surpassed, or `max_size` would be exceeded by an incoming message. auto_commit: bool, active_sink: Option, diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index 6aa8ea861..e80b80c64 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -14,6 +14,22 @@ use helium_proto::{ pub const DEFAULT_ROLL_TIME: Duration = Duration::from_secs(DEFAULT_SINK_ROLL_SECS); +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum FileSinkCommitStrategy { + /// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded. + /// Files will be collected into tmp storage on `DEFAULT_ROLL_TIME` basis. + Manual, + /// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded. + /// Files will be collected into tmp storage on provided `roll_time` basis. + ManualRollTime(Duration), + /// Files will be automatically uploaded when + /// [`FileSinkBuilder::max_size()`] is exceeded, or [`DEFAULT_ROLL_TIME`] has elapsed. + Automatic, + /// Files will be automatically uploaded when + /// [`FileSinkBuilder::max_size()`] is exceeded, or provided `roll_time` has elapsed. + AutomaticRollTime(Duration), +} + #[async_trait::async_trait] pub trait FileSinkWriteExt where @@ -22,14 +38,10 @@ where const FILE_PREFIX: &'static str; const METRIC_SUFFIX: &'static str; - // The `auto_commit` option and `roll_time` option are incompatible with - // each other. It doesn't make sense to roll a file every so often _and_ - // commit it every time something is written. If a roll_time is provided, - // `auto_commit` is set to false. async fn file_sink( target_path: &Path, file_upload: FileUpload, - roll_time: Option, + commit_strategy: FileSinkCommitStrategy, metric_prefix: &str, ) -> Result<(FileSinkClient, FileSink)> { let builder = FileSinkBuilder::new( @@ -39,10 +51,19 @@ where format!("{}_{}", metric_prefix, Self::METRIC_SUFFIX), ); - let builder = if let Some(duration) = roll_time { - builder.auto_commit(false).roll_time(duration) - } else { - builder.auto_commit(true) + let builder = match commit_strategy { + FileSinkCommitStrategy::Manual => { + builder.auto_commit(false).roll_time(DEFAULT_ROLL_TIME) + } + FileSinkCommitStrategy::ManualRollTime(roll_time) => { + builder.auto_commit(false).roll_time(roll_time) + } + FileSinkCommitStrategy::Automatic => { + builder.auto_commit(true).roll_time(DEFAULT_ROLL_TIME) + } + FileSinkCommitStrategy::AutomaticRollTime(roll_time) => { + builder.auto_commit(true).roll_time(roll_time) + } }; let file_sink = builder.create().await?; diff --git a/file_store/src/traits/mod.rs b/file_store/src/traits/mod.rs index ae37821c8..976b8cec5 100644 --- a/file_store/src/traits/mod.rs +++ b/file_store/src/traits/mod.rs @@ -5,7 +5,7 @@ mod msg_timestamp; mod msg_verify; mod report_id; -pub use file_sink_write::{FileSinkWriteExt, DEFAULT_ROLL_TIME}; +pub use file_sink_write::{FileSinkCommitStrategy, FileSinkWriteExt, DEFAULT_ROLL_TIME}; pub use msg_bytes::MsgBytes; pub use msg_decode::MsgDecode; pub use msg_timestamp::{MsgTimestamp, TimestampDecode, TimestampEncode};