Skip to content

Commit

Permalink
Correct FileSink auto_commit and roll_time usage in Oracles
Browse files Browse the repository at this point in the history
The PR that updated FileSink to use the Ext trait had an incorrect
understanding of the relationship between `roll_time` and `auto_commit`.

That understanding has since been corrected and codified in the
`FileSinkCommitStrategy` enum.

For this commit, https://github.com/helium/oracles/pull/849/files was
combed through to get all correct settings for FileSinks before they
were transitioned to use FileSinkWriteExt::file_sink().
  • Loading branch information
michaeldjeffrey committed Aug 29, 2024
1 parent 0faed88 commit 07ee3d4
Show file tree
Hide file tree
Showing 17 changed files with 72 additions and 59 deletions.
9 changes: 6 additions & 3 deletions boost_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use boost_manager::{
};
use clap::Parser;
use file_store::{
file_info_poller::LookbackBehavior, file_source, file_upload, reward_manifest::RewardManifest,
traits::FileSinkWriteExt, FileStore, FileType,
file_info_poller::LookbackBehavior,
file_source, file_upload,
reward_manifest::RewardManifest,
traits::{FileSinkCommitStrategy, FileSinkWriteExt},
FileStore, FileType,
};
use helium_proto::BoostedHexUpdateV1;
use mobile_config::client::hex_boosting_client::HexBoostingClient;
Expand Down Expand Up @@ -103,7 +106,7 @@ impl Server {
let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
6 changes: 3 additions & 3 deletions ingest/src/server_iot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::Utc;
use file_store::{
file_sink::FileSinkClient,
file_upload,
traits::{FileSinkWriteExt, MsgVerify},
traits::{FileSinkCommitStrategy, FileSinkWriteExt, MsgVerify},
};
use futures::{
future::{LocalBoxFuture, TryFutureExt},
Expand Down Expand Up @@ -365,7 +365,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let (beacon_report_sink, beacon_report_sink_server) = LoraBeaconIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -374,7 +374,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let (witness_report_sink, witness_report_sink_server) = LoraWitnessIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
22 changes: 11 additions & 11 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::Utc;
use file_store::{
file_sink::FileSinkClient,
file_upload,
traits::{FileSinkWriteExt, MsgVerify},
traits::{FileSinkCommitStrategy, FileSinkWriteExt, MsgVerify},
};
use futures::future::LocalBoxFuture;
use futures_util::TryFutureExt;
Expand Down Expand Up @@ -450,7 +450,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
CellHeartbeatIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -459,7 +459,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
WifiHeartbeatIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -468,7 +468,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let (speedtest_report_sink, speedtest_report_sink_server) = SpeedtestIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -477,7 +477,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
DataTransferSessionIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -486,7 +486,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
SubscriberLocationIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -495,7 +495,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
RadioThresholdIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -504,7 +504,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
InvalidatedRadioThresholdIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -513,7 +513,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
CoverageObjectIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -522,7 +522,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
ServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -531,7 +531,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
SubscriberVerifiedMappingEventIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::AutomaticRollTime(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
6 changes: 3 additions & 3 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use file_store::{
file_sink::FileSinkClient,
file_source, file_upload,
iot_packet::PacketRouterPacketReport,
traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME},
traits::{FileSinkCommitStrategy, FileSinkWriteExt},
FileStore, FileType,
};
use futures_util::TryFutureExt;
Expand Down Expand Up @@ -141,15 +141,15 @@ impl Cmd {
let (valid_packets, valid_packets_server) = ValidPacket::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
env!("CARGO_PKG_NAME"),
)
.await?;

let (invalid_packets, invalid_packets_server) = InvalidPacket::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
18 changes: 9 additions & 9 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use file_store::{
file_info_poller::LookbackBehavior,
file_source, file_upload,
iot_packet::IotValidPacket,
traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME},
traits::{FileSinkCommitStrategy, FileSinkWriteExt},
FileStore, FileType,
};
use helium_proto::{
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Server {
let (rewards_sink, gateway_rewards_sink_server) = IotRewardShare::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -133,7 +133,7 @@ impl Server {
let (reward_manifests_sink, reward_manifests_sink_server) = RewardManifest::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down Expand Up @@ -177,7 +177,7 @@ impl Server {
NonRewardablePacket::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Server {
LoraInvalidBeaconReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -219,7 +219,7 @@ impl Server {
LoraInvalidWitnessReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -243,7 +243,7 @@ impl Server {
LoraInvalidBeaconReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -252,15 +252,15 @@ impl Server {
LoraInvalidWitnessReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;

let (runner_poc_sink, runner_poc_sink_server) = LoraPocV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(2 * 60)),
FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(2 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
8 changes: 4 additions & 4 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use file_store::{
file_sink::FileSinkClient,
file_source, file_upload,
mobile_session::DataTransferSessionIngestReport,
traits::FileSinkWriteExt,
traits::{FileSinkCommitStrategy, FileSinkWriteExt},
FileStore, FileType,
};

Expand Down Expand Up @@ -174,7 +174,7 @@ impl Cmd {
let (valid_sessions, valid_sessions_server) = ValidDataTransferSession::file_sink(
store_base_path,
file_upload.clone(),
None,
FileSinkCommitStrategy::Automatic,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -183,15 +183,15 @@ impl Cmd {
InvalidDataTransferIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
None,
FileSinkCommitStrategy::Manual,
env!("CARGO_PKG_NAME"),
)
.await?;

let (pending_sessions, pending_sessions_server) = PendingDataTransferSessionV1::file_sink(
store_base_path,
file_upload.clone(),
None,
FileSinkCommitStrategy::Manual,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
4 changes: 2 additions & 2 deletions mobile_verifier/src/boosting_oracles/data_sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use chrono::{DateTime, Utc};
use file_store::{
file_sink::FileSinkClient,
file_upload::FileUpload,
traits::{FileSinkWriteExt, TimestampDecode, TimestampEncode},
traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampDecode, TimestampEncode},
FileStore,
};
use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt};
Expand Down Expand Up @@ -262,7 +262,7 @@ impl
OracleBoostingReportV1::file_sink(
settings.store_base_path(),
file_upload.clone(),
Some(Duration::from_secs(15 * 60)),
FileSinkCommitStrategy::AutomaticRollTime(Duration::from_secs(15 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
12 changes: 8 additions & 4 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ use crate::{
telemetry, Settings,
};
use anyhow::Result;
use file_store::{file_upload, traits::FileSinkWriteExt, FileStore};
use file_store::{
file_upload,
traits::{FileSinkCommitStrategy, FileSinkWriteExt},
FileStore,
};
use helium_proto::services::poc_mobile::{Heartbeat, SeniorityUpdate, SpeedtestAvg};
use mobile_config::client::{
entity_client::EntityClient, hex_boosting_client::HexBoostingClient, AuthorizationClient,
Expand Down Expand Up @@ -52,7 +56,7 @@ impl Cmd {
let (valid_heartbeats, valid_heartbeats_server) = Heartbeat::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(15 * 60)),
FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -61,15 +65,15 @@ impl Cmd {
let (seniority_updates, seniority_updates_server) = SeniorityUpdate::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(15 * 60)),
FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;

let (speedtests_avg, speedtests_avg_server) = SpeedtestAvg::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(15 * 60)),
FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
4 changes: 2 additions & 2 deletions mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use file_store::{
file_sink::FileSinkClient,
file_source,
file_upload::FileUpload,
traits::{FileSinkWriteExt, TimestampEncode},
traits::{FileSinkCommitStrategy, FileSinkWriteExt, TimestampEncode},
FileStore, FileType,
};
use futures::{
Expand Down Expand Up @@ -89,7 +89,7 @@ impl CoverageDaemon {
let (valid_coverage_objs, valid_coverage_objs_server) = proto::CoverageObjectV1::file_sink(
settings.store_base_path(),
file_upload.clone(),
Some(Duration::from_secs(15 * 60)),
FileSinkCommitStrategy::ManualRollTime(Duration::from_secs(15 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
6 changes: 3 additions & 3 deletions mobile_verifier/src/radio_threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use file_store::{
mobile_radio_threshold::{
RadioThresholdIngestReport, RadioThresholdReportReq, VerifiedRadioThresholdIngestReport,
},
traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME},
traits::{FileSinkCommitStrategy, FileSinkWriteExt},
FileStore, FileType,
};
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -73,7 +73,7 @@ where
VerifiedRadioThresholdIngestReportV1::file_sink(
settings.store_base_path(),
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -82,7 +82,7 @@ where
VerifiedInvalidatedRadioThresholdIngestReportV1::file_sink(
settings.store_base_path(),
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
Loading

0 comments on commit 07ee3d4

Please sign in to comment.