From 198a838e54f7eedcb37fc5c73f17133373d0dd59 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Wed, 15 May 2024 15:56:54 -0400 Subject: [PATCH] Move things around --- .../src/boosting_oracles/data_sets.rs | 272 +++++++++++------- 1 file changed, 166 insertions(+), 106 deletions(-) diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index dc4be1b33..c3074b9c5 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -262,15 +262,12 @@ where new_urbanized.is_some() || new_footfall.is_some() || new_landtype.is_some(); if self.data_sets.is_ready() && new_data_set { tracing::info!("Processing new data sets"); - let boosting_reports = set_oracle_boosting_assignments( - UnassignedHex::fetch_all(&self.pool), - &self.data_sets, + set_all_oracle_boosting_assignments( &self.pool, + &self.data_sets, + &self.oracle_boosting_sink, ) .await?; - self.oracle_boosting_sink - .write_all(boosting_reports) - .await?; } // Mark the new data sets as processed and delete the old ones @@ -324,15 +321,12 @@ where // Attempt to fill in any unassigned hexes. This is for the edge case in // which we shutdown before a coverage object updates. if self.data_sets.is_ready() { - let boosting_reports = set_oracle_boosting_assignments( - UnassignedHex::fetch_unassigned(&self.pool), - &self.data_sets, + set_unassigned_oracle_boosting_assignments( &self.pool, + &self.data_sets, + &self.oracle_boosting_sink, ) .await?; - self.oracle_boosting_sink - .write_all(boosting_reports) - .await?; } loop { @@ -342,15 +336,11 @@ where // If we see a new coverage object, we want to assign only those hexes // that don't have an assignment if self.data_sets.is_ready() { - let boosting_reports = set_oracle_boosting_assignments( - UnassignedHex::fetch_unassigned(&self.pool), - &self.data_sets, + set_unassigned_oracle_boosting_assignments( &self.pool, - ) - .await?; - self.oracle_boosting_sink - .write_all(boosting_reports) - .await?; + &self.data_sets, + &self.oracle_boosting_sink + ).await?; } }, _ = tokio::time::sleep(poll_duration.to_std()?) => { @@ -560,23 +550,42 @@ pub mod db { } } -#[derive(FromRow)] -pub struct UnassignedHex { - uuid: uuid::Uuid, - #[sqlx(try_from = "i64")] - hex: u64, - signal_level: SignalLevel, - signal_power: i32, +pub struct AssignedCoverageObjects { + coverage_objs: HashMap>, } -impl UnassignedHex { - pub fn fetch_all(pool: &PgPool) -> impl Stream> + '_ { - sqlx::query_as("SELECT uuid, hex, signal_level, signal_power FROM hexes").fetch(pool) +impl AssignedCoverageObjects { + async fn from_stream( + stream: impl Stream>, + data_sets: &HexBoostData, + ) -> anyhow::Result { + let mut coverage_objs = HashMap::>::new(); + let mut stream = pin!(stream); + while let Some(hex) = stream.try_next().await? { + let hex = hex.assign(data_sets)?; + coverage_objs.entry(hex.uuid).or_default().push(hex); + } + Ok(Self { coverage_objs }) } - pub fn fetch_unassigned(pool: &PgPool) -> impl Stream> + '_ { - sqlx::query_as( - "SELECT + async fn fetch_all( + pool: &PgPool, + data_sets: &HexBoostData, + ) -> anyhow::Result { + Self::from_stream( + sqlx::query_as("SELECT uuid, hex, signal_level, signal_power FROM hexes").fetch(pool), + data_sets, + ) + .await + } + + async fn fetch_unassigned( + pool: &PgPool, + data_sets: &HexBoostData, + ) -> anyhow::Result { + Self::from_stream( + sqlx::query_as( + "SELECT uuid, hex, signal_level, signal_power FROM hexes @@ -584,89 +593,140 @@ impl UnassignedHex { urbanized IS NULL OR footfall IS NULL OR landtype IS NULL", + ) + .fetch(pool), + data_sets, ) - .fetch(pool) + .await } -} -pub async fn set_oracle_boosting_assignments<'a>( - unassigned_hexes: impl Stream>, - data_sets: &HexBoostData, - pool: &'a PgPool, -) -> anyhow::Result> { - const NUMBER_OF_FIELDS_IN_QUERY: u16 = 7; - const ASSIGNMENTS_MAX_BATCH_ENTRIES: usize = (u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize; - - let now = Utc::now(); - let mut boost_results = HashMap::>::new(); - let mut unassigned_hexes = pin!(unassigned_hexes.try_chunks(ASSIGNMENTS_MAX_BATCH_ENTRIES)); - - while let Some(hexes) = unassigned_hexes.try_next().await? { - let hexes: anyhow::Result> = hexes - .into_iter() - .map(|hex| { - let cell = hextree::Cell::try_from(hex.hex)?; - let assignments = HexAssignments::builder(cell) - .footfall(&data_sets.footfall) - .landtype(&data_sets.landtype) - .urbanized(&data_sets.urbanization) - .build()?; - let location = format!("{:x}", hex.hex); - let assignment_multiplier = (assignments.boosting_multiplier() * dec!(1000)) + async fn write(&self, boosting_reports: &FileSinkClient) -> file_store::Result { + let timestamp = Utc::now().encode_timestamp(); + for (uuid, hexes) in self.coverage_objs.iter() { + let assignments: Vec<_> = hexes + .iter() + .map(|hex| { + let location = format!("{:x}", hex.hex); + let assignment_multiplier = (hex.assignments.boosting_multiplier() + * dec!(1000)) .to_u32() .unwrap_or(0); - - boost_results.entry(hex.uuid).or_default().push( proto::OracleBoostingHexAssignment { location, - urbanized: assignments.urbanized.into(), - footfall: assignments.footfall.into(), - landtype: assignments.landtype.into(), + urbanized: hex.assignments.urbanized.into(), + footfall: hex.assignments.footfall.into(), + landtype: hex.assignments.landtype.into(), assignment_multiplier, + } + }) + .collect(); + boosting_reports + .write( + proto::OracleBoostingReportV1 { + coverage_object: Vec::from(uuid.into_bytes()), + assignments, + timestamp, }, - ); - - Ok(( - hex, - assignments.footfall, - assignments.landtype, - assignments.urbanized, - )) - }) - .collect(); - - QueryBuilder::new( - "INSERT INTO hexes (uuid, hex, signal_level, signal_power, footfall, landtype, urbanized)", - ) - .push_values(hexes?, |mut b, (hex, footfall, landtype, urbanized)| { - b.push_bind(hex.uuid) - .push_bind(hex.hex as i64) - .push_bind(hex.signal_level) - .push_bind(hex.signal_power) - .push_bind(footfall) - .push_bind(landtype) - .push_bind(urbanized); + &[], + ) + .await?; + } + + Ok(()) + } + + async fn save(self, pool: &PgPool) -> anyhow::Result<()> { + const NUMBER_OF_FIELDS_IN_QUERY: u16 = 7; + const ASSIGNMENTS_MAX_BATCH_ENTRIES: usize = + (u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize; + + let assigned_hexes: Vec<_> = self.coverage_objs.into_values().flatten().collect(); + for assigned_hexes in assigned_hexes.chunks(ASSIGNMENTS_MAX_BATCH_ENTRIES) { + QueryBuilder::new( + "INSERT INTO hexes (uuid, hex, signal_level, signal_power, footfall, landtype, urbanized)", + ) + .push_values(assigned_hexes, |mut b, hex| { + b.push_bind(hex.uuid) + .push_bind(hex.hex as i64) + .push_bind(hex.signal_level) + .push_bind(hex.signal_power) + .push_bind(hex.assignments.footfall) + .push_bind(hex.assignments.landtype) + .push_bind(hex.assignments.urbanized); + }) + .push( + r#" + ON CONFLICT (uuid, hex) DO UPDATE SET + footfall = EXCLUDED.footfall, + landtype = EXCLUDED.landtype, + urbanized = EXCLUDED.urbanized + "#, + ) + .build() + .execute(pool) + .await?; + } + + Ok(()) + } +} + +#[derive(FromRow)] +pub struct UnassignedHex { + uuid: uuid::Uuid, + #[sqlx(try_from = "i64")] + hex: u64, + signal_level: SignalLevel, + signal_power: i32, +} + +impl UnassignedHex { + fn assign( + self, + data_sets: &HexBoostData, + ) -> anyhow::Result { + let cell = hextree::Cell::try_from(self.hex)?; + let assignments = HexAssignments::builder(cell) + .footfall(&data_sets.footfall) + .landtype(&data_sets.landtype) + .urbanized(&data_sets.urbanization) + .build()?; + Ok(AssignedHex { + uuid: self.uuid, + hex: self.hex, + signal_level: self.signal_level, + signal_power: self.signal_power, + assignments, }) - .push( - r#" - ON CONFLICT (uuid, hex) DO UPDATE SET - footfall = EXCLUDED.footfall, - landtype = EXCLUDED.landtype, - urbanized = EXCLUDED.urbanized - "#, - ) - .build() - .execute(pool) - .await?; } +} + +pub struct AssignedHex { + uuid: uuid::Uuid, + hex: u64, + signal_level: SignalLevel, + signal_power: i32, + assignments: HexAssignments, +} + +pub async fn set_all_oracle_boosting_assignments( + pool: &PgPool, + data_sets: &HexBoostData, + file_sink: &FileSinkClient, +) -> anyhow::Result<()> { + let assigned_coverage_objs = AssignedCoverageObjects::fetch_all(pool, data_sets).await?; + assigned_coverage_objs.write(file_sink).await?; + assigned_coverage_objs.save(pool).await?; + Ok(()) +} - Ok(boost_results - .into_iter() - .map( - move |(coverage_object, assignments)| proto::OracleBoostingReportV1 { - coverage_object: Vec::from(coverage_object.into_bytes()), - assignments, - timestamp: now.encode_timestamp(), - }, - )) +pub async fn set_unassigned_oracle_boosting_assignments( + pool: &PgPool, + data_sets: &HexBoostData, + file_sink: &FileSinkClient, +) -> anyhow::Result<()> { + let assigned_coverage_objs = AssignedCoverageObjects::fetch_unassigned(pool, data_sets).await?; + assigned_coverage_objs.write(file_sink).await?; + assigned_coverage_objs.save(pool).await?; + Ok(()) }