diff --git a/boost_manager/src/main.rs b/boost_manager/src/main.rs index f55e923ad..68a4a4e78 100644 --- a/boost_manager/src/main.rs +++ b/boost_manager/src/main.rs @@ -5,8 +5,11 @@ use boost_manager::{ }; use clap::Parser; use file_store::{ - file_info_poller::LookbackBehavior, file_source, file_upload, reward_manifest::RewardManifest, - traits::FileSinkWriteExt, FileStore, FileType, + file_info_poller::LookbackBehavior, + file_source, file_upload, + reward_manifest::RewardManifest, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + FileStore, FileType, }; use helium_proto::BoostedHexUpdateV1; use mobile_config::client::hex_boosting_client::HexBoostingClient; @@ -103,7 +106,7 @@ impl Server { let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/ingest/src/server_iot.rs b/ingest/src/server_iot.rs index f80034d69..481ab1fcf 100644 --- a/ingest/src/server_iot.rs +++ b/ingest/src/server_iot.rs @@ -4,7 +4,7 @@ use chrono::Utc; use file_store::{ file_sink::FileSinkClient, file_upload, - traits::{FileSinkWriteExt, MsgVerify}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt, MsgVerify}, }; use futures::{ future::{LocalBoxFuture, TryFutureExt}, @@ -365,7 +365,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (beacon_report_sink, beacon_report_sink_server) = LoraBeaconIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -374,7 +374,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (witness_report_sink, witness_report_sink_server) = LoraWitnessIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 636bd156d..46e873934 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -4,7 +4,7 @@ use chrono::Utc; use file_store::{ file_sink::FileSinkClient, file_upload, - traits::{FileSinkWriteExt, MsgVerify}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt, MsgVerify}, }; use futures::future::LocalBoxFuture; use futures_util::TryFutureExt; @@ -450,7 +450,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { CellHeartbeatIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -459,7 +459,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { WifiHeartbeatIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -468,7 +468,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (speedtest_report_sink, speedtest_report_sink_server) = SpeedtestIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -477,7 +477,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { DataTransferSessionIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -486,7 +486,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { SubscriberLocationIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -495,7 +495,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { RadioThresholdIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -504,7 +504,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { InvalidatedRadioThresholdIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -513,7 +513,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { CoverageObjectIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -522,7 +522,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -531,7 +531,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { SubscriberVerifiedMappingEventIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 77b1697dd..82afbde53 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -11,7 +11,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload, iot_packet::PacketRouterPacketReport, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use futures_util::TryFutureExt; @@ -141,7 +141,7 @@ impl Cmd { let (valid_packets, valid_packets_server) = ValidPacket::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -149,7 +149,7 @@ impl Cmd { let (invalid_packets, invalid_packets_server) = InvalidPacket::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 5afeb8866..65cc5927f 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -6,7 +6,7 @@ use file_store::{ file_info_poller::LookbackBehavior, file_source, file_upload, iot_packet::IotValidPacket, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use helium_proto::{ @@ -124,7 +124,7 @@ impl Server { let (rewards_sink, gateway_rewards_sink_server) = IotRewardShare::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -133,7 +133,7 @@ impl Server { let (reward_manifests_sink, reward_manifests_sink_server) = RewardManifest::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -177,7 +177,7 @@ impl Server { NonRewardablePacket::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -210,7 +210,7 @@ impl Server { LoraInvalidBeaconReportV1::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -219,7 +219,7 @@ impl Server { LoraInvalidWitnessReportV1::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -243,7 +243,7 @@ impl Server { LoraInvalidBeaconReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -252,7 +252,7 @@ impl Server { LoraInvalidWitnessReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -260,7 +260,7 @@ impl Server { let (runner_poc_sink, runner_poc_sink_server) = LoraPocV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(2 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(2 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 8e9e95ccc..0e640f5ab 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -8,7 +8,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload, mobile_session::DataTransferSessionIngestReport, - traits::FileSinkWriteExt, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; @@ -174,7 +174,7 @@ impl Cmd { let (valid_sessions, valid_sessions_server) = ValidDataTransferSession::file_sink( store_base_path, file_upload.clone(), - None, + FileSinkCommitStrategy::Automatic, env!("CARGO_PKG_NAME"), ) .await?; @@ -183,7 +183,7 @@ impl Cmd { InvalidDataTransferIngestReportV1::file_sink( store_base_path, file_upload.clone(), - None, + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -191,7 +191,7 @@ impl Cmd { let (pending_sessions, pending_sessions_server) = PendingDataTransferSessionV1::file_sink( store_base_path, file_upload.clone(), - None, + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index 32819fd66..9bb0ebbdb 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -9,7 +9,7 @@ use chrono::{DateTime, Utc}; use file_store::{ file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkWriteExt, TimestampDecode, TimestampEncode}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampDecode, TimestampEncode}, FileStore, }; use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; @@ -262,7 +262,7 @@ impl OracleBoostingReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 0ce51d47c..751a6627b 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -15,7 +15,11 @@ use crate::{ telemetry, Settings, }; use anyhow::Result; -use file_store::{file_upload, traits::FileSinkWriteExt, FileStore}; +use file_store::{ + file_upload, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, + FileStore, +}; use helium_proto::services::poc_mobile::{Heartbeat, SeniorityUpdate, SpeedtestAvg}; use mobile_config::client::{ entity_client::EntityClient, hex_boosting_client::HexBoostingClient, AuthorizationClient, @@ -52,7 +56,7 @@ impl Cmd { let (valid_heartbeats, valid_heartbeats_server) = Heartbeat::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -61,7 +65,7 @@ impl Cmd { let (seniority_updates, seniority_updates_server) = SeniorityUpdate::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -69,7 +73,7 @@ impl Cmd { let (speedtests_avg, speedtests_avg_server) = SpeedtestAvg::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index d10aebe7a..94ceb1b18 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -10,7 +10,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload::FileUpload, - traits::{FileSinkWriteExt, TimestampEncode}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampEncode}, FileStore, FileType, }; use futures::{ @@ -89,7 +89,7 @@ impl CoverageDaemon { let (valid_coverage_objs, valid_coverage_objs_server) = proto::CoverageObjectV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index c10453bff..6949bdec4 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -11,7 +11,7 @@ use file_store::{ mobile_radio_threshold::{ RadioThresholdIngestReport, RadioThresholdReportReq, VerifiedRadioThresholdIngestReport, }, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; @@ -73,7 +73,7 @@ where VerifiedRadioThresholdIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -82,7 +82,7 @@ where VerifiedInvalidatedRadioThresholdIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 8d5f81426..a2ed97d4a 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -17,7 +17,7 @@ use db_store::meta; use file_store::{ file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkWriteExt, TimestampEncode, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampEncode}, }; use futures_util::TryFutureExt; @@ -81,7 +81,7 @@ where let (mobile_rewards, mobile_rewards_server) = MobileRewardShare::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; @@ -89,7 +89,7 @@ where let (reward_manifests, reward_manifests_server) = RewardManifest::file_sink( settings.store_base_path(), file_upload, - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs index 54433e117..02d3b84cd 100644 --- a/mobile_verifier/src/sp_boosted_rewards_bans.rs +++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs @@ -7,7 +7,7 @@ use file_store::{ }, file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use futures::{prelude::future::LocalBoxFuture, StreamExt, TryFutureExt, TryStreamExt}; @@ -158,7 +158,7 @@ where VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink( settings.store_base_path(), file_upload, - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 97edf8e82..1150c57b7 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -9,7 +9,7 @@ use file_store::{ file_source, file_upload::FileUpload, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, - traits::FileSinkWriteExt, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use futures::{ @@ -77,7 +77,7 @@ where let (speedtests_validity, speedtests_validity_server) = VerifiedSpeedtestProto::file_sink( settings.store_base_path(), file_upload, - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 69a09a369..7e2786d9a 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -8,7 +8,7 @@ use file_store::{ SubscriberLocationIngestReport, SubscriberLocationReq, VerifiedSubscriberLocationIngestReport, }, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; @@ -57,7 +57,7 @@ where VerifiedSubscriberLocationIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/subscriber_verified_mapping_event.rs b/mobile_verifier/src/subscriber_verified_mapping_event.rs index 99ed76015..923cb4b73 100644 --- a/mobile_verifier/src/subscriber_verified_mapping_event.rs +++ b/mobile_verifier/src/subscriber_verified_mapping_event.rs @@ -7,7 +7,7 @@ use file_store::{ file_upload::FileUpload, subscriber_verified_mapping_event::SubscriberVerifiedMappingEvent, subscriber_verified_mapping_event_ingest_report::SubscriberVerifiedMappingEventIngestReport, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, verified_subscriber_verified_mapping_event_ingest_report::VerifiedSubscriberVerifiedMappingEventIngestReport, FileStore, FileType, }; @@ -78,7 +78,7 @@ where VerifiedSubscriberVerifiedMappingEventIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/poc_entropy/src/main.rs b/poc_entropy/src/main.rs index e13253438..f25d2c011 100644 --- a/poc_entropy/src/main.rs +++ b/poc_entropy/src/main.rs @@ -1,6 +1,9 @@ use anyhow::{Error, Result}; use clap::Parser; -use file_store::{file_upload, traits::FileSinkWriteExt}; +use file_store::{ + file_upload, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, +}; use futures_util::TryFutureExt; use helium_proto::EntropyReportV1; use poc_entropy::{entropy_generator::EntropyGenerator, server::ApiServer, Settings}; @@ -74,7 +77,7 @@ impl Server { let (entropy_sink, entropy_sink_server) = EntropyReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/price/src/main.rs b/price/src/main.rs index 4695a9005..c1eadc2ad 100644 --- a/price/src/main.rs +++ b/price/src/main.rs @@ -1,6 +1,9 @@ use anyhow::Result; use clap::Parser; -use file_store::{file_upload, traits::FileSinkWriteExt}; +use file_store::{ + file_upload, + traits::{FileSinkCommitStrategy, FileSinkWriteExt}, +}; use helium_proto::PriceReportV1; use price::{cli::check, PriceGenerator, Settings}; use std::{ @@ -85,7 +88,7 @@ impl Server { let (price_sink, price_sink_server) = PriceReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(PRICE_SINK_ROLL_SECS)), + FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(PRICE_SINK_ROLL_SECS)), env!("CARGO_PKG_NAME"), ) .await?;