diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs
index ca60b2358..770888228 100644
--- a/mobile_verifier/src/boosting_oracles/data_sets.rs
+++ b/mobile_verifier/src/boosting_oracles/data_sets.rs
@@ -173,11 +173,11 @@ where
h.urbanization.is_ready() && h.footfall.is_ready() && h.landtype.is_ready()
}
-pub struct DataSetDownloaderDaemon {
+pub struct DataSetDownloaderDaemon {
pool: PgPool,
data_sets: HexBoostData,
store: FileStore,
- oracle_boosting_sink: FileSinkClient,
+ data_set_processor: T,
data_set_directory: PathBuf,
new_coverage_object_notification: NewCoverageObjectNotification,
poll_duration: Duration,
@@ -217,11 +217,12 @@ impl DataSetStatus {
}
}
-impl ManagedTask for DataSetDownloaderDaemon
+impl ManagedTask for DataSetDownloaderDaemon
where
A: DataSet,
B: DataSet,
C: DataSet,
+ T: DataSetProcessor,
{
fn start_task(
self: Box,
@@ -243,7 +244,7 @@ where
}
}
-impl DataSetDownloaderDaemon {
+impl DataSetDownloaderDaemon {
pub async fn create_managed_task(
pool: PgPool,
settings: &Settings,
@@ -288,17 +289,18 @@ impl DataSetDownloaderDaemon {
}
}
-impl DataSetDownloaderDaemon
+impl DataSetDownloaderDaemon
where
A: DataSet,
B: DataSet,
C: DataSet,
+ T: DataSetProcessor,
{
pub fn new(
pool: PgPool,
data_sets: HexBoostData,
store: FileStore,
- oracle_boosting_sink: FileSinkClient,
+ data_set_processor: T,
data_set_directory: PathBuf,
new_coverage_object_notification: NewCoverageObjectNotification,
poll_duration: Duration,
@@ -307,7 +309,7 @@ where
pool,
data_sets,
store,
- oracle_boosting_sink,
+ data_set_processor,
data_set_directory,
new_coverage_object_notification,
poll_duration,
@@ -337,12 +339,9 @@ where
new_urbanized.is_some() || new_footfall.is_some() || new_landtype.is_some();
if is_hex_boost_data_ready(&self.data_sets) && new_data_set {
tracing::info!("Processing new data sets");
- set_all_oracle_boosting_assignments(
- &self.pool,
- &self.data_sets,
- &self.oracle_boosting_sink,
- )
- .await?;
+ self.data_set_processor
+ .set_all_oracle_boosting_assignments(&self.pool, &self.data_sets)
+ .await?;
}
// Mark the new data sets as processed and delete the old ones
@@ -394,12 +393,9 @@ where
// Attempt to fill in any unassigned hexes. This is for the edge case in
// which we shutdown before a coverage object updates.
if is_hex_boost_data_ready(&self.data_sets) {
- set_unassigned_oracle_boosting_assignments(
- &self.pool,
- &self.data_sets,
- &self.oracle_boosting_sink,
- )
- .await?;
+ self.data_set_processor
+ .set_unassigned_oracle_boosting_assignments(&self.pool, &self.data_sets)
+ .await?;
}
let mut wakeup = Instant::now() + self.poll_duration;
@@ -410,10 +406,9 @@ where
// If we see a new coverage object, we want to assign only those hexes
// that don't have an assignment
if is_hex_boost_data_ready(&self.data_sets) {
- set_unassigned_oracle_boosting_assignments(
+ self.data_set_processor.set_unassigned_oracle_boosting_assignments(
&self.pool,
&self.data_sets,
- &self.oracle_boosting_sink
).await?;
}
},
@@ -507,6 +502,73 @@ impl DataSetType {
}
}
+#[async_trait::async_trait]
+pub trait DataSetProcessor: Send + Sync + 'static {
+ async fn set_all_oracle_boosting_assignments(
+ &self,
+ pool: &PgPool,
+ data_sets: &HexBoostData,
+ ) -> anyhow::Result<()>;
+
+ async fn set_unassigned_oracle_boosting_assignments(
+ &self,
+ pool: &PgPool,
+ data_sets: &HexBoostData,
+ ) -> anyhow::Result<()>;
+}
+
+#[async_trait::async_trait]
+impl DataSetProcessor for FileSinkClient {
+ async fn set_all_oracle_boosting_assignments(
+ &self,
+ pool: &PgPool,
+ data_sets: &HexBoostData,
+ ) -> anyhow::Result<()> {
+ let assigned_coverage_objs =
+ AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(pool), data_sets)
+ .await?;
+ assigned_coverage_objs.write(self).await?;
+ assigned_coverage_objs.save(pool).await?;
+ Ok(())
+ }
+
+ async fn set_unassigned_oracle_boosting_assignments(
+ &self,
+ pool: &PgPool,
+ data_sets: &HexBoostData,
+ ) -> anyhow::Result<()> {
+ let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream(
+ db::fetch_hexes_with_null_assignments(pool),
+ data_sets,
+ )
+ .await?;
+ assigned_coverage_objs.write(self).await?;
+ assigned_coverage_objs.save(pool).await?;
+ Ok(())
+ }
+}
+
+pub struct NopDataSetProcessor;
+
+#[async_trait::async_trait]
+impl DataSetProcessor for NopDataSetProcessor {
+ async fn set_all_oracle_boosting_assignments(
+ &self,
+ _pool: &PgPool,
+ _data_sets: &HexBoostData,
+ ) -> anyhow::Result<()> {
+ Ok(())
+ }
+
+ async fn set_unassigned_oracle_boosting_assignments(
+ &self,
+ _pool: &PgPool,
+ _data_sets: &HexBoostData,
+ ) -> anyhow::Result<()> {
+ Ok(())
+ }
+}
+
pub mod db {
use super::*;
@@ -771,30 +833,3 @@ pub struct AssignedHex {
pub signal_power: i32,
pub assignments: HexAssignments,
}
-
-pub async fn set_all_oracle_boosting_assignments(
- pool: &PgPool,
- data_sets: &HexBoostData,
- file_sink: &FileSinkClient,
-) -> anyhow::Result<()> {
- let assigned_coverage_objs =
- AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(pool), data_sets).await?;
- assigned_coverage_objs.write(file_sink).await?;
- assigned_coverage_objs.save(pool).await?;
- Ok(())
-}
-
-pub async fn set_unassigned_oracle_boosting_assignments(
- pool: &PgPool,
- data_sets: &HexBoostData,
- file_sink: &FileSinkClient,
-) -> anyhow::Result<()> {
- let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream(
- db::fetch_hexes_with_null_assignments(pool),
- data_sets,
- )
- .await?;
- assigned_coverage_objs.write(file_sink).await?;
- assigned_coverage_objs.save(pool).await?;
- Ok(())
-}