Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Last Location Cache to be valid for up to 12 hours from latest heartbeat with valid location #822

Merged
merged 3 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ pub fn new_coverage_object_notification_channel(
)
}

#[derive(Clone)]
pub struct CoverageObject {
pub coverage_object: file_store::coverage::CoverageObject,
pub validity: CoverageObjectValidity,
Expand Down Expand Up @@ -298,7 +299,7 @@ impl CoverageObject {
Ok(())
}

pub async fn save(self, transaction: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
pub async fn save(&self, transaction: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
let insertion_time = Utc::now();
let key = self.key();
let hb_type = key.hb_type();
Expand Down
124 changes: 124 additions & 0 deletions mobile_verifier/src/heartbeats/last_location.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::sync::Arc;

use chrono::{DateTime, Duration, Utc};
use helium_crypto::PublicKeyBinary;
use retainer::Cache;
use sqlx::PgPool;

#[derive(sqlx::FromRow, Copy, Clone)]
pub struct LastLocation {
pub location_validation_timestamp: DateTime<Utc>,
pub latest_timestamp: DateTime<Utc>,
pub lat: f64,
pub lon: f64,
}

impl LastLocation {
pub fn new(
location_validation_timestamp: DateTime<Utc>,
latest_timestamp: DateTime<Utc>,
lat: f64,
lon: f64,
) -> Self {
Self {
location_validation_timestamp,
latest_timestamp,
lat,
lon,
}
}

/// Calculates the duration from now in which last_valid_timestamp is 12 hours old
pub fn duration_to_expiration(&self) -> Duration {
((self.latest_timestamp + Duration::hours(12)) - Utc::now()).max(Duration::zero())
}
}

/// A cache for previous valid (or invalid) WiFi heartbeat locations
#[derive(Clone)]
pub struct LocationCache {
pool: PgPool,
locations: Arc<Cache<PublicKeyBinary, Option<LastLocation>>>,
}

impl LocationCache {
pub fn new(pool: &PgPool) -> Self {
let locations = Arc::new(Cache::new());
let locations_clone = locations.clone();
tokio::spawn(async move {
locations_clone
.monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 24))
.await
});
Self {
pool: pool.clone(),
locations,
}
}

async fn fetch_from_db_and_set(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
let last_location: Option<LastLocation> = sqlx::query_as(
r#"
SELECT location_validation_timestamp, latest_timestamp, lat, lon
FROM wifi_heartbeats
WHERE location_validation_timestamp IS NOT NULL
AND latest_timestamp >= $1
AND hotspot_key = $2
ORDER BY latest_timestamp DESC
LIMIT 1
"#,
)
.bind(Utc::now() - Duration::hours(12))
.bind(hotspot)
.fetch_optional(&self.pool)
.await?;
self.locations
.insert(
hotspot.clone(),
last_location,
last_location
.map(|x| x.duration_to_expiration())
.unwrap_or_else(|| Duration::days(365))
.to_std()?,
)
.await;
Ok(last_location)
}

pub async fn fetch_last_location(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
Ok(
if let Some(last_location) = self.locations.get(hotspot).await {
*last_location
} else {
self.fetch_from_db_and_set(hotspot).await?
},
)
}

pub async fn set_last_location(
&self,
hotspot: &PublicKeyBinary,
last_location: LastLocation,
) -> anyhow::Result<()> {
let duration_to_expiration = last_location.duration_to_expiration();
self.locations
.insert(
hotspot.clone(),
Some(last_location),
duration_to_expiration.to_std()?,
)
.await;
Ok(())
}

/// Only used for testing.
pub async fn delete_last_location(&self, hotspot: &PublicKeyBinary) {
self.locations.remove(hotspot).await;
}
}
120 changes: 6 additions & 114 deletions mobile_verifier/src/heartbeats/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod cbrs;
pub mod last_location;
pub mod wifi;

use crate::{
Expand All @@ -20,10 +21,12 @@ use helium_proto::services::poc_mobile as proto;
use retainer::Cache;
use rust_decimal::{prelude::ToPrimitive, Decimal};
use rust_decimal_macros::dec;
use sqlx::{postgres::PgTypeInfo, Decode, Encode, PgPool, Postgres, Transaction, Type};
use std::{ops::Range, pin::pin, sync::Arc, time};
use sqlx::{postgres::PgTypeInfo, Decode, Encode, Postgres, Transaction, Type};
use std::{ops::Range, pin::pin, time};
use uuid::Uuid;

use self::last_location::{LastLocation, LocationCache};

/// Minimum number of heartbeats required to give a reward to the hotspot.
const MINIMUM_HEARTBEAT_COUNT: i64 = 12;

Expand Down Expand Up @@ -541,6 +544,7 @@ impl ValidatedHeartbeat {
&heartbeat.hotspot_key,
LastLocation::new(
location_validation_timestamp,
heartbeat.timestamp,
heartbeat.lat,
heartbeat.lon,
),
Expand Down Expand Up @@ -780,118 +784,6 @@ pub async fn clear_heartbeats(
Ok(())
}

/// A cache for previous valid (or invalid) WiFi heartbeat locations
#[derive(Clone)]
pub struct LocationCache {
pool: PgPool,
locations: Arc<Cache<PublicKeyBinary, Option<LastLocation>>>,
}

impl LocationCache {
pub fn new(pool: &PgPool) -> Self {
let locations = Arc::new(Cache::new());
let locations_clone = locations.clone();
tokio::spawn(async move {
locations_clone
.monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 24))
.await
});
Self {
pool: pool.clone(),
locations,
}
}

async fn fetch_from_db_and_set(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
let last_location: Option<LastLocation> = sqlx::query_as(
r#"
SELECT location_validation_timestamp, lat, lon
FROM wifi_heartbeats
WHERE location_validation_timestamp IS NOT NULL
AND location_validation_timestamp >= $1
AND hotspot_key = $2
ORDER BY location_validation_timestamp DESC
LIMIT 1
"#,
)
.bind(Utc::now() - Duration::hours(12))
.bind(hotspot)
.fetch_optional(&self.pool)
.await?;
self.locations
.insert(
hotspot.clone(),
last_location,
last_location
.map(|x| x.duration_to_expiration())
.unwrap_or_else(|| Duration::days(365))
.to_std()?,
)
.await;
Ok(last_location)
}

pub async fn fetch_last_location(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
Ok(
if let Some(last_location) = self.locations.get(hotspot).await {
*last_location
} else {
self.fetch_from_db_and_set(hotspot).await?
},
)
}

pub async fn set_last_location(
&self,
hotspot: &PublicKeyBinary,
last_location: LastLocation,
) -> anyhow::Result<()> {
let duration_to_expiration = last_location.duration_to_expiration();
self.locations
.insert(
hotspot.clone(),
Some(last_location),
duration_to_expiration.to_std()?,
)
.await;
Ok(())
}

/// Only used for testing.
pub async fn delete_last_location(&self, hotspot: &PublicKeyBinary) {
self.locations.remove(hotspot).await;
}
}

#[derive(sqlx::FromRow, Copy, Clone)]
pub struct LastLocation {
pub location_validation_timestamp: DateTime<Utc>,
pub lat: f64,
pub lon: f64,
}

impl LastLocation {
fn new(location_validation_timestamp: DateTime<Utc>, lat: f64, lon: f64) -> Self {
Self {
location_validation_timestamp,
lat,
lon,
}
}

/// Calculates the duration from now in which last_valid_timestamp is 12 hours old
fn duration_to_expiration(&self) -> Duration {
((self.location_validation_timestamp + Duration::hours(12)) - Utc::now())
.max(Duration::zero())
}
}

pub struct SeniorityUpdate<'a> {
heartbeat: &'a ValidatedHeartbeat,
action: SeniorityUpdateAction,
Expand Down
5 changes: 4 additions & 1 deletion mobile_verifier/tests/integrations/boosting_oracles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use mobile_config::boosted_hex_info::BoostedHexes;
use mobile_verifier::{
coverage::{CoverageClaimTimeCache, CoverageObject, CoverageObjectCache, Seniority},
geofence::GeofenceValidator,
heartbeats::{Heartbeat, HeartbeatReward, LocationCache, SeniorityUpdate, ValidatedHeartbeat},
heartbeats::{
last_location::LocationCache, Heartbeat, HeartbeatReward, SeniorityUpdate,
ValidatedHeartbeat,
},
radio_threshold::VerifiedRadioThresholds,
reward_shares::CoveragePoints,
speedtests::Speedtest,
Expand Down
Loading