From 0faed884e57c40b6bc19bd04355fecfc725fc60f Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 29 Aug 2024 14:22:45 -0700 Subject: [PATCH 1/3] Add FileSinkCommitStrategy This hopefully clears up, at least for the use of the FileSinkWriteExt trait the relationship between `auto_commit` and `roll_time`. There is no default implementaion for this option, as all options have implications that should be considered. --- file_store/src/file_sink.rs | 7 +++++ file_store/src/traits/file_sink_write.rs | 39 ++++++++++++++++++------ file_store/src/traits/mod.rs | 2 +- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 81548daa1..771754e90 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -253,12 +253,19 @@ pub struct FileSink { target_path: PathBuf, tmp_path: PathBuf, prefix: String, + /// Maximum file size in bytes. If a single write would cause this limit to + /// be exceeded, the `active_sink` is rolled. max_size: usize, + /// Window within which writes can occur to `active_sink`. `roll_time` is + /// not checked during writing, so a file may contain items exceeding the + /// window of `roll_time`. roll_time: Duration, messages: MessageReceiver, file_upload: FileUpload, staged_files: Vec, + /// 'commit' the file to s3 automatically when either the `roll_time` is + /// surpassed, or `max_size` would be exceeded by an incoming message. auto_commit: bool, active_sink: Option, diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index 6aa8ea861..e80b80c64 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -14,6 +14,22 @@ use helium_proto::{ pub const DEFAULT_ROLL_TIME: Duration = Duration::from_secs(DEFAULT_SINK_ROLL_SECS); +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum FileSinkCommitStrategy { + /// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded. + /// Files will be collected into tmp storage on `DEFAULT_ROLL_TIME` basis. + Manual, + /// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded. + /// Files will be collected into tmp storage on provided `roll_time` basis. + ManualRollTime(Duration), + /// Files will be automatically uploaded when + /// [`FileSinkBuilder::max_size()`] is exceeded, or [`DEFAULT_ROLL_TIME`] has elapsed. + Automatic, + /// Files will be automatically uploaded when + /// [`FileSinkBuilder::max_size()`] is exceeded, or provided `roll_time` has elapsed. + AutomaticRollTime(Duration), +} + #[async_trait::async_trait] pub trait FileSinkWriteExt where @@ -22,14 +38,10 @@ where const FILE_PREFIX: &'static str; const METRIC_SUFFIX: &'static str; - // The `auto_commit` option and `roll_time` option are incompatible with - // each other. It doesn't make sense to roll a file every so often _and_ - // commit it every time something is written. If a roll_time is provided, - // `auto_commit` is set to false. async fn file_sink( target_path: &Path, file_upload: FileUpload, - roll_time: Option, + commit_strategy: FileSinkCommitStrategy, metric_prefix: &str, ) -> Result<(FileSinkClient, FileSink)> { let builder = FileSinkBuilder::new( @@ -39,10 +51,19 @@ where format!("{}_{}", metric_prefix, Self::METRIC_SUFFIX), ); - let builder = if let Some(duration) = roll_time { - builder.auto_commit(false).roll_time(duration) - } else { - builder.auto_commit(true) + let builder = match commit_strategy { + FileSinkCommitStrategy::Manual => { + builder.auto_commit(false).roll_time(DEFAULT_ROLL_TIME) + } + FileSinkCommitStrategy::ManualRollTime(roll_time) => { + builder.auto_commit(false).roll_time(roll_time) + } + FileSinkCommitStrategy::Automatic => { + builder.auto_commit(true).roll_time(DEFAULT_ROLL_TIME) + } + FileSinkCommitStrategy::AutomaticRollTime(roll_time) => { + builder.auto_commit(true).roll_time(roll_time) + } }; let file_sink = builder.create().await?; diff --git a/file_store/src/traits/mod.rs b/file_store/src/traits/mod.rs index ae37821c8..976b8cec5 100644 --- a/file_store/src/traits/mod.rs +++ b/file_store/src/traits/mod.rs @@ -5,7 +5,7 @@ mod msg_timestamp; mod msg_verify; mod report_id; -pub use file_sink_write::{FileSinkWriteExt, DEFAULT_ROLL_TIME}; +pub use file_sink_write::{FileSinkCommitStrategy, FileSinkWriteExt, DEFAULT_ROLL_TIME}; pub use msg_bytes::MsgBytes; pub use msg_decode::MsgDecode; pub use msg_timestamp::{MsgTimestamp, TimestampDecode, TimestampEncode}; From 07ee3d49083617cad48b5f60c24ef4fab4e929aa Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Thu, 29 Aug 2024 14:24:23 -0700 Subject: [PATCH 2/3] Correct FileSink `auto_commit` and `roll_time` usage in Oracles The PR that updated FileSink to use the Ext trait had an incorrect understanding of the relationship between `roll_time` and `auto_commit`. That understanding has since been corrected and codified in the `FileSinkCommitStrategy` enum. For this commit, https://github.com/helium/oracles/pull/849/files was combed through to get all correct settings for FileSinks before they were transitioned to use FileSinkWriteExt::file_sink(). --- boost_manager/src/main.rs | 9 +++++--- ingest/src/server_iot.rs | 6 ++--- ingest/src/server_mobile.rs | 22 +++++++++---------- iot_packet_verifier/src/daemon.rs | 6 ++--- iot_verifier/src/main.rs | 18 +++++++-------- mobile_packet_verifier/src/daemon.rs | 8 +++---- .../src/boosting_oracles/data_sets.rs | 4 ++-- mobile_verifier/src/cli/server.rs | 12 ++++++---- mobile_verifier/src/coverage.rs | 4 ++-- mobile_verifier/src/radio_threshold.rs | 6 ++--- mobile_verifier/src/rewarder.rs | 6 ++--- .../src/sp_boosted_rewards_bans.rs | 4 ++-- mobile_verifier/src/speedtests.rs | 4 ++-- mobile_verifier/src/subscriber_location.rs | 4 ++-- .../src/subscriber_verified_mapping_event.rs | 4 ++-- poc_entropy/src/main.rs | 7 ++++-- price/src/main.rs | 7 ++++-- 17 files changed, 72 insertions(+), 59 deletions(-) diff --git a/boost_manager/src/main.rs b/boost_manager/src/main.rs index f55e923ad..68a4a4e78 100644 --- a/boost_manager/src/main.rs +++ b/boost_manager/src/main.rs @@ -5,8 +5,11 @@ use boost_manager::{ }; use clap::Parser; use file_store::{ - file_info_poller::LookbackBehavior, file_source, file_upload, reward_manifest::RewardManifest, - traits::FileSinkWriteExt, FileStore, FileType, + file_info_poller::LookbackBehavior, + file_source, file_upload, + reward_manifest::RewardManifest, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + FileStore, FileType, }; use helium_proto::BoostedHexUpdateV1; use mobile_config::client::hex_boosting_client::HexBoostingClient; @@ -103,7 +106,7 @@ impl Server { let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/ingest/src/server_iot.rs b/ingest/src/server_iot.rs index f80034d69..481ab1fcf 100644 --- a/ingest/src/server_iot.rs +++ b/ingest/src/server_iot.rs @@ -4,7 +4,7 @@ use chrono::Utc; use file_store::{ file_sink::FileSinkClient, file_upload, - traits::{FileSinkWriteExt, MsgVerify}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt, MsgVerify}, }; use futures::{ future::{LocalBoxFuture, TryFutureExt}, @@ -365,7 +365,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (beacon_report_sink, beacon_report_sink_server) = LoraBeaconIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -374,7 +374,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (witness_report_sink, witness_report_sink_server) = LoraWitnessIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 636bd156d..46e873934 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -4,7 +4,7 @@ use chrono::Utc; use file_store::{ file_sink::FileSinkClient, file_upload, - traits::{FileSinkWriteExt, MsgVerify}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt, MsgVerify}, }; use futures::future::LocalBoxFuture; use futures_util::TryFutureExt; @@ -450,7 +450,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { CellHeartbeatIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -459,7 +459,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { WifiHeartbeatIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -468,7 +468,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (speedtest_report_sink, speedtest_report_sink_server) = SpeedtestIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -477,7 +477,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { DataTransferSessionIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -486,7 +486,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { SubscriberLocationIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -495,7 +495,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { RadioThresholdIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -504,7 +504,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { InvalidatedRadioThresholdIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -513,7 +513,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { CoverageObjectIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -522,7 +522,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -531,7 +531,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { SubscriberVerifiedMappingEventIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 77b1697dd..82afbde53 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -11,7 +11,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload, iot_packet::PacketRouterPacketReport, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use futures_util::TryFutureExt; @@ -141,7 +141,7 @@ impl Cmd { let (valid_packets, valid_packets_server) = ValidPacket::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -149,7 +149,7 @@ impl Cmd { let (invalid_packets, invalid_packets_server) = InvalidPacket::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 5afeb8866..65cc5927f 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -6,7 +6,7 @@ use file_store::{ file_info_poller::LookbackBehavior, file_source, file_upload, iot_packet::IotValidPacket, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use helium_proto::{ @@ -124,7 +124,7 @@ impl Server { let (rewards_sink, gateway_rewards_sink_server) = IotRewardShare::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -133,7 +133,7 @@ impl Server { let (reward_manifests_sink, reward_manifests_sink_server) = RewardManifest::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -177,7 +177,7 @@ impl Server { NonRewardablePacket::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -210,7 +210,7 @@ impl Server { LoraInvalidBeaconReportV1::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -219,7 +219,7 @@ impl Server { LoraInvalidWitnessReportV1::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -243,7 +243,7 @@ impl Server { LoraInvalidBeaconReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -252,7 +252,7 @@ impl Server { LoraInvalidWitnessReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -260,7 +260,7 @@ impl Server { let (runner_poc_sink, runner_poc_sink_server) = LoraPocV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(2 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(2 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 8e9e95ccc..0e640f5ab 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -8,7 +8,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload, mobile_session::DataTransferSessionIngestReport, - traits::FileSinkWriteExt, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; @@ -174,7 +174,7 @@ impl Cmd { let (valid_sessions, valid_sessions_server) = ValidDataTransferSession::file_sink( store_base_path, file_upload.clone(), - None, + FileSinkCommitStrategy::Automatic, env!("CARGO_PKG_NAME"), ) .await?; @@ -183,7 +183,7 @@ impl Cmd { InvalidDataTransferIngestReportV1::file_sink( store_base_path, file_upload.clone(), - None, + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -191,7 +191,7 @@ impl Cmd { let (pending_sessions, pending_sessions_server) = PendingDataTransferSessionV1::file_sink( store_base_path, file_upload.clone(), - None, + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index 32819fd66..9bb0ebbdb 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -9,7 +9,7 @@ use chrono::{DateTime, Utc}; use file_store::{ file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkWriteExt, TimestampDecode, TimestampEncode}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampDecode, TimestampEncode}, FileStore, }; use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; @@ -262,7 +262,7 @@ impl OracleBoostingReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 0ce51d47c..751a6627b 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -15,7 +15,11 @@ use crate::{ telemetry, Settings, }; use anyhow::Result; -use file_store::{file_upload, traits::FileSinkWriteExt, FileStore}; +use file_store::{ + file_upload, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + FileStore, +}; use helium_proto::services::poc_mobile::{Heartbeat, SeniorityUpdate, SpeedtestAvg}; use mobile_config::client::{ entity_client::EntityClient, hex_boosting_client::HexBoostingClient, AuthorizationClient, @@ -52,7 +56,7 @@ impl Cmd { let (valid_heartbeats, valid_heartbeats_server) = Heartbeat::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -61,7 +65,7 @@ impl Cmd { let (seniority_updates, seniority_updates_server) = SeniorityUpdate::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -69,7 +73,7 @@ impl Cmd { let (speedtests_avg, speedtests_avg_server) = SpeedtestAvg::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index d10aebe7a..94ceb1b18 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -10,7 +10,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload::FileUpload, - traits::{FileSinkWriteExt, TimestampEncode}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampEncode}, FileStore, FileType, }; use futures::{ @@ -89,7 +89,7 @@ impl CoverageDaemon { let (valid_coverage_objs, valid_coverage_objs_server) = proto::CoverageObjectV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index c10453bff..6949bdec4 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -11,7 +11,7 @@ use file_store::{ mobile_radio_threshold::{ RadioThresholdIngestReport, RadioThresholdReportReq, VerifiedRadioThresholdIngestReport, }, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; @@ -73,7 +73,7 @@ where VerifiedRadioThresholdIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -82,7 +82,7 @@ where VerifiedInvalidatedRadioThresholdIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 8d5f81426..a2ed97d4a 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -17,7 +17,7 @@ use db_store::meta; use file_store::{ file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkWriteExt, TimestampEncode, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampEncode}, }; use futures_util::TryFutureExt; @@ -81,7 +81,7 @@ where let (mobile_rewards, mobile_rewards_server) = MobileRewardShare::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -89,7 +89,7 @@ where let (reward_manifests, reward_manifests_server) = RewardManifest::file_sink( settings.store_base_path(), file_upload, - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs index 54433e117..02d3b84cd 100644 --- a/mobile_verifier/src/sp_boosted_rewards_bans.rs +++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs @@ -7,7 +7,7 @@ use file_store::{ }, file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use futures::{prelude::future::LocalBoxFuture, StreamExt, TryFutureExt, TryStreamExt}; @@ -158,7 +158,7 @@ where VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink( settings.store_base_path(), file_upload, - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 97edf8e82..1150c57b7 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -9,7 +9,7 @@ use file_store::{ file_source, file_upload::FileUpload, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, - traits::FileSinkWriteExt, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use futures::{ @@ -77,7 +77,7 @@ where let (speedtests_validity, speedtests_validity_server) = VerifiedSpeedtestProto::file_sink( settings.store_base_path(), file_upload, - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 69a09a369..7e2786d9a 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -8,7 +8,7 @@ use file_store::{ SubscriberLocationIngestReport, SubscriberLocationReq, VerifiedSubscriberLocationIngestReport, }, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; @@ -57,7 +57,7 @@ where VerifiedSubscriberLocationIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/subscriber_verified_mapping_event.rs b/mobile_verifier/src/subscriber_verified_mapping_event.rs index 99ed76015..923cb4b73 100644 --- a/mobile_verifier/src/subscriber_verified_mapping_event.rs +++ b/mobile_verifier/src/subscriber_verified_mapping_event.rs @@ -7,7 +7,7 @@ use file_store::{ file_upload::FileUpload, subscriber_verified_mapping_event::SubscriberVerifiedMappingEvent, subscriber_verified_mapping_event_ingest_report::SubscriberVerifiedMappingEventIngestReport, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, verified_subscriber_verified_mapping_event_ingest_report::VerifiedSubscriberVerifiedMappingEventIngestReport, FileStore, FileType, }; @@ -78,7 +78,7 @@ where VerifiedSubscriberVerifiedMappingEventIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/poc_entropy/src/main.rs b/poc_entropy/src/main.rs index e13253438..f25d2c011 100644 --- a/poc_entropy/src/main.rs +++ b/poc_entropy/src/main.rs @@ -1,6 +1,9 @@ use anyhow::{Error, Result}; use clap::Parser; -use file_store::{file_upload, traits::FileSinkWriteExt}; +use file_store::{ + file_upload, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, +}; use futures_util::TryFutureExt; use helium_proto::EntropyReportV1; use poc_entropy::{entropy_generator::EntropyGenerator, server::ApiServer, Settings}; @@ -74,7 +77,7 @@ impl Server { let (entropy_sink, entropy_sink_server) = EntropyReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/price/src/main.rs b/price/src/main.rs index 4695a9005..c1eadc2ad 100644 --- a/price/src/main.rs +++ b/price/src/main.rs @@ -1,6 +1,9 @@ use anyhow::Result; use clap::Parser; -use file_store::{file_upload, traits::FileSinkWriteExt}; +use file_store::{ + file_upload, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, +}; use helium_proto::PriceReportV1; use price::{cli::check, PriceGenerator, Settings}; use std::{ @@ -85,7 +88,7 @@ impl Server { let (price_sink, price_sink_server) = PriceReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(PRICE_SINK_ROLL_SECS)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(PRICE_SINK_ROLL_SECS)), env!("CARGO_PKG_NAME"), ) .await?; From dfa99d40687ba214d66f5cee22a70e2fe9766169 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Fri, 30 Aug 2024 12:13:14 -0400 Subject: [PATCH 3/3] Change so auto_commit and roll_time are separated (#860) * Change so auto_commit and roll_time are separated * Create FileSinkRollTime enum --- boost_manager/src/main.rs | 5 +-- file_store/src/traits/file_sink_write.rs | 26 +++++++-------- file_store/src/traits/mod.rs | 4 ++- ingest/src/server_iot.rs | 8 +++-- ingest/src/server_mobile.rs | 32 ++++++++++++------- iot_packet_verifier/src/daemon.rs | 4 ++- iot_verifier/src/main.rs | 18 ++++++++--- mobile_packet_verifier/src/daemon.rs | 5 ++- .../src/boosting_oracles/data_sets.rs | 8 +++-- mobile_verifier/src/cli/server.rs | 11 ++++--- mobile_verifier/src/coverage.rs | 5 +-- mobile_verifier/src/radio_threshold.rs | 4 ++- mobile_verifier/src/rewarder.rs | 4 ++- .../src/sp_boosted_rewards_bans.rs | 3 +- mobile_verifier/src/speedtests.rs | 5 +-- mobile_verifier/src/subscriber_location.rs | 3 +- .../src/subscriber_verified_mapping_event.rs | 3 +- poc_entropy/src/main.rs | 5 +-- price/src/main.rs | 5 +-- 19 files changed, 102 insertions(+), 56 deletions(-) diff --git a/boost_manager/src/main.rs b/boost_manager/src/main.rs index 68a4a4e78..40c8b80fa 100644 --- a/boost_manager/src/main.rs +++ b/boost_manager/src/main.rs @@ -8,7 +8,7 @@ use file_store::{ file_info_poller::LookbackBehavior, file_source, file_upload, reward_manifest::RewardManifest, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use helium_proto::BoostedHexUpdateV1; @@ -106,7 +106,8 @@ impl Server { let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index e80b80c64..fae6108d1 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -17,17 +17,17 @@ pub const DEFAULT_ROLL_TIME: Duration = Duration::from_secs(DEFAULT_SINK_ROLL_SE #[derive(Copy, Clone, PartialEq, Eq)] pub enum FileSinkCommitStrategy { /// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded. - /// Files will be collected into tmp storage on `DEFAULT_ROLL_TIME` basis. Manual, - /// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded. - /// Files will be collected into tmp storage on provided `roll_time` basis. - ManualRollTime(Duration), /// Files will be automatically uploaded when /// [`FileSinkBuilder::max_size()`] is exceeded, or [`DEFAULT_ROLL_TIME`] has elapsed. Automatic, - /// Files will be automatically uploaded when - /// [`FileSinkBuilder::max_size()`] is exceeded, or provided `roll_time` has elapsed. - AutomaticRollTime(Duration), +} + +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum FileSinkRollTime { + /// Default is 3 minutes + Default, + Duration(Duration), } #[async_trait::async_trait] @@ -42,6 +42,7 @@ where target_path: &Path, file_upload: FileUpload, commit_strategy: FileSinkCommitStrategy, + roll_time: FileSinkRollTime, metric_prefix: &str, ) -> Result<(FileSinkClient, FileSink)> { let builder = FileSinkBuilder::new( @@ -55,15 +56,14 @@ where FileSinkCommitStrategy::Manual => { builder.auto_commit(false).roll_time(DEFAULT_ROLL_TIME) } - FileSinkCommitStrategy::ManualRollTime(roll_time) => { - builder.auto_commit(false).roll_time(roll_time) - } FileSinkCommitStrategy::Automatic => { builder.auto_commit(true).roll_time(DEFAULT_ROLL_TIME) } - FileSinkCommitStrategy::AutomaticRollTime(roll_time) => { - builder.auto_commit(true).roll_time(roll_time) - } + }; + + let builder = match roll_time { + FileSinkRollTime::Duration(duration) => builder.roll_time(duration), + FileSinkRollTime::Default => builder.roll_time(DEFAULT_ROLL_TIME), }; let file_sink = builder.create().await?; diff --git a/file_store/src/traits/mod.rs b/file_store/src/traits/mod.rs index 976b8cec5..3a55973b0 100644 --- a/file_store/src/traits/mod.rs +++ b/file_store/src/traits/mod.rs @@ -5,7 +5,9 @@ mod msg_timestamp; mod msg_verify; mod report_id; -pub use file_sink_write::{FileSinkCommitStrategy, FileSinkWriteExt, DEFAULT_ROLL_TIME}; +pub use file_sink_write::{ + FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, DEFAULT_ROLL_TIME, +}; pub use msg_bytes::MsgBytes; pub use msg_decode::MsgDecode; pub use msg_timestamp::{MsgTimestamp, TimestampDecode, TimestampEncode}; diff --git a/ingest/src/server_iot.rs b/ingest/src/server_iot.rs index 481ab1fcf..eb82d73a1 100644 --- a/ingest/src/server_iot.rs +++ b/ingest/src/server_iot.rs @@ -4,7 +4,7 @@ use chrono::Utc; use file_store::{ file_sink::FileSinkClient, file_upload, - traits::{FileSinkCommitStrategy, FileSinkWriteExt, MsgVerify}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, MsgVerify}, }; use futures::{ future::{LocalBoxFuture, TryFutureExt}, @@ -365,7 +365,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (beacon_report_sink, beacon_report_sink_server) = LoraBeaconIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -374,7 +375,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (witness_report_sink, witness_report_sink_server) = LoraWitnessIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 46e873934..c1e91ded1 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -4,7 +4,7 @@ use chrono::Utc; use file_store::{ file_sink::FileSinkClient, file_upload, - traits::{FileSinkCommitStrategy, FileSinkWriteExt, MsgVerify}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, MsgVerify}, }; use futures::future::LocalBoxFuture; use futures_util::TryFutureExt; @@ -450,7 +450,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { CellHeartbeatIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -459,7 +460,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { WifiHeartbeatIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -468,7 +470,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (speedtest_report_sink, speedtest_report_sink_server) = SpeedtestIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -477,7 +480,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { DataTransferSessionIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -486,7 +490,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { SubscriberLocationIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -495,7 +500,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { RadioThresholdIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -504,7 +510,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { InvalidatedRadioThresholdIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -513,7 +520,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { CoverageObjectIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -522,7 +530,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -531,7 +540,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { SubscriberVerifiedMappingEventIngestReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 82afbde53..5b4b21710 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -11,7 +11,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload, iot_packet::PacketRouterPacketReport, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use futures_util::TryFutureExt; @@ -142,6 +142,7 @@ impl Cmd { store_base_path, file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -150,6 +151,7 @@ impl Cmd { store_base_path, file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 65cc5927f..a7e0192f0 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -6,7 +6,7 @@ use file_store::{ file_info_poller::LookbackBehavior, file_source, file_upload, iot_packet::IotValidPacket, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use helium_proto::{ @@ -125,6 +125,7 @@ impl Server { store_base_path, file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -134,6 +135,7 @@ impl Server { store_base_path, file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -177,7 +179,8 @@ impl Server { NonRewardablePacket::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -211,6 +214,7 @@ impl Server { store_base_path, file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -220,6 +224,7 @@ impl Server { store_base_path, file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -243,7 +248,8 @@ impl Server { LoraInvalidBeaconReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -252,7 +258,8 @@ impl Server { LoraInvalidWitnessReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -260,7 +267,8 @@ impl Server { let (runner_poc_sink, runner_poc_sink_server) = LoraPocV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(2 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(2 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 0e640f5ab..86d639a54 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -8,7 +8,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload, mobile_session::DataTransferSessionIngestReport, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; @@ -175,6 +175,7 @@ impl Cmd { store_base_path, file_upload.clone(), FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -184,6 +185,7 @@ impl Cmd { store_base_path, file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -192,6 +194,7 @@ impl Cmd { store_base_path, file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index 9bb0ebbdb..30da679b8 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -9,7 +9,10 @@ use chrono::{DateTime, Utc}; use file_store::{ file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampDecode, TimestampEncode}, + traits::{ + FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampDecode, + TimestampEncode, + }, FileStore, }; use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; @@ -262,7 +265,8 @@ impl OracleBoostingReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 751a6627b..193455c5c 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -17,7 +17,7 @@ use crate::{ use anyhow::Result; use file_store::{ file_upload, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, }; use helium_proto::services::poc_mobile::{Heartbeat, SeniorityUpdate, SpeedtestAvg}; @@ -56,7 +56,8 @@ impl Cmd { let (valid_heartbeats, valid_heartbeats_server) = Heartbeat::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -65,7 +66,8 @@ impl Cmd { let (seniority_updates, seniority_updates_server) = SeniorityUpdate::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -73,7 +75,8 @@ impl Cmd { let (speedtests_avg, speedtests_avg_server) = SpeedtestAvg::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 94ceb1b18..e7178e177 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -10,7 +10,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload::FileUpload, - traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampEncode}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode}, FileStore, FileType, }; use futures::{ @@ -89,7 +89,8 @@ impl CoverageDaemon { let (valid_coverage_objs, valid_coverage_objs_server) = proto::CoverageObjectV1::file_sink( settings.store_base_path(), file_upload.clone(), - FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index 6949bdec4..158c31dd9 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -11,7 +11,7 @@ use file_store::{ mobile_radio_threshold::{ RadioThresholdIngestReport, RadioThresholdReportReq, VerifiedRadioThresholdIngestReport, }, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; @@ -74,6 +74,7 @@ where settings.store_base_path(), file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -83,6 +84,7 @@ where settings.store_base_path(), file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index a2ed97d4a..2befe03ac 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -17,7 +17,7 @@ use db_store::meta; use file_store::{ file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampEncode}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode}, }; use futures_util::TryFutureExt; @@ -82,6 +82,7 @@ where settings.store_base_path(), file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -90,6 +91,7 @@ where settings.store_base_path(), file_upload, FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs index 02d3b84cd..c4bafc051 100644 --- a/mobile_verifier/src/sp_boosted_rewards_bans.rs +++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs @@ -7,7 +7,7 @@ use file_store::{ }, file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use futures::{prelude::future::LocalBoxFuture, StreamExt, TryFutureExt, TryStreamExt}; @@ -159,6 +159,7 @@ where settings.store_base_path(), file_upload, FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 1150c57b7..07a116dd3 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -9,7 +9,7 @@ use file_store::{ file_source, file_upload::FileUpload, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use futures::{ @@ -77,7 +77,8 @@ where let (speedtests_validity, speedtests_validity_server) = VerifiedSpeedtestProto::file_sink( settings.store_base_path(), file_upload, - FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 7e2786d9a..d7446a2df 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -8,7 +8,7 @@ use file_store::{ SubscriberLocationIngestReport, SubscriberLocationReq, VerifiedSubscriberLocationIngestReport, }, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; @@ -58,6 +58,7 @@ where settings.store_base_path(), file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/subscriber_verified_mapping_event.rs b/mobile_verifier/src/subscriber_verified_mapping_event.rs index 923cb4b73..913ba9c51 100644 --- a/mobile_verifier/src/subscriber_verified_mapping_event.rs +++ b/mobile_verifier/src/subscriber_verified_mapping_event.rs @@ -7,7 +7,7 @@ use file_store::{ file_upload::FileUpload, subscriber_verified_mapping_event::SubscriberVerifiedMappingEvent, subscriber_verified_mapping_event_ingest_report::SubscriberVerifiedMappingEventIngestReport, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, verified_subscriber_verified_mapping_event_ingest_report::VerifiedSubscriberVerifiedMappingEventIngestReport, FileStore, FileType, }; @@ -79,6 +79,7 @@ where settings.store_base_path(), file_upload.clone(), FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/poc_entropy/src/main.rs b/poc_entropy/src/main.rs index f25d2c011..ba0c5f171 100644 --- a/poc_entropy/src/main.rs +++ b/poc_entropy/src/main.rs @@ -2,7 +2,7 @@ use anyhow::{Error, Result}; use clap::Parser; use file_store::{ file_upload, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, }; use futures_util::TryFutureExt; use helium_proto::EntropyReportV1; @@ -77,7 +77,8 @@ impl Server { let (entropy_sink, entropy_sink_server) = EntropyReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/price/src/main.rs b/price/src/main.rs index c1eadc2ad..a1cf2c389 100644 --- a/price/src/main.rs +++ b/price/src/main.rs @@ -2,7 +2,7 @@ use anyhow::Result; use clap::Parser; use file_store::{ file_upload, - traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, }; use helium_proto::PriceReportV1; use price::{cli::check, PriceGenerator, Settings}; @@ -88,7 +88,8 @@ impl Server { let (price_sink, price_sink_server) = PriceReportV1::file_sink( store_base_path, file_upload.clone(), - FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(PRICE_SINK_ROLL_SECS)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(PRICE_SINK_ROLL_SECS)), env!("CARGO_PKG_NAME"), ) .await?;