From 815ac8adb2b54e848b23e3400739192f38dd63de Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 6 Jun 2024 12:04:58 -0400 Subject: [PATCH 1/2] Abstract data set processing to a trait --- .../src/boosting_oracles/data_sets.rs | 104 ++++++++++++++---- 1 file changed, 83 insertions(+), 21 deletions(-) diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index ca60b2358..39aad1bf2 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -173,11 +173,11 @@ where h.urbanization.is_ready() && h.footfall.is_ready() && h.landtype.is_ready() } -pub struct DataSetDownloaderDaemon { +pub struct DataSetDownloaderDaemon { pool: PgPool, data_sets: HexBoostData, store: FileStore, - oracle_boosting_sink: FileSinkClient, + data_set_processor: T, data_set_directory: PathBuf, new_coverage_object_notification: NewCoverageObjectNotification, poll_duration: Duration, @@ -217,11 +217,12 @@ impl DataSetStatus { } } -impl ManagedTask for DataSetDownloaderDaemon +impl ManagedTask for DataSetDownloaderDaemon where A: DataSet, B: DataSet, C: DataSet, + T: DataSetProcessor, { fn start_task( self: Box, @@ -243,7 +244,7 @@ where } } -impl DataSetDownloaderDaemon { +impl DataSetDownloaderDaemon { pub async fn create_managed_task( pool: PgPool, settings: &Settings, @@ -288,17 +289,18 @@ impl DataSetDownloaderDaemon { } } -impl DataSetDownloaderDaemon +impl DataSetDownloaderDaemon where A: DataSet, B: DataSet, C: DataSet, + T: DataSetProcessor, { pub fn new( pool: PgPool, data_sets: HexBoostData, store: FileStore, - oracle_boosting_sink: FileSinkClient, + data_set_processor: T, data_set_directory: PathBuf, new_coverage_object_notification: NewCoverageObjectNotification, poll_duration: Duration, @@ -307,7 +309,7 @@ where pool, data_sets, store, - oracle_boosting_sink, + data_set_processor, data_set_directory, new_coverage_object_notification, poll_duration, @@ -337,12 +339,9 @@ where new_urbanized.is_some() || new_footfall.is_some() || new_landtype.is_some(); if is_hex_boost_data_ready(&self.data_sets) && new_data_set { tracing::info!("Processing new data sets"); - set_all_oracle_boosting_assignments( - &self.pool, - &self.data_sets, - &self.oracle_boosting_sink, - ) - .await?; + self.data_set_processor + .set_all_oracle_boosting_assignments(&self.pool, &self.data_sets) + .await?; } // Mark the new data sets as processed and delete the old ones @@ -394,12 +393,9 @@ where // Attempt to fill in any unassigned hexes. This is for the edge case in // which we shutdown before a coverage object updates. if is_hex_boost_data_ready(&self.data_sets) { - set_unassigned_oracle_boosting_assignments( - &self.pool, - &self.data_sets, - &self.oracle_boosting_sink, - ) - .await?; + self.data_set_processor + .set_unassigned_oracle_boosting_assignments(&self.pool, &self.data_sets) + .await?; } let mut wakeup = Instant::now() + self.poll_duration; @@ -410,10 +406,9 @@ where // If we see a new coverage object, we want to assign only those hexes // that don't have an assignment if is_hex_boost_data_ready(&self.data_sets) { - set_unassigned_oracle_boosting_assignments( + self.data_set_processor.set_unassigned_oracle_boosting_assignments( &self.pool, &self.data_sets, - &self.oracle_boosting_sink ).await?; } }, @@ -507,6 +502,73 @@ impl DataSetType { } } +#[async_trait::async_trait] +pub trait DataSetProcessor: Send + Sync + 'static { + async fn set_all_oracle_boosting_assignments( + &self, + pool: &PgPool, + data_sets: &HexBoostData, + ) -> anyhow::Result<()>; + + async fn set_unassigned_oracle_boosting_assignments( + &self, + pool: &PgPool, + data_sets: &HexBoostData, + ) -> anyhow::Result<()>; +} + +#[async_trait::async_trait] +impl DataSetProcessor for FileSinkClient { + async fn set_all_oracle_boosting_assignments( + &self, + pool: &PgPool, + data_sets: &HexBoostData, + ) -> anyhow::Result<()> { + let assigned_coverage_objs = + AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(pool), data_sets) + .await?; + assigned_coverage_objs.write(self).await?; + assigned_coverage_objs.save(pool).await?; + Ok(()) + } + + async fn set_unassigned_oracle_boosting_assignments( + &self, + pool: &PgPool, + data_sets: &HexBoostData, + ) -> anyhow::Result<()> { + let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream( + db::fetch_hexes_with_null_assignments(pool), + data_sets, + ) + .await?; + assigned_coverage_objs.write(self).await?; + assigned_coverage_objs.save(pool).await?; + Ok(()) + } +} + +pub struct NopDataSetProcessor; + +#[async_trait::async_trait] +impl DataSetProcessor for NopDataSetProcessor { + async fn set_all_oracle_boosting_assignments( + &self, + _pool: &PgPool, + _data_sets: &HexBoostData, + ) -> anyhow::Result<()> { + Ok(()) + } + + async fn set_unassigned_oracle_boosting_assignments( + &self, + _pool: &PgPool, + _data_sets: &HexBoostData, + ) -> anyhow::Result<()> { + Ok(()) + } +} + pub mod db { use super::*; From 7e3ad5325dca0021d5bc0d46e81ff71789269837 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 6 Jun 2024 14:34:09 -0400 Subject: [PATCH 2/2] Remove unused functions --- .../src/boosting_oracles/data_sets.rs | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index 39aad1bf2..770888228 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -833,30 +833,3 @@ pub struct AssignedHex { pub signal_power: i32, pub assignments: HexAssignments, } - -pub async fn set_all_oracle_boosting_assignments( - pool: &PgPool, - data_sets: &HexBoostData, - file_sink: &FileSinkClient, -) -> anyhow::Result<()> { - let assigned_coverage_objs = - AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(pool), data_sets).await?; - assigned_coverage_objs.write(file_sink).await?; - assigned_coverage_objs.save(pool).await?; - Ok(()) -} - -pub async fn set_unassigned_oracle_boosting_assignments( - pool: &PgPool, - data_sets: &HexBoostData, - file_sink: &FileSinkClient, -) -> anyhow::Result<()> { - let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream( - db::fetch_hexes_with_null_assignments(pool), - data_sets, - ) - .await?; - assigned_coverage_objs.write(file_sink).await?; - assigned_coverage_objs.save(pool).await?; - Ok(()) -}