Skip to content

Commit

Permalink
Lookup previous WiFi locations in case of None location validation ti…
Browse files Browse the repository at this point in the history
…mestamp (#774)

* Add logic for caching locations

* Fix tests, clippy

* Use TTL for cache

* Fixes
  • Loading branch information
Matthew Plant authored Mar 28, 2024
1 parent 3e356bb commit fab10f6
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 14 deletions.
3 changes: 3 additions & 0 deletions mobile_verifier/migrations/29_save_lat_and_lon.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE wifi_heartbeats
ADD COLUMN lat DOUBLE PRECISION NOT NULL DEFAULT 0.0,
ADD COLUMN lon DOUBLE PRECISION NOT NULL DEFAULT 0.0;
8 changes: 7 additions & 1 deletion mobile_verifier/src/heartbeats/cbrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat};
use crate::{
coverage::{CoverageClaimTimeCache, CoverageObjectCache},
geofence::GeofenceValidator,
heartbeats::LocationCache,
GatewayResolver,
};

Expand Down Expand Up @@ -74,6 +75,8 @@ where

let coverage_claim_time_cache = CoverageClaimTimeCache::new();
let coverage_object_cache = CoverageObjectCache::new(&self.pool);
// Unused:
let location_cache = LocationCache::new(&self.pool);

