Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix roll_time and auto_commit settings for FileSink #859

Merged
merged 3 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions boost_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use boost_manager::{
};
use clap::Parser;
use file_store::{
file_info_poller::LookbackBehavior, file_source, file_upload, reward_manifest::RewardManifest,
traits::FileSinkWriteExt, FileStore, FileType,
file_info_poller::LookbackBehavior,
file_source, file_upload,
reward_manifest::RewardManifest,
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt},
FileStore, FileType,
};
use helium_proto::BoostedHexUpdateV1;
use mobile_config::client::hex_boosting_client::HexBoostingClient;
Expand Down Expand Up @@ -103,7 +106,8 @@ impl Server {
let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
7 changes: 7 additions & 0 deletions file_store/src/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,19 @@ pub struct FileSink<T> {
target_path: PathBuf,
tmp_path: PathBuf,
prefix: String,
/// Maximum file size in bytes. If a single write would cause this limit to
/// be exceeded, the `active_sink` is rolled.
max_size: usize,
/// Window within which writes can occur to `active_sink`. `roll_time` is
/// not checked during writing, so a file may contain items exceeding the
/// window of `roll_time`.
roll_time: Duration,

messages: MessageReceiver<T>,
file_upload: FileUpload,
staged_files: Vec<PathBuf>,
/// 'commit' the file to s3 automatically when either the `roll_time` is
/// surpassed, or `max_size` would be exceeded by an incoming message.
auto_commit: bool,

active_sink: Option<ActiveSink>,
Expand Down
39 changes: 30 additions & 9 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ use helium_proto::{

pub const DEFAULT_ROLL_TIME: Duration = Duration::from_secs(DEFAULT_SINK_ROLL_SECS);

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum FileSinkCommitStrategy {
michaeldjeffrey marked this conversation as resolved.
Show resolved Hide resolved
michaeldjeffrey marked this conversation as resolved.
Show resolved Hide resolved
/// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded.
Manual,
/// Files will be automatically uploaded when
/// [`FileSinkBuilder::max_size()`] is exceeded, or [`DEFAULT_ROLL_TIME`] has elapsed.
Automatic,
}

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum FileSinkRollTime {
/// Default is 3 minutes
Default,
Duration(Duration),
}

#[async_trait::async_trait]
pub trait FileSinkWriteExt
where
Expand All @@ -22,14 +38,11 @@ where
const FILE_PREFIX: &'static str;
const METRIC_SUFFIX: &'static str;

// The `auto_commit` option and `roll_time` option are incompatible with
// each other. It doesn't make sense to roll a file every so often _and_
// commit it every time something is written. If a roll_time is provided,
// `auto_commit` is set to false.
async fn file_sink(
target_path: &Path,
file_upload: FileUpload,
roll_time: Option<Duration>,
commit_strategy: FileSinkCommitStrategy,
roll_time: FileSinkRollTime,
metric_prefix: &str,
) -> Result<(FileSinkClient<Self>, FileSink<Self>)> {
let builder = FileSinkBuilder::new(
Expand All @@ -39,10 +52,18 @@ where
format!("{}_{}", metric_prefix, Self::METRIC_SUFFIX),
);

let builder = if let Some(duration) = roll_time {
builder.auto_commit(false).roll_time(duration)
} else {
builder.auto_commit(true)
let builder = match commit_strategy {
FileSinkCommitStrategy::Manual => {
builder.auto_commit(false).roll_time(DEFAULT_ROLL_TIME)
}
FileSinkCommitStrategy::Automatic => {
builder.auto_commit(true).roll_time(DEFAULT_ROLL_TIME)
}
};

let builder = match roll_time {
FileSinkRollTime::Duration(duration) => builder.roll_time(duration),
FileSinkRollTime::Default => builder.roll_time(DEFAULT_ROLL_TIME),
};

let file_sink = builder.create().await?;
Expand Down
4 changes: 3 additions & 1 deletion file_store/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ mod msg_timestamp;
mod msg_verify;
mod report_id;

pub use file_sink_write::{FileSinkWriteExt, DEFAULT_ROLL_TIME};
pub use file_sink_write::{
FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, DEFAULT_ROLL_TIME,
};
pub use msg_bytes::MsgBytes;
pub use msg_decode::MsgDecode;
pub use msg_timestamp::{MsgTimestamp, TimestampDecode, TimestampEncode};
Expand Down
8 changes: 5 additions & 3 deletions ingest/src/server_iot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::Utc;
use file_store::{
file_sink::FileSinkClient,
file_upload,
traits::{FileSinkWriteExt, MsgVerify},
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, MsgVerify},
};
use futures::{
future::{LocalBoxFuture, TryFutureExt},
Expand Down Expand Up @@ -365,7 +365,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let (beacon_report_sink, beacon_report_sink_server) = LoraBeaconIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -374,7 +375,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let (witness_report_sink, witness_report_sink_server) = LoraWitnessIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
32 changes: 21 additions & 11 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::Utc;
use file_store::{
file_sink::FileSinkClient,
file_upload,
traits::{FileSinkWriteExt, MsgVerify},
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, MsgVerify},
};
use futures::future::LocalBoxFuture;
use futures_util::TryFutureExt;
Expand Down Expand Up @@ -450,7 +450,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
CellHeartbeatIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -459,7 +460,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
WifiHeartbeatIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -468,7 +470,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let (speedtest_report_sink, speedtest_report_sink_server) = SpeedtestIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -477,7 +480,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
DataTransferSessionIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -486,7 +490,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
SubscriberLocationIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -495,7 +500,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
RadioThresholdIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -504,7 +510,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
InvalidatedRadioThresholdIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -513,7 +520,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
CoverageObjectIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -522,7 +530,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
ServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -531,7 +540,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
SubscriberVerifiedMappingEventIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
8 changes: 5 additions & 3 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use file_store::{
file_sink::FileSinkClient,
file_source, file_upload,
iot_packet::PacketRouterPacketReport,
traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME},
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt},
FileStore, FileType,
};
use futures_util::TryFutureExt;
Expand Down Expand Up @@ -141,15 +141,17 @@ impl Cmd {
let (valid_packets, valid_packets_server) = ValidPacket::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
michaeldjeffrey marked this conversation as resolved.
Show resolved Hide resolved
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;

let (invalid_packets, invalid_packets_server) = InvalidPacket::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
26 changes: 17 additions & 9 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use file_store::{
file_info_poller::LookbackBehavior,
file_source, file_upload,
iot_packet::IotValidPacket,
traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME},
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt},
FileStore, FileType,
};
use helium_proto::{
Expand Down Expand Up @@ -124,7 +124,8 @@ impl Server {
let (rewards_sink, gateway_rewards_sink_server) = IotRewardShare::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -133,7 +134,8 @@ impl Server {
let (reward_manifests_sink, reward_manifests_sink_server) = RewardManifest::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down Expand Up @@ -177,7 +179,8 @@ impl Server {
NonRewardablePacket::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down Expand Up @@ -210,7 +213,8 @@ impl Server {
LoraInvalidBeaconReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -219,7 +223,8 @@ impl Server {
LoraInvalidWitnessReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -243,7 +248,8 @@ impl Server {
LoraInvalidBeaconReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -252,15 +258,17 @@ impl Server {
LoraInvalidWitnessReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;

let (runner_poc_sink, runner_poc_sink_server) = LoraPocV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(2 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(2 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
Loading