diff --git a/mobile_verifier/migrations/29_save_lat_and_lon.sql b/mobile_verifier/migrations/29_save_lat_and_lon.sql new file mode 100644 index 000000000..9073f8eb2 --- /dev/null +++ b/mobile_verifier/migrations/29_save_lat_and_lon.sql @@ -0,0 +1,3 @@ +ALTER TABLE wifi_heartbeats +ADD COLUMN lat DOUBLE PRECISION NOT NULL DEFAULT 0.0, +ADD COLUMN lon DOUBLE PRECISION NOT NULL DEFAULT 0.0; diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index 023cddf32..5ca03455d 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -2,6 +2,7 @@ use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat}; use crate::{ coverage::{CoverageClaimTimeCache, CoverageObjectCache}, geofence::GeofenceValidator, + heartbeats::LocationCache, GatewayResolver, }; @@ -74,6 +75,8 @@ where let coverage_claim_time_cache = CoverageClaimTimeCache::new(); let coverage_object_cache = CoverageObjectCache::new(&self.pool); + // Unused: + let location_cache = LocationCache::new(&self.pool); loop { #[rustfmt::skip] @@ -90,6 +93,7 @@ where &heartbeat_cache, &coverage_claim_time_cache, &coverage_object_cache, + &location_cache, ).await?; metrics::histogram!("cbrs_heartbeat_processing_time", start.elapsed()); } @@ -105,6 +109,7 @@ where heartbeat_cache: &Arc), ()>>, coverage_claim_time_cache: &CoverageClaimTimeCache, coverage_object_cache: &CoverageObjectCache, + location_cache: &LocationCache, ) -> anyhow::Result<()> { tracing::info!("Processing CBRS heartbeat file {}", file.file_info.key); let mut transaction = self.pool.begin().await?; @@ -122,9 +127,10 @@ where }); process_validated_heartbeats( ValidatedHeartbeat::validate_heartbeats( - &self.gateway_info_resolver, heartbeats, + &self.gateway_info_resolver, coverage_object_cache, + location_cache, self.max_distance_to_asserted, self.max_distance_to_coverage, &epoch, diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index c8221137b..174b84fb7 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -20,8 +20,8 @@ use helium_proto::services::poc_mobile as proto; use retainer::Cache; use rust_decimal::{prelude::ToPrimitive, Decimal}; use rust_decimal_macros::dec; -use sqlx::{postgres::PgTypeInfo, Decode, Encode, Postgres, Transaction, Type}; -use std::{ops::Range, pin::pin, time}; +use sqlx::{postgres::PgTypeInfo, Decode, Encode, PgPool, Postgres, Transaction, Type}; +use std::{ops::Range, pin::pin, sync::Arc, time}; use uuid::Uuid; /// Minimum number of heartbeats required to give a reward to the hotspot. @@ -379,10 +379,12 @@ impl ValidatedHeartbeat { } /// Validate a heartbeat in the given epoch. + #[allow(clippy::too_many_arguments)] pub async fn validate( - heartbeat: Heartbeat, + mut heartbeat: Heartbeat, gateway_info_resolver: &impl GatewayResolver, coverage_object_cache: &CoverageObjectCache, + last_location_cache: &LocationCache, max_distance_to_asserted: u32, max_distance_to_coverage: u32, epoch: &Range>, @@ -470,7 +472,7 @@ impl ValidatedHeartbeat { )); } - let Ok(hb_latlng) = heartbeat.centered_latlng() else { + let Ok(mut hb_latlng) = heartbeat.centered_latlng() else { return Ok(Self::new( heartbeat, cell_type, @@ -516,8 +518,39 @@ impl ValidatedHeartbeat { } GatewayResolution::AssertedLocation(location) if heartbeat.hb_type == HbType::Wifi => { let asserted_latlng: LatLng = CellIndex::try_from(location)?.into(); + let is_valid = match heartbeat.location_validation_timestamp { + None => { + if let Some(last_location) = last_location_cache + .fetch_last_location(&heartbeat.hotspot_key) + .await? + { + heartbeat.lat = last_location.lat; + heartbeat.lon = last_location.lon; + heartbeat.location_validation_timestamp = + Some(last_location.location_validation_timestamp); + // Can't panic, previous lat and lon must be valid. + hb_latlng = heartbeat.centered_latlng().unwrap(); + true + } else { + false + } + } + Some(location_validation_timestamp) => { + last_location_cache + .set_last_location( + &heartbeat.hotspot_key, + LastLocation::new( + location_validation_timestamp, + heartbeat.lat, + heartbeat.lon, + ), + ) + .await?; + true + } + }; let distance_to_asserted = asserted_latlng.distance_m(hb_latlng).round() as i64; - let location_trust_score_multiplier = if heartbeat.location_validation_timestamp.is_some() + let location_trust_score_multiplier = if is_valid // The heartbeat location to asserted location must be less than the max_distance_to_asserted value: && distance_to_asserted <= max_distance_to_asserted as i64 // The heartbeat location to every associated coverage hex must be less than max_distance_to_coverage: @@ -547,10 +580,12 @@ impl ValidatedHeartbeat { } } + #[allow(clippy::too_many_arguments)] pub fn validate_heartbeats<'a>( - gateway_info_resolver: &'a impl GatewayResolver, heartbeats: impl Stream + 'a, + gateway_info_resolver: &'a impl GatewayResolver, coverage_object_cache: &'a CoverageObjectCache, + last_location_cache: &'a LocationCache, max_distance_to_asserted: u32, max_distance_to_coverage: u32, epoch: &'a Range>, @@ -561,6 +596,7 @@ impl ValidatedHeartbeat { heartbeat, gateway_info_resolver, coverage_object_cache, + last_location_cache, max_distance_to_asserted, max_distance_to_coverage, epoch, @@ -654,8 +690,8 @@ impl ValidatedHeartbeat { let truncated_timestamp = self.truncated_timestamp()?; sqlx::query( r#" - INSERT INTO wifi_heartbeats (hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, distance_to_asserted) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO wifi_heartbeats (hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, distance_to_asserted, lat, lon) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (hotspot_key, truncated_timestamp) DO UPDATE SET latest_timestamp = EXCLUDED.latest_timestamp, coverage_object = EXCLUDED.coverage_object @@ -668,6 +704,8 @@ impl ValidatedHeartbeat { .bind(self.heartbeat.coverage_object) .bind(self.location_trust_score_multiplier) .bind(self.distance_to_asserted) + .bind(self.heartbeat.lat) + .bind(self.heartbeat.lon) .execute(&mut *exec) .await?; Ok(()) @@ -741,6 +779,111 @@ pub async fn clear_heartbeats( Ok(()) } +/// A cache for previous valid (or invalid) WiFi heartbeat locations +#[derive(Clone)] +pub struct LocationCache { + pool: PgPool, + locations: Arc>>, +} + +impl LocationCache { + pub fn new(pool: &PgPool) -> Self { + let locations = Arc::new(Cache::new()); + let locations_clone = locations.clone(); + tokio::spawn(async move { + locations_clone + .monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 24)) + .await + }); + Self { + pool: pool.clone(), + locations, + } + } + + async fn fetch_from_db_and_set( + &self, + hotspot: &PublicKeyBinary, + ) -> anyhow::Result> { + let last_location: Option = sqlx::query_as( + r#" + SELECT location_validation_timestamp, lat, lon + FROM wifi_heartbeats + WHERE location_validation_timestamp IS NOT NULL + AND location_validation_timestamp >= $1 + ORDER BY DESC location_validation_timestamp + LIMIT 1 + "#, + ) + .bind(Utc::now() - Duration::hours(12)) + .fetch_optional(&self.pool) + .await?; + self.locations + .insert( + hotspot.clone(), + last_location, + last_location + .map(|x| x.duration_to_expiration()) + .unwrap_or_else(|| Duration::days(365)) + .to_std()?, + ) + .await; + Ok(last_location) + } + + pub async fn fetch_last_location( + &self, + hotspot: &PublicKeyBinary, + ) -> anyhow::Result> { + Ok( + if let Some(last_location) = self.locations.get(hotspot).await { + *last_location + } else { + self.fetch_from_db_and_set(hotspot).await? + }, + ) + } + + pub async fn set_last_location( + &self, + hotspot: &PublicKeyBinary, + last_location: LastLocation, + ) -> anyhow::Result<()> { + let duration_to_expiration = last_location.duration_to_expiration(); + self.locations + .insert( + hotspot.clone(), + Some(last_location), + duration_to_expiration.to_std()?, + ) + .await; + Ok(()) + } +} + +#[derive(sqlx::FromRow, Copy, Clone)] +pub struct LastLocation { + location_validation_timestamp: DateTime, + lat: f64, + lon: f64, +} + +impl LastLocation { + fn new(location_validation_timestamp: DateTime, lat: f64, lon: f64) -> Self { + Self { + location_validation_timestamp, + lat, + lon, + } + } + + /// Calculates the duration from now in which last_valid_timestamp is 12 hours old + fn duration_to_expiration(&self) -> Duration { + ((self.location_validation_timestamp + Duration::hours(12)) - Utc::now()) + .max(Duration::zero()) + } +} + pub struct SeniorityUpdate<'a> { heartbeat: &'a ValidatedHeartbeat, action: SeniorityUpdateAction, diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index 8cfdca952..65749755c 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -2,6 +2,7 @@ use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat}; use crate::{ coverage::{CoverageClaimTimeCache, CoverageObjectCache}, geofence::GeofenceValidator, + heartbeats::LocationCache, GatewayResolver, }; use chrono::{DateTime, Duration, Utc}; @@ -73,6 +74,7 @@ where let coverage_claim_time_cache = CoverageClaimTimeCache::new(); let coverage_object_cache = CoverageObjectCache::new(&self.pool); + let location_cache = LocationCache::new(&self.pool); loop { #[rustfmt::skip] @@ -89,6 +91,7 @@ where &heartbeat_cache, &coverage_claim_time_cache, &coverage_object_cache, + &location_cache ).await?; metrics::histogram!("wifi_heartbeat_processing_time", start.elapsed()); } @@ -104,6 +107,7 @@ where heartbeat_cache: &Cache<(String, DateTime), ()>, coverage_claim_time_cache: &CoverageClaimTimeCache, coverage_object_cache: &CoverageObjectCache, + location_cache: &LocationCache, ) -> anyhow::Result<()> { tracing::info!("Processing WIFI heartbeat file {}", file.file_info.key); let mut transaction = self.pool.begin().await?; @@ -115,9 +119,10 @@ where .map(Heartbeat::from); process_validated_heartbeats( ValidatedHeartbeat::validate_heartbeats( - &self.gateway_info_resolver, heartbeats, + &self.gateway_info_resolver, coverage_object_cache, + location_cache, self.max_distance_to_asserted, self.max_distance_to_coverage, &epoch, diff --git a/mobile_verifier/tests/boosting_oracles.rs b/mobile_verifier/tests/boosting_oracles.rs index 5687b53cb..c42fd38eb 100644 --- a/mobile_verifier/tests/boosting_oracles.rs +++ b/mobile_verifier/tests/boosting_oracles.rs @@ -17,7 +17,7 @@ use mobile_verifier::{ CoverageObjectCache, Seniority, UnassignedHex, }, geofence::GeofenceValidator, - heartbeats::{Heartbeat, HeartbeatReward, SeniorityUpdate, ValidatedHeartbeat}, + heartbeats::{Heartbeat, HeartbeatReward, LocationCache, SeniorityUpdate, ValidatedHeartbeat}, radio_threshold::VerifiedRadioThresholds, reward_shares::CoveragePoints, speedtests::Speedtest, @@ -371,12 +371,14 @@ async fn test_footfall_and_urbanization(pool: PgPool) -> anyhow::Result<()> { let coverage_objects = CoverageObjectCache::new(&pool); let coverage_claim_time_cache = CoverageClaimTimeCache::new(); + let location_cache = LocationCache::new(&pool); let epoch = start..end; let mut heartbeats = pin!(ValidatedHeartbeat::validate_heartbeats( - &AllOwnersValid, stream::iter(heartbeats.map(Heartbeat::from)), + &AllOwnersValid, &coverage_objects, + &location_cache, 2000, 2000, &epoch, diff --git a/mobile_verifier/tests/modeled_coverage.rs b/mobile_verifier/tests/modeled_coverage.rs index 84cbe6563..0a40b0f22 100644 --- a/mobile_verifier/tests/modeled_coverage.rs +++ b/mobile_verifier/tests/modeled_coverage.rs @@ -20,7 +20,9 @@ use mobile_verifier::{ CoverageObjectCache, Seniority, UnassignedHex, }, geofence::GeofenceValidator, - heartbeats::{Heartbeat, HeartbeatReward, KeyType, SeniorityUpdate, ValidatedHeartbeat}, + heartbeats::{ + Heartbeat, HeartbeatReward, KeyType, LocationCache, SeniorityUpdate, ValidatedHeartbeat, + }, radio_threshold::VerifiedRadioThresholds, reward_shares::CoveragePoints, speedtests::Speedtest, @@ -398,6 +400,7 @@ async fn process_input( ) -> anyhow::Result<()> { let coverage_objects = CoverageObjectCache::new(pool); let coverage_claim_time_cache = CoverageClaimTimeCache::new(); + let location_cache = LocationCache::new(pool); let mut transaction = pool.begin().await?; let mut coverage_objs = pin!(CoverageObject::validate_coverage_objects( @@ -419,9 +422,10 @@ async fn process_input( let mut transaction = pool.begin().await?; let mut heartbeats = pin!(ValidatedHeartbeat::validate_heartbeats( - &AllOwnersValid, stream::iter(heartbeats.map(Heartbeat::from)), + &AllOwnersValid, &coverage_objects, + &location_cache, 2000, 2000, epoch, @@ -1377,11 +1381,13 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow let hb_2: Heartbeat = hb_2.into(); let coverage_object_cache = CoverageObjectCache::new(&pool); + let location_cache = LocationCache::new(&pool); let validated_hb_1 = ValidatedHeartbeat::validate( hb_1, &AllOwnersValid, &coverage_object_cache, + &location_cache, 2000, 2000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC), @@ -1396,6 +1402,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow hb_2.clone(), &AllOwnersValid, &coverage_object_cache, + &location_cache, 1000000, 2000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC), @@ -1410,6 +1417,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow hb_2.clone(), &AllOwnersValid, &coverage_object_cache, + &location_cache, 2000, 1000000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC), @@ -1424,6 +1432,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow hb_2.clone(), &AllOwnersValid, &coverage_object_cache, + &location_cache, 1000000, 1000000, &(DateTime::::MIN_UTC..DateTime::::MAX_UTC),