loop {
#[rustfmt::skip]
Expand All @@ -90,6 +93,7 @@ where
&heartbeat_cache,
&coverage_claim_time_cache,
&coverage_object_cache,
&location_cache,
).await?;
metrics::histogram!("cbrs_heartbeat_processing_time", start.elapsed());
}
Expand All @@ -105,6 +109,7 @@ where
heartbeat_cache: &Arc<Cache<(String, DateTime<Utc>), ()>>,
coverage_claim_time_cache: &CoverageClaimTimeCache,
coverage_object_cache: &CoverageObjectCache,
location_cache: &LocationCache,
) -> anyhow::Result<()> {
tracing::info!("Processing CBRS heartbeat file {}", file.file_info.key);
let mut transaction = self.pool.begin().await?;
Expand All @@ -122,9 +127,10 @@ where
});
process_validated_heartbeats(
ValidatedHeartbeat::validate_heartbeats(
&self.gateway_info_resolver,
heartbeats,
&self.gateway_info_resolver,
coverage_object_cache,
location_cache,
self.max_distance_to_asserted,
self.max_distance_to_coverage,
&epoch,
Expand Down
159 changes: 151 additions & 8 deletions mobile_verifier/src/heartbeats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use helium_proto::services::poc_mobile as proto;
use retainer::Cache;
use rust_decimal::{prelude::ToPrimitive, Decimal};
use rust_decimal_macros::dec;
use sqlx::{postgres::PgTypeInfo, Decode, Encode, Postgres, Transaction, Type};
use std::{ops::Range, pin::pin, time};
use sqlx::{postgres::PgTypeInfo, Decode, Encode, PgPool, Postgres, Transaction, Type};
use std::{ops::Range, pin::pin, sync::Arc, time};
use uuid::Uuid;

/// Minimum number of heartbeats required to give a reward to the hotspot.
Expand Down Expand Up @@ -379,10 +379,12 @@ impl ValidatedHeartbeat {
}

/// Validate a heartbeat in the given epoch.
#[allow(clippy::too_many_arguments)]
pub async fn validate(
heartbeat: Heartbeat,
mut heartbeat: Heartbeat,
gateway_info_resolver: &impl GatewayResolver,
coverage_object_cache: &CoverageObjectCache,
last_location_cache: &LocationCache,
max_distance_to_asserted: u32,
max_distance_to_coverage: u32,
epoch: &Range<DateTime<Utc>>,
Expand Down Expand Up @@ -470,7 +472,7 @@ impl ValidatedHeartbeat {
));
}

let Ok(hb_latlng) = heartbeat.centered_latlng() else {
let Ok(mut hb_latlng) = heartbeat.centered_latlng() else {
return Ok(Self::new(
heartbeat,
cell_type,
Expand Down Expand Up @@ -516,8 +518,39 @@ impl ValidatedHeartbeat {
}
GatewayResolution::AssertedLocation(location) if heartbeat.hb_type == HbType::Wifi => {
let asserted_latlng: LatLng = CellIndex::try_from(location)?.into();
let is_valid = match heartbeat.location_validation_timestamp {
None => {
if let Some(last_location) = last_location_cache
.fetch_last_location(&heartbeat.hotspot_key)
.await?
{
heartbeat.lat = last_location.lat;
heartbeat.lon = last_location.lon;
heartbeat.location_validation_timestamp =
Some(last_location.location_validation_timestamp);
// Can't panic, previous lat and lon must be valid.
hb_latlng = heartbeat.centered_latlng().unwrap();
true
} else {
false
}
}
Some(location_validation_timestamp) => {
last_location_cache
.set_last_location(
&heartbeat.hotspot_key,
LastLocation::new(
location_validation_timestamp,
heartbeat.lat,
heartbeat.lon,
),
)
.await?;
true
}
};
let distance_to_asserted = asserted_latlng.distance_m(hb_latlng).round() as i64;
let location_trust_score_multiplier = if heartbeat.location_validation_timestamp.is_some()
let location_trust_score_multiplier = if is_valid
// The heartbeat location to asserted location must be less than the max_distance_to_asserted value:
&& distance_to_asserted <= max_distance_to_asserted as i64
// The heartbeat location to every associated coverage hex must be less than max_distance_to_coverage:
Expand Down Expand Up @@ -547,10 +580,12 @@ impl ValidatedHeartbeat {
}
}

#[allow(clippy::too_many_arguments)]
pub fn validate_heartbeats<'a>(
gateway_info_resolver: &'a impl GatewayResolver,
heartbeats: impl Stream<Item = Heartbeat> + 'a,
gateway_info_resolver: &'a impl GatewayResolver,
coverage_object_cache: &'a CoverageObjectCache,
last_location_cache: &'a LocationCache,
max_distance_to_asserted: u32,
max_distance_to_coverage: u32,
epoch: &'a Range<DateTime<Utc>>,
Expand All @@ -561,6 +596,7 @@ impl ValidatedHeartbeat {
heartbeat,
gateway_info_resolver,
coverage_object_cache,
last_location_cache,
max_distance_to_asserted,
max_distance_to_coverage,
epoch,
Expand Down Expand Up @@ -654,8 +690,8 @@ impl ValidatedHeartbeat {
let truncated_timestamp = self.truncated_timestamp()?;
sqlx::query(
r#"
INSERT INTO wifi_heartbeats (hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, distance_to_asserted)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO wifi_heartbeats (hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object, location_trust_score_multiplier, distance_to_asserted, lat, lon)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (hotspot_key, truncated_timestamp) DO UPDATE SET
latest_timestamp = EXCLUDED.latest_timestamp,
coverage_object = EXCLUDED.coverage_object
Expand All @@ -668,6 +704,8 @@ impl ValidatedHeartbeat {
.bind(self.heartbeat.coverage_object)
.bind(self.location_trust_score_multiplier)
.bind(self.distance_to_asserted)
.bind(self.heartbeat.lat)
.bind(self.heartbeat.lon)
.execute(&mut *exec)
.await?;
Ok(())
Expand Down Expand Up @@ -741,6 +779,111 @@ pub async fn clear_heartbeats(
Ok(())
}

/// A cache for previous valid (or invalid) WiFi heartbeat locations
#[derive(Clone)]
pub struct LocationCache {
pool: PgPool,
locations: Arc<Cache<PublicKeyBinary, Option<LastLocation>>>,
}

impl LocationCache {
pub fn new(pool: &PgPool) -> Self {
let locations = Arc::new(Cache::new());
let locations_clone = locations.clone();
tokio::spawn(async move {
locations_clone
.monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 24))
.await
});
Self {
pool: pool.clone(),
locations,
}
}

async fn fetch_from_db_and_set(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
let last_location: Option<LastLocation> = sqlx::query_as(
r#"
SELECT location_validation_timestamp, lat, lon
FROM wifi_heartbeats
WHERE location_validation_timestamp IS NOT NULL
AND location_validation_timestamp >= $1
ORDER BY DESC location_validation_timestamp
LIMIT 1
"#,
)
.bind(Utc::now() - Duration::hours(12))
.fetch_optional(&self.pool)
.await?;
self.locations
.insert(
hotspot.clone(),
last_location,
last_location
.map(|x| x.duration_to_expiration())
.unwrap_or_else(|| Duration::days(365))
.to_std()?,
)
.await;
Ok(last_location)
}

pub async fn fetch_last_location(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
Ok(
if let Some(last_location) = self.locations.get(hotspot).await {
*last_location
} else {
self.fetch_from_db_and_set(hotspot).await?
},
)
}

pub async fn set_last_location(
&self,
hotspot: &PublicKeyBinary,
last_location: LastLocation,
) -> anyhow::Result<()> {
let duration_to_expiration = last_location.duration_to_expiration();
self.locations
.insert(
hotspot.clone(),
Some(last_location),
duration_to_expiration.to_std()?,
)
.await;
Ok(())
}
}

#[derive(sqlx::FromRow, Copy, Clone)]
pub struct LastLocation {
location_validation_timestamp: DateTime<Utc>,
lat: f64,
lon: f64,
}

impl LastLocation {
fn new(location_validation_timestamp: DateTime<Utc>, lat: f64, lon: f64) -> Self {
Self {
location_validation_timestamp,
lat,
lon,
}
}

/// Calculates the duration from now in which last_valid_timestamp is 12 hours old
fn duration_to_expiration(&self) -> Duration {
((self.location_validation_timestamp + Duration::hours(12)) - Utc::now())
.max(Duration::zero())
}
}

pub struct SeniorityUpdate<'a> {
heartbeat: &'a ValidatedHeartbeat,
action: SeniorityUpdateAction,
Expand Down
7 changes: 6 additions & 1 deletion mobile_verifier/src/heartbeats/wifi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::{process_validated_heartbeats, Heartbeat, ValidatedHeartbeat};
use crate::{
coverage::{CoverageClaimTimeCache, CoverageObjectCache},
geofence::GeofenceValidator,
heartbeats::LocationCache,
GatewayResolver,
};
use chrono::{DateTime, Duration, Utc};
Expand Down Expand Up @@ -73,6 +74,7 @@ where

let coverage_claim_time_cache = CoverageClaimTimeCache::new();
let coverage_object_cache = CoverageObjectCache::new(&self.pool);
let location_cache = LocationCache::new(&self.pool);

loop {
#[rustfmt::skip]
Expand All @@ -89,6 +91,7 @@ where
&heartbeat_cache,
&coverage_claim_time_cache,
&coverage_object_cache,
&location_cache
).await?;
metrics::histogram!("wifi_heartbeat_processing_time", start.elapsed());
}
Expand All @@ -104,6 +107,7 @@ where
heartbeat_cache: &Cache<(String, DateTime<Utc>), ()>,
coverage_claim_time_cache: &CoverageClaimTimeCache,
coverage_object_cache: &CoverageObjectCache,
location_cache: &LocationCache,
) -> anyhow::Result<()> {
tracing::info!("Processing WIFI heartbeat file {}", file.file_info.key);
let mut transaction = self.pool.begin().await?;
Expand All @@ -115,9 +119,10 @@ where
.map(Heartbeat::from);
process_validated_heartbeats(
ValidatedHeartbeat::validate_heartbeats(
&self.gateway_info_resolver,
heartbeats,
&self.gateway_info_resolver,
coverage_object_cache,
location_cache,
self.max_distance_to_asserted,
self.max_distance_to_coverage,
&epoch,
Expand Down
6 changes: 4 additions & 2 deletions mobile_verifier/tests/boosting_oracles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use mobile_verifier::{
CoverageObjectCache, Seniority, UnassignedHex,
},
geofence::GeofenceValidator,
heartbeats::{Heartbeat, HeartbeatReward, SeniorityUpdate, ValidatedHeartbeat},
heartbeats::{Heartbeat, HeartbeatReward, LocationCache, SeniorityUpdate, ValidatedHeartbeat},
radio_threshold::VerifiedRadioThresholds,
reward_shares::CoveragePoints,
speedtests::Speedtest,
Expand Down Expand Up @@ -371,12 +371,14 @@ async fn test_footfall_and_urbanization(pool: PgPool) -> anyhow::Result<()> {

let coverage_objects = CoverageObjectCache::new(&pool);
let coverage_claim_time_cache = CoverageClaimTimeCache::new();
let location_cache = LocationCache::new(&pool);

let epoch = start..end;
let mut heartbeats = pin!(ValidatedHeartbeat::validate_heartbeats(
&AllOwnersValid,
stream::iter(heartbeats.map(Heartbeat::from)),
&AllOwnersValid,
&coverage_objects,
&location_cache,
2000,
2000,
&epoch,
Expand Down
Loading

0 comments on commit fab10f6

Please sign in to comment.