diff --git a/Cargo.lock b/Cargo.lock index 1266a5a66..b41455f07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2505,6 +2505,7 @@ dependencies = [ "derive_builder", "futures", "futures-util", + "h3o", "helium-crypto", "helium-proto", "hex-literal", @@ -2530,6 +2531,7 @@ dependencies = [ "tokio-util", "tracing", "triggered", + "uuid", ] [[package]] @@ -2850,6 +2852,8 @@ dependencies = [ "geo", "geojson", "konst", + "serde", + "serde_repr", ] [[package]] @@ -4059,6 +4063,7 @@ dependencies = [ "file-store", "futures", "futures-util", + "h3o", "helium-crypto", "helium-proto", "http-serde", @@ -4086,6 +4091,7 @@ dependencies = [ "tracing", "tracing-subscriber", "triggered", + "uuid", ] [[package]] @@ -5690,6 +5696,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_repr" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395627de918015623b32e7669714206363a7fc00382bf477e72c1f7533e8eafc" +dependencies = [ + "proc-macro2 1.0.47", + "quote 1.0.21", + "syn 1.0.104", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -7584,9 +7601,13 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "uuid" -version = "1.2.2" +version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" +checksum = "0fa2982af2eec27de306107c027578ff7f423d65f7250e40ce0fea8f45248b81" +dependencies = [ + "getrandom 0.2.8", + "serde", +] [[package]] name = "vcell" diff --git a/Cargo.toml b/Cargo.toml index fd747c1d0..66919b2aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,8 +85,8 @@ prost = "*" once_cell = "1" lazy_static = "1" config = {version="0", default-features=false, features=["toml"]} -h3o = "0" -xorf = {version = "0", features = ["serde"] } +h3o = {version = "0", features = ["serde"]} +xorf = {version = "0", features = ["serde"]} bytes = "*" structopt = "0" bincode = "1" @@ -99,6 +99,7 @@ itertools = "*" data-credits = {git = "https://github.com/helium/helium-program-library.git", tag = "v0.1.0"} helium-sub-daos = {git = "https://github.com/helium/helium-program-library.git", tag = "v0.1.0"} price-oracle = {git = "https://github.com/helium/helium-program-library.git", tag = "v0.1.0"} +uuid = {version = "1.3.4", features = ["v4", "serde"]} [patch.crates-io] sqlx = { git = "https://github.com/helium/sqlx.git", rev = "92a2268f02e0cac6fccb34d3e926347071dbb88d" } diff --git a/file_store/Cargo.toml b/file_store/Cargo.toml index 5d0d6d1ae..86250a751 100644 --- a/file_store/Cargo.toml +++ b/file_store/Cargo.toml @@ -46,6 +46,8 @@ sqlx = {workspace = true} async-trait = {workspace = true} derive_builder = "0" retainer = {workspace = true} +uuid = {workspace = true} +h3o = {workspace = true} [dev-dependencies] hex-literal = "0" diff --git a/file_store/src/coverage.rs b/file_store/src/coverage.rs new file mode 100644 index 000000000..564aa7706 --- /dev/null +++ b/file_store/src/coverage.rs @@ -0,0 +1,104 @@ +use crate::{ + error::DecodeError, + traits::{MsgDecode, TimestampDecode}, + Error, Result, +}; +use chrono::{DateTime, Utc}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::{ + CoverageObjectIngestReportV1, CoverageObjectReqV1, + RadioHexSignalLevel as RadioHexSignalLevelProto, SignalLevel, +}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct RadioHexSignalLevel { + pub location: h3o::CellIndex, + pub signal_level: SignalLevel, + pub signal_power: i32, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CoverageObject { + pub pub_key: PublicKeyBinary, + pub uuid: Uuid, + pub cbsd_id: String, + pub coverage_claim_time: DateTime, + pub indoor: bool, + pub coverage: Vec, + pub signature: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CoverageObjectIngestReport { + pub received_timestamp: DateTime, + pub report: CoverageObject, +} + +impl MsgDecode for CoverageObject { + type Msg = CoverageObjectReqV1; +} + +impl MsgDecode for CoverageObjectIngestReport { + type Msg = CoverageObjectIngestReportV1; +} + +impl TryFrom for CoverageObjectIngestReport { + type Error = Error; + + fn try_from(v: CoverageObjectIngestReportV1) -> Result { + Ok(Self { + received_timestamp: v.received_timestamp.to_timestamp_millis()?, + report: v + .report + .ok_or_else(|| Error::not_found("ingest coverage object report"))? + .try_into()?, + }) + } +} + +impl TryFrom for CoverageObject { + type Error = Error; + + fn try_from(v: CoverageObjectReqV1) -> Result { + let coverage: Result> = v + .coverage + .into_iter() + .map(RadioHexSignalLevel::try_from) + .collect(); + Ok(Self { + pub_key: v.pub_key.into(), + uuid: Uuid::from_slice(&v.uuid).map_err(DecodeError::from)?, + cbsd_id: v.cbsd_id, + coverage_claim_time: v.coverage_claim_time.to_timestamp()?, + indoor: v.indoor, + coverage: coverage?, + signature: v.signature, + }) + } +} + +impl TryFrom for RadioHexSignalLevel { + type Error = Error; + + fn try_from(v: RadioHexSignalLevelProto) -> Result { + Ok(Self { + signal_level: SignalLevel::from_i32(v.signal_level).ok_or_else(|| { + DecodeError::unsupported_signal_level("coverage_object_req_v1", v.signal_level) + })?, + signal_power: v.signal_power, + location: v.location.parse().map_err(DecodeError::from)?, + }) + } +} + +impl From for RadioHexSignalLevelProto { + fn from(rhsl: RadioHexSignalLevel) -> RadioHexSignalLevelProto { + RadioHexSignalLevelProto { + signal_level: rhsl.signal_level as i32, + signal_power: rhsl.signal_power, + location: rhsl.location.to_string(), + } + } +} diff --git a/file_store/src/error.rs b/file_store/src/error.rs index 9aae093be..bc6c022d3 100644 --- a/file_store/src/error.rs +++ b/file_store/src/error.rs @@ -54,8 +54,14 @@ pub enum DecodeError { UnsupportedParticipantSide(String, i32), #[error("unsupported verification status, type: {0}, value: {1}")] UnsupportedStatusReason(String, i32), + #[error("unsupported signal level, type: {0}, value: {1}")] + UnsupportedSignalLevel(String, i32), #[error("invalid unix timestamp {0}")] InvalidTimestamp(u64), + #[error("Uuid error: {0}")] + UuidError(#[from] uuid::Error), + #[error("Invalid cell index error: {0}")] + InvalidCellIndexError(#[from] h3o::error::InvalidCellIndex), } #[derive(Error, Debug)] @@ -127,6 +133,10 @@ impl DecodeError { pub fn unsupported_status_reason(msg1: E, msg2: i32) -> Error { Error::Decode(Self::UnsupportedInvalidReason(msg1.to_string(), msg2)) } + + pub fn unsupported_signal_level(msg1: impl ToString, msg2: i32) -> Error { + Error::Decode(Self::UnsupportedSignalLevel(msg1.to_string(), msg2)) + } } impl From for Error { diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 20d0c94e5..6b7ccf61f 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -138,7 +138,9 @@ pub const VALID_DATA_TRANSFER_SESSION: &str = "valid_data_transfer_session"; pub const PRICE_REPORT: &str = "price_report"; pub const MOBILE_REWARD_SHARE: &str = "mobile_reward_share"; pub const MAPPER_MSG: &str = "mapper_msg"; +pub const COVERAGE_OBJECT: &str = "coverage_object"; pub const COVERAGE_OBJECT_INGEST_REPORT: &str = "coverage_object_ingest_report"; +pub const SENIORITY_UPDATE: &str = "seniority_update"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -174,7 +176,9 @@ pub enum FileType { SubscriberLocationIngestReport, VerifiedSubscriberLocationIngestReport, MapperMsg, + CoverageObject, CoverageObjectIngestReport, + SeniorityUpdate, } impl fmt::Display for FileType { @@ -215,7 +219,9 @@ impl fmt::Display for FileType { Self::PriceReport => PRICE_REPORT, Self::MobileRewardShare => MOBILE_REWARD_SHARE, Self::MapperMsg => MAPPER_MSG, + Self::CoverageObject => COVERAGE_OBJECT, Self::CoverageObjectIngestReport => COVERAGE_OBJECT_INGEST_REPORT, + Self::SeniorityUpdate => SENIORITY_UPDATE, }; f.write_str(s) } @@ -259,7 +265,9 @@ impl FileType { Self::PriceReport => PRICE_REPORT, Self::MobileRewardShare => MOBILE_REWARD_SHARE, Self::MapperMsg => MAPPER_MSG, + Self::CoverageObject => COVERAGE_OBJECT, Self::CoverageObjectIngestReport => COVERAGE_OBJECT_INGEST_REPORT, + Self::SeniorityUpdate => SENIORITY_UPDATE, } } } @@ -303,7 +311,9 @@ impl FromStr for FileType { PRICE_REPORT => Self::PriceReport, MOBILE_REWARD_SHARE => Self::MobileRewardShare, MAPPER_MSG => Self::MapperMsg, + COVERAGE_OBJECT => Self::CoverageObject, COVERAGE_OBJECT_INGEST_REPORT => Self::CoverageObjectIngestReport, + SENIORITY_UPDATE => Self::SeniorityUpdate, _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/heartbeat.rs b/file_store/src/heartbeat.rs index 422fc2564..19d6f8238 100644 --- a/file_store/src/heartbeat.rs +++ b/file_store/src/heartbeat.rs @@ -6,6 +6,7 @@ use chrono::{DateTime, Utc}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{CellHeartbeatIngestReportV1, CellHeartbeatReqV1}; use serde::{Deserialize, Serialize}; +use uuid::Uuid; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct CellHeartbeat { @@ -18,6 +19,13 @@ pub struct CellHeartbeat { pub operation_mode: bool, pub cbsd_category: String, pub cbsd_id: String, + pub coverage_object: Vec, +} + +impl CellHeartbeat { + pub fn coverage_object(&self) -> Option { + Uuid::from_slice(&self.coverage_object).ok() + } } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -47,6 +55,7 @@ impl TryFrom for CellHeartbeat { operation_mode: v.operation_mode, cbsd_category: v.cbsd_category, cbsd_id: v.cbsd_id, + coverage_object: v.coverage_object, }) } } diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index 338ef9f68..4cc707ad1 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -1,4 +1,5 @@ pub mod cli; +pub mod coverage; pub mod entropy_report; mod error; mod file_info; diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index a7fa49bee..9b760c372 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -43,4 +43,6 @@ reward-scheduler = {path = "../reward_scheduler"} price = {path = "../price"} rand = {workspace = true} async-trait = {workspace = true} -retainer = {workspace = true} \ No newline at end of file +retainer = {workspace = true} +uuid = {workspace = true} +h3o = {workspace = true} \ No newline at end of file diff --git a/mobile_verifier/migrations/15_modeled_coverage.sql b/mobile_verifier/migrations/15_modeled_coverage.sql new file mode 100644 index 000000000..457632b0d --- /dev/null +++ b/mobile_verifier/migrations/15_modeled_coverage.sql @@ -0,0 +1,29 @@ +CREATE TYPE signal_level AS ENUM ( + 'none', + 'low', + 'medium', + 'high' +); + +CREATE TABLE hex_coverage ( + uuid UUID NOT NULL, + hex BIGINT NOT NULL, + indoor BOOLEAN NOT NULL, + cbsd_id TEXT NOT NULL, + signal_level signal_level NOT NULL, + coverage_claim_time TIMESTAMPTZ NOT NULL, + inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (uuid, hex) +); + +CREATE TABLE seniority ( + cbsd_id TEXT NOT NULL, + seniority_ts TIMESTAMPTZ NOT NULL, + inserted_at TIMESTAMPTZ NOT NULL, + last_heartbeat TIMESTAMPTZ NOT NULL, + uuid UUID NOT NULL, + PRIMARY KEY (cbsd_id, seniority_ts) +); + +-- Coverage object can be NULL +ALTER TABLE heartbeats ADD COLUMN coverage_object UUID; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 51590b75d..006e9067c 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -1,15 +1,15 @@ use crate::{ - data_session::DataSessionIngestor, heartbeats::HeartbeatDaemon, rewarder::Rewarder, - speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, telemetry, - Settings, + coverage::CoverageDaemon, data_session::DataSessionIngestor, heartbeats::HeartbeatDaemon, + rewarder::Rewarder, speedtests::SpeedtestDaemon, + subscriber_location::SubscriberLocationIngestor, telemetry, Settings, }; use anyhow::{Error, Result}; use chrono::Duration; use file_store::{ - file_info_poller::LookbackBehavior, file_sink, file_source, file_upload, - heartbeat::CellHeartbeatIngestReport, mobile_subscriber::SubscriberLocationIngestReport, - mobile_transfer::ValidDataTransferSession, speedtest::CellSpeedtestIngestReport, FileStore, - FileType, + coverage::CoverageObjectIngestReport, file_info_poller::LookbackBehavior, file_sink, + file_source, file_upload, heartbeat::CellHeartbeatIngestReport, + mobile_subscriber::SubscriberLocationIngestReport, mobile_transfer::ValidDataTransferSession, + speedtest::CellSpeedtestIngestReport, FileStore, FileType, }; use futures_util::TryFutureExt; @@ -82,11 +82,26 @@ impl Cmd { .create() .await?; + // Seniority updates + let (seniority_updates, mut seniority_updates_server) = file_sink::FileSinkBuilder::new( + FileType::SeniorityUpdate, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_seniority_update"), + shutdown_listener.clone(), + ) + .deposits(Some(file_upload_tx.clone())) + .auto_commit(false) + .roll_time(Duration::minutes(15)) + .create() + .await?; + let heartbeat_daemon = HeartbeatDaemon::new( pool.clone(), gateway_client.clone(), heartbeats, valid_heartbeats, + seniority_updates, + settings.max_heartbeat_distance_from_coverage_km, ); // Speedtests @@ -119,6 +134,37 @@ impl Cmd { valid_speedtests, ); + // Coverage objects + let (coverage_objs, coverage_objs_join_handle) = + file_source::continuous_source::() + .db(pool.clone()) + .store(report_ingest.clone()) + .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .file_type(FileType::CoverageObjectIngestReport) + .build()? + .start(shutdown_listener.clone()) + .await?; + + let (valid_coverage_objs, mut valid_coverage_objs_server) = + file_sink::FileSinkBuilder::new( + FileType::CoverageObject, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_coverage_object"), + shutdown_listener.clone(), + ) + .deposits(Some(file_upload_tx.clone())) + .auto_commit(false) + .roll_time(Duration::minutes(15)) + .create() + .await?; + + let coverage_daemon = CoverageDaemon::new( + pool.clone(), + auth_client.clone(), + coverage_objs, + valid_coverage_objs, + ); + // Mobile rewards let reward_period_hours = settings.rewards; let (mobile_rewards, mut mobile_rewards_server) = file_sink::FileSinkBuilder::new( @@ -200,6 +246,8 @@ impl Cmd { db_join_handle.map_err(Error::from), valid_heartbeats_server.run().map_err(Error::from), valid_speedtests_server.run().map_err(Error::from), + valid_coverage_objs_server.run().map_err(Error::from), + seniority_updates_server.run().map_err(Error::from), mobile_rewards_server.run().map_err(Error::from), file_upload.run(&shutdown_listener).map_err(Error::from), reward_manifests_server.run().map_err(Error::from), @@ -215,8 +263,10 @@ impl Cmd { tracker_process.map_err(Error::from), heartbeats_join_handle.map_err(Error::from), speedtests_join_handle.map_err(Error::from), + coverage_objs_join_handle.map_err(Error::from), heartbeat_daemon.run(shutdown_listener.clone()), speedtest_daemon.run(shutdown_listener.clone()), + coverage_daemon.run(shutdown_listener.clone()), rewarder.run(shutdown_listener.clone()), subscriber_location_ingest_join_handle.map_err(anyhow::Error::from), data_session_ingest_join_handle.map_err(anyhow::Error::from), diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs new file mode 100644 index 000000000..d606d0fb1 --- /dev/null +++ b/mobile_verifier/src/coverage.rs @@ -0,0 +1,720 @@ +use std::{ + cmp::Ordering, + collections::{BTreeMap, BinaryHeap, HashMap}, + pin::pin, + sync::Arc, +}; + +use chrono::{DateTime, Utc}; +use file_store::{ + coverage::CoverageObjectIngestReport, file_info_poller::FileInfoStream, + file_sink::FileSinkClient, traits::TimestampEncode, +}; +use futures::{ + stream::{BoxStream, Stream, StreamExt}, + TryFutureExt, TryStreamExt, +}; +use h3o::{CellIndex, LatLng}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::{ + mobile_config::NetworkKeyRole, + poc_mobile::{self as proto, CoverageObjectValidity, SignalLevel as SignalLevelProto}, +}; +use mobile_config::client::AuthorizationClient; +use retainer::Cache; +use rust_decimal::Decimal; +use rust_decimal_macros::dec; +use sqlx::{FromRow, Pool, Postgres, Transaction, Type}; +use tokio::sync::mpsc::Receiver; +use uuid::Uuid; + +#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Type)] +#[sqlx(type_name = "signal_level")] +#[sqlx(rename_all = "lowercase")] +pub enum SignalLevel { + None, + Low, + Medium, + High, +} + +impl From for SignalLevel { + fn from(level: SignalLevelProto) -> Self { + match level { + SignalLevelProto::High => Self::High, + SignalLevelProto::Medium => Self::Medium, + SignalLevelProto::Low => Self::Low, + SignalLevelProto::None => Self::None, + } + } +} + +pub struct CoverageDaemon { + pool: Pool, + auth_client: AuthorizationClient, + coverage_objs: Receiver>, + file_sink: FileSinkClient, +} + +impl CoverageDaemon { + pub fn new( + pool: Pool, + auth_client: AuthorizationClient, + coverage_objs: Receiver>, + file_sink: FileSinkClient, + ) -> Self { + Self { + pool, + auth_client, + coverage_objs, + file_sink, + } + } + + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + tokio::spawn(async move { + loop { + tokio::select! { + _ = shutdown.clone() => { + tracing::info!("CoverageDaemon shutting down"); + break; + } + Some(file) = self.coverage_objs.recv() => self.process_file(file).await?, + } + } + + Ok(()) + }) + .map_err(anyhow::Error::from) + .and_then(|result| async move { result }) + .await + } + + async fn process_file( + &mut self, + file: FileInfoStream, + ) -> anyhow::Result<()> { + tracing::info!("Processing coverage object file {}", file.file_info.key); + + let mut transaction = self.pool.begin().await?; + let reports = file.into_stream(&mut transaction).await?; + + let mut validated_coverage_objects = pin!(CoverageObject::validate_coverage_objects( + &self.auth_client, + reports + )); + + while let Some(coverage_object) = validated_coverage_objects.next().await.transpose()? { + coverage_object.write(&self.file_sink).await?; + coverage_object.save(&mut transaction).await?; + } + + self.file_sink.commit().await?; + transaction.commit().await?; + + Ok(()) + } +} + +pub struct CoverageObject { + coverage_obj: file_store::coverage::CoverageObject, + validity: CoverageObjectValidity, +} + +impl CoverageObject { + pub fn validate_coverage_objects<'a>( + auth_client: &'a AuthorizationClient, + coverage_objects: impl Stream + 'a, + ) -> impl Stream> + 'a { + coverage_objects.then(move |coverage_object_report| async move { + let validity = validate_coverage_object(&coverage_object_report, auth_client).await?; + Ok(CoverageObject { + coverage_obj: coverage_object_report.report, + validity, + }) + }) + } + + pub async fn write(&self, coverage_objects: &FileSinkClient) -> file_store::Result { + coverage_objects + .write( + proto::CoverageObjectV1 { + coverage_object: Some(proto::CoverageObjectReqV1 { + pub_key: self.coverage_obj.pub_key.clone().into(), + uuid: Vec::from(self.coverage_obj.uuid.into_bytes()), + cbsd_id: self.coverage_obj.cbsd_id.clone(), + coverage_claim_time: self + .coverage_obj + .coverage_claim_time + .encode_timestamp(), + indoor: self.coverage_obj.indoor, + coverage: self + .coverage_obj + .coverage + .clone() + .into_iter() + .map(Into::into) + .collect(), + signature: self.coverage_obj.signature.clone(), + }), + validity: self.validity as i32, + }, + &[("validity", self.validity.as_str_name())], + ) + .await?; + Ok(()) + } + + pub async fn save(self, transaction: &mut Transaction<'_, Postgres>) -> anyhow::Result { + // If the coverage object is not valid, do not save it + if self.validity != CoverageObjectValidity::Valid { + return Ok(false); + } + + for hex in self.coverage_obj.coverage { + let location: u64 = hex.location.into(); + sqlx::query( + r#" + INSERT INTO hex_coverage + (uuid, hex, indoor, cbsd_id, signal_level, coverage_claim_time) + VALUES + ($1, $2, $3, $4, $5, $6) + "#, + ) + .bind(self.coverage_obj.uuid) + .bind(location as i64) + .bind(self.coverage_obj.indoor) + .bind(&self.coverage_obj.cbsd_id) + .bind(SignalLevel::from(hex.signal_level)) + .bind(self.coverage_obj.coverage_claim_time) + .execute(&mut *transaction) + .await?; + } + + Ok(true) + } +} + +async fn validate_coverage_object( + coverage_object: &CoverageObjectIngestReport, + auth_client: &AuthorizationClient, +) -> anyhow::Result { + if !auth_client + .verify_authorized_key(&coverage_object.report.pub_key, NetworkKeyRole::MobilePcs) + .await? + { + return Ok(CoverageObjectValidity::InvalidPubKey); + } + + Ok(CoverageObjectValidity::Valid) +} + +#[derive(Clone, FromRow)] +pub struct HexCoverage { + pub uuid: Uuid, + pub hex: i64, + pub indoor: bool, + pub cbsd_id: String, + pub signal_level: SignalLevel, + pub coverage_claim_time: DateTime, +} + +#[derive(Eq)] +#[allow(dead_code)] +struct CoverageLevel { + cbsd_id: String, + coverage_claim_time: DateTime, + hotspot: PublicKeyBinary, + indoor: bool, + signal_level: SignalLevel, +} + +impl PartialEq for CoverageLevel { + fn eq(&self, other: &Self) -> bool { + self.coverage_claim_time == other.coverage_claim_time + } +} + +impl PartialOrd for CoverageLevel { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.coverage_claim_time.cmp(&other.coverage_claim_time)) + } +} + +impl Ord for CoverageLevel { + fn cmp(&self, other: &Self) -> Ordering { + self.coverage_claim_time.cmp(&other.coverage_claim_time) + } +} + +impl CoverageLevel { + #[allow(dead_code)] + fn coverage_points(&self) -> anyhow::Result { + Ok(match (self.indoor, self.signal_level) { + (true, SignalLevel::High) => dec!(400), + (true, SignalLevel::Low) => dec!(100), + (false, SignalLevel::High) => dec!(16), + (false, SignalLevel::Medium) => dec!(8), + (false, SignalLevel::Low) => dec!(4), + (_, SignalLevel::None) => dec!(0), + _ => anyhow::bail!("Indoor radio cannot have a signal level of medium"), + }) + } +} + +#[derive(PartialEq, Debug)] +pub struct CoverageReward { + pub cbsd_id: String, + pub points: Decimal, + pub hotspot: PublicKeyBinary, +} + +#[allow(dead_code)] +pub const MAX_RADIOS_PER_HEX: usize = 5; + +#[async_trait::async_trait] +pub trait CoveredHexStream { + async fn covered_hex_stream<'a>( + &'a self, + cbsd_id: &'a str, + coverage_obj: &'a Uuid, + period_end: DateTime, + ) -> Result>, sqlx::Error>; +} + +#[derive(sqlx::FromRow)] +pub struct Seniority { + pub uuid: Uuid, + pub seniority_ts: DateTime, + pub last_heartbeat: DateTime, + pub inserted_at: DateTime, +} + +#[async_trait::async_trait] +impl CoveredHexStream for Pool { + async fn covered_hex_stream<'a>( + &'a self, + cbsd_id: &'a str, + coverage_obj: &'a Uuid, + period_end: DateTime, + ) -> Result>, sqlx::Error> { + // Adjust the coverage + let seniority: Seniority = sqlx::query_as( + r#" + SELECT * FROM seniority + WHERE + cbsd_id = $1 AND + inserted_at <= $2 + ORDER BY inserted_at DESC + LIMIT 1 + "#, + ) + .bind(cbsd_id) + .bind(period_end) + .fetch_one(self) + .await?; + + // We can safely delete any seniority objects that appear before the latest in the reward period + sqlx::query("DELETE FROM seniority WHERE inserted_at < $1") + .bind(seniority.inserted_at) + .execute(self) + .await?; + + // Find the time of insertion for the currently in use coverage object + let current_inserted_at: DateTime = sqlx::query_scalar( + "SELECT inserted_at FROM hex_coverage WHERE cbsd_id = $1 AND uuid = $2 LIMIT 1", + ) + .bind(cbsd_id) + .bind(coverage_obj) + .fetch_one(self) + .await?; + + // Delete any hex coverages that were inserted before the one we are currently using, as they are + // no longer useful. + sqlx::query( + "DELETE FROM hex_coverage WHERE cbsd_id = $1 AND uuid != $2 AND inserted_at < $3", + ) + .bind(cbsd_id) + .bind(coverage_obj) + .bind(current_inserted_at) + .execute(self) + .await?; + + Ok( + sqlx::query_as("SELECT * FROM hex_coverage WHERE cbsd_id = $1 AND uuid = $2") + .bind(cbsd_id) + .bind(coverage_obj) + .fetch(self) + .map_ok(move |hc| HexCoverage { + coverage_claim_time: seniority.seniority_ts, + ..hc + }) + .boxed(), + ) + } +} + +#[derive(Default)] +#[allow(dead_code)] +pub struct CoveredHexes { + hexes: HashMap>; 2]>, +} + +impl CoveredHexes { + #[allow(dead_code)] + pub async fn aggregate_coverage( + &mut self, + hotspot: &PublicKeyBinary, + covered_hexes: impl Stream>, + ) -> Result<(), E> { + let mut covered_hexes = std::pin::pin!(covered_hexes); + + while let Some(HexCoverage { + hex, + indoor, + signal_level, + coverage_claim_time, + cbsd_id, + .. + }) = covered_hexes.next().await.transpose()? + { + self.hexes + .entry(CellIndex::try_from(hex as u64).unwrap()) + .or_default()[indoor as usize] + .entry(signal_level) + .or_default() + .push(CoverageLevel { + cbsd_id, + coverage_claim_time, + indoor, + signal_level, + hotspot: hotspot.clone(), + }); + } + + Ok(()) + } + + /// Returns the radios that should be rewarded for giving coverage. + #[allow(dead_code)] + pub fn into_iter(self) -> impl Iterator { + self.hexes + .into_values() + .flat_map(|radios| { + radios.into_iter().map(|mut radios| { + radios.pop_last().map(|(_, radios)| { + radios + .into_sorted_vec() + .into_iter() + .take(MAX_RADIOS_PER_HEX) + .map(|cl| CoverageReward { + points: cl.coverage_points().unwrap(), + hotspot: cl.hotspot, + cbsd_id: cl.cbsd_id, + }) + }) + }) + }) + .flatten() + .flatten() + .filter(|r| r.points > Decimal::ZERO) + } +} + +type CoverageClaimTimeKey = (String, Option); + +pub struct CoverageClaimTimeCache { + cache: Arc>>, +} + +impl CoverageClaimTimeCache { + pub fn new() -> Self { + let cache = Arc::new(Cache::new()); + let cache_clone = cache.clone(); + tokio::spawn(async move { + cache_clone + .monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 24 * 5)) + .await + }); + Self { cache } + } + + pub async fn fetch_coverage_claim_time( + &self, + cbsd_id: &str, + coverage_object: &Option, + exec: &mut Transaction<'_, Postgres>, + ) -> Result>, sqlx::Error> { + let key = (cbsd_id.to_string(), *coverage_object); + if let Some(coverage_claim_time) = self.cache.get(&key).await { + Ok(Some(*coverage_claim_time)) + } else { + let coverage_claim_time: Option> = sqlx::query_scalar( + r#" + SELECT coverage_claim_time FROM hex_coverage WHERE cbsd_id = $1 AND uuid = $2 LIMIT 1 + "#, + ) + .bind(cbsd_id) + .bind(coverage_object) + .fetch_optional(&mut *exec) + .await?; + if let Some(coverage_claim_time) = coverage_claim_time { + self.cache + .insert( + key, + coverage_claim_time, + std::time::Duration::from_secs(60 * 60 * 24), + ) + .await; + } + Ok(coverage_claim_time) + } + } +} + +#[allow(dead_code)] +pub struct CoveredHexCache { + pool: Pool, + covered_hexes: Arc>, +} + +#[allow(dead_code)] +impl CoveredHexCache { + pub fn new(pool: &Pool) -> Self { + let cache = Arc::new(Cache::new()); + let cache_clone = cache.clone(); + tokio::spawn(async move { + cache_clone + .monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 24 * 2)) + .await + }); + + Self { + covered_hexes: cache, + pool: pool.clone(), + } + } + + pub async fn fetch_coverage(&self, uuid: &Uuid) -> Result, sqlx::Error> { + if let Some(covered_hexes) = self.covered_hexes.get(uuid).await { + return Ok(Some(covered_hexes.clone())); + } + let Some(cbsd_id) = sqlx::query_scalar("SELECT cbsd_id FROM hex_coverage WHERE uuid = $1 LIMIT 1") + .bind(uuid) + .fetch_optional(&self.pool) + .await? + else { + return Ok(None); + }; + let coverage: Vec<_> = sqlx::query_as("SELECT * FROM hex_coverage WHERE uuid = $1") + .bind(uuid) + .fetch_all(&self.pool) + .await? + .into_iter() + .map(|HexCoverage { hex, .. }| CellIndex::try_from(hex as u64).unwrap()) + .collect(); + let cached_coverage = CachedCoverage { cbsd_id, coverage }; + let _ = self + .covered_hexes + .insert( + *uuid, + cached_coverage.clone(), + std::time::Duration::from_secs(60 * 60 * 24), + ) + .await; + Ok(Some(cached_coverage)) + } +} + +#[derive(Clone)] +#[allow(dead_code)] +pub struct CachedCoverage { + pub cbsd_id: String, + coverage: Vec, +} + +#[allow(dead_code)] +impl CachedCoverage { + pub fn max_distance_km(&self, latlng: LatLng) -> f64 { + self.coverage.iter().fold(0.0, |curr_max, curr_cov| { + let cov = LatLng::from(*curr_cov); + curr_max.max(cov.distance_km(latlng)) + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + use chrono::NaiveDate; + use futures::stream::iter; + + fn default_hex_coverage(cbsd_id: &str, signal_level: SignalLevel) -> HexCoverage { + HexCoverage { + uuid: Uuid::new_v4(), + hex: 0x8a1fb46622dffff_u64 as i64, + indoor: false, + cbsd_id: cbsd_id.to_string(), + signal_level, + coverage_claim_time: DateTime::::MIN_UTC, + } + } + + /// Test to ensure that if there are multiple radios with different signal levels + /// in a given hex, that the one with the highest signal level is chosen. + #[tokio::test] + async fn ensure_max_signal_level_selected() { + let owner: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .expect("failed owner parse"); + let mut covered_hexes = CoveredHexes::default(); + covered_hexes + .aggregate_coverage( + &owner, + iter(vec![ + anyhow::Ok(default_hex_coverage("1", SignalLevel::None)), + anyhow::Ok(default_hex_coverage("2", SignalLevel::Low)), + anyhow::Ok(default_hex_coverage("3", SignalLevel::Medium)), + anyhow::Ok(default_hex_coverage("4", SignalLevel::High)), + anyhow::Ok(default_hex_coverage("5", SignalLevel::Medium)), + anyhow::Ok(default_hex_coverage("6", SignalLevel::Low)), + anyhow::Ok(default_hex_coverage("7", SignalLevel::None)), + ]), + ) + .await + .unwrap(); + let rewards: Vec<_> = covered_hexes.into_iter().collect(); + assert_eq!( + rewards, + vec![CoverageReward { + cbsd_id: "4".to_string(), + hotspot: owner, + points: dec!(16) + }] + ); + } + + fn date(year: i32, month: u32, day: u32) -> DateTime { + DateTime::::from_utc( + NaiveDate::from_ymd_opt(year, month, day) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(), + Utc, + ) + } + + fn hex_coverage_with_date( + cbsd_id: &str, + signal_level: SignalLevel, + coverage_claim_time: DateTime, + ) -> HexCoverage { + HexCoverage { + uuid: Uuid::new_v4(), + hex: 0x8a1fb46622dffff_u64 as i64, + indoor: false, + cbsd_id: cbsd_id.to_string(), + signal_level, + coverage_claim_time, + } + } + + /// Test to ensure that if there are more than five radios with the highest signal + /// level in a given hex, that the five oldest radios are chosen. + #[tokio::test] + async fn ensure_oldest_five_radios_selected() { + let owner: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .expect("failed owner parse"); + let mut covered_hexes = CoveredHexes::default(); + covered_hexes + .aggregate_coverage( + &owner, + iter(vec![ + anyhow::Ok(hex_coverage_with_date( + "1", + SignalLevel::High, + date(1980, 1, 1), + )), + anyhow::Ok(hex_coverage_with_date( + "2", + SignalLevel::High, + date(1970, 1, 5), + )), + anyhow::Ok(hex_coverage_with_date( + "3", + SignalLevel::High, + date(1990, 2, 2), + )), + anyhow::Ok(hex_coverage_with_date( + "4", + SignalLevel::High, + date(1970, 1, 4), + )), + anyhow::Ok(hex_coverage_with_date( + "5", + SignalLevel::High, + date(1975, 3, 3), + )), + anyhow::Ok(hex_coverage_with_date( + "6", + SignalLevel::High, + date(1970, 1, 3), + )), + anyhow::Ok(hex_coverage_with_date( + "7", + SignalLevel::High, + date(1974, 2, 2), + )), + anyhow::Ok(hex_coverage_with_date( + "8", + SignalLevel::High, + date(1970, 1, 2), + )), + anyhow::Ok(hex_coverage_with_date( + "9", + SignalLevel::High, + date(1976, 5, 2), + )), + anyhow::Ok(hex_coverage_with_date( + "10", + SignalLevel::High, + date(1970, 1, 1), + )), + ]), + ) + .await + .unwrap(); + let rewards: Vec<_> = covered_hexes.into_iter().collect(); + assert_eq!( + rewards, + vec![ + CoverageReward { + cbsd_id: "10".to_string(), + hotspot: owner.clone(), + points: dec!(16) + }, + CoverageReward { + cbsd_id: "8".to_string(), + hotspot: owner.clone(), + points: dec!(16) + }, + CoverageReward { + cbsd_id: "6".to_string(), + hotspot: owner.clone(), + points: dec!(16) + }, + CoverageReward { + cbsd_id: "4".to_string(), + hotspot: owner.clone(), + points: dec!(16) + }, + CoverageReward { + cbsd_id: "2".to_string(), + hotspot: owner.clone(), + points: dec!(16) + } + ] + ); + } +} diff --git a/mobile_verifier/src/heartbeats.rs b/mobile_verifier/src/heartbeats.rs index 01e1fd383..fdc16b1ad 100644 --- a/mobile_verifier/src/heartbeats.rs +++ b/mobile_verifier/src/heartbeats.rs @@ -1,6 +1,9 @@ //! Heartbeat storage -use crate::cell_type::CellType; +use crate::{ + cell_type::CellType, + coverage::{CoverageClaimTimeCache, CoveredHexCache}, +}; use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc}; use file_store::{ file_info_poller::FileInfoStream, file_sink::FileSinkClient, @@ -10,14 +13,16 @@ use futures::{ stream::{Stream, StreamExt, TryStreamExt}, TryFutureExt, }; +// use h3o::LatLng; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile as proto; -use mobile_config::{client::ClientError, gateway_info::GatewayInfoResolver, GatewayClient}; +use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient}; use retainer::Cache; use rust_decimal::{prelude::ToPrimitive, Decimal}; use sqlx::{Postgres, Transaction}; use std::{ops::Range, pin::pin, sync::Arc, time}; use tokio::sync::mpsc::Receiver; +use uuid::Uuid; #[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::FromRow)] pub struct HeartbeatKey { @@ -26,12 +31,6 @@ pub struct HeartbeatKey { cell_type: CellType, } -pub struct HeartbeatReward { - pub hotspot_key: PublicKeyBinary, - pub cbsd_id: String, - pub reward_weight: Decimal, -} - impl From for HeartbeatReward { fn from(value: HeartbeatKey) -> Self { Self { @@ -46,7 +45,9 @@ pub struct HeartbeatDaemon { pool: sqlx::Pool, gateway_client: GatewayClient, heartbeats: Receiver>, - file_sink: FileSinkClient, + heartbeat_sink: FileSinkClient, + seniority_sink: FileSinkClient, + max_distance: f64, } impl HeartbeatDaemon { @@ -54,34 +55,49 @@ impl HeartbeatDaemon { pool: sqlx::Pool, gateway_client: GatewayClient, heartbeats: Receiver>, - file_sink: FileSinkClient, + heartbeat_sink: FileSinkClient, + seniority_sink: FileSinkClient, + max_distance: f64, ) -> Self { Self { pool, gateway_client, heartbeats, - file_sink, + heartbeat_sink, + seniority_sink, + max_distance, } } pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { tokio::spawn(async move { - let cache = Arc::new(Cache::<(String, DateTime), ()>::new()); + let heartbeat_cache = + Arc::new(Cache::<(String, DateTime, Option), ()>::new()); - let cache_clone = cache.clone(); + let heartbeat_cache_clone = heartbeat_cache.clone(); tokio::spawn(async move { - cache_clone + heartbeat_cache_clone .monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 3)) .await }); + let coverage_claim_time_cache = CoverageClaimTimeCache::new(); + let covered_hex_cache = CoveredHexCache::new(&self.pool); + loop { tokio::select! { _ = shutdown.clone() => { tracing::info!("HeartbeatDaemon shutting down"); break; } - Some(file) = self.heartbeats.recv() => self.process_file(file, &cache).await?, + Some(file) = self.heartbeats.recv() => { + self.process_file( + file, + &heartbeat_cache, + &coverage_claim_time_cache, + &covered_hex_cache + ).await?; + } } } @@ -95,7 +111,9 @@ impl HeartbeatDaemon { async fn process_file( &self, file: FileInfoStream, - cache: &Cache<(String, DateTime), ()>, + heartbeat_cache: &Cache<(String, DateTime, Option), ()>, + coverage_claim_time_cache: &CoverageClaimTimeCache, + covered_hex_cache: &CoveredHexCache, ) -> anyhow::Result<()> { tracing::info!("Processing heartbeat file {}", file.file_info.key); @@ -104,28 +122,63 @@ impl HeartbeatDaemon { let mut transaction = self.pool.begin().await?; let reports = file.into_stream(&mut transaction).await?; - let mut validated_heartbeats = - pin!(Heartbeat::validate_heartbeats(&self.gateway_client, reports, &epoch).await); + let mut validated_heartbeats = pin!(Heartbeat::validate_heartbeats( + &self.gateway_client, + covered_hex_cache, + reports, + &epoch, + self.max_distance, + )); while let Some(heartbeat) = validated_heartbeats.next().await.transpose()? { - heartbeat.write(&self.file_sink).await?; - let key = (heartbeat.cbsd_id.clone(), heartbeat.truncated_timestamp()?); + if heartbeat.is_valid() && heartbeat.coverage_object.is_some() { + if let Some(coverage_claim_time) = coverage_claim_time_cache + .fetch_coverage_claim_time( + &heartbeat.cbsd_id, + &heartbeat.coverage_object, + &mut transaction, + ) + .await? + { + heartbeat + .update_seniority( + coverage_claim_time, + &self.seniority_sink, + &mut transaction, + ) + .await?; + } + } - if cache.get(&key).await.is_none() { + heartbeat.write(&self.heartbeat_sink).await?; + + let key = ( + heartbeat.cbsd_id.clone(), + heartbeat.truncated_timestamp()?, + heartbeat.coverage_object, + ); + if heartbeat_cache.get(&key).await.is_none() { heartbeat.save(&mut transaction).await?; - cache + heartbeat_cache .insert(key, (), time::Duration::from_secs(60 * 60 * 2)) .await; } } - self.file_sink.commit().await?; + self.heartbeat_sink.commit().await?; + self.seniority_sink.commit().await?; transaction.commit().await?; Ok(()) } } +pub struct HeartbeatReward { + pub hotspot_key: PublicKeyBinary, + pub cbsd_id: String, + pub reward_weight: Decimal, +} + /// Minimum number of heartbeats required to give a reward to the hotspot. pub const MINIMUM_HEARTBEAT_COUNT: i64 = 12; @@ -136,11 +189,18 @@ impl HeartbeatReward { ) -> impl Stream> + 'a { sqlx::query_as::<_, HeartbeatKey>( r#" - SELECT hotspot_key, cbsd_id, cell_type - FROM heartbeats + SELECT + hotspot_key, + heartbeats.cbsd_id, + cell_type + FROM + heartbeats WHERE truncated_timestamp >= $1 - and truncated_timestamp < $2 - GROUP BY cbsd_id, hotspot_key, cell_type + AND truncated_timestamp < $2 + GROUP BY + heartbeats.cbsd_id, + hotspot_key, + cell_type, HAVING count(*) >= $3 "#, ) @@ -158,45 +218,47 @@ pub struct Heartbeat { pub cell_type: Option, pub hotspot_key: PublicKeyBinary, pub timestamp: DateTime, + pub coverage_object: Option, + pub lat: f64, + pub lon: f64, pub validity: proto::HeartbeatValidity, } -#[derive(sqlx::FromRow)] -struct HeartbeatSaveResult { - inserted: bool, -} - -#[derive(thiserror::Error, Debug)] -pub enum SaveHeartbeatError { - #[error("rounding error: {0}")] - RoundingError(#[from] RoundingError), - #[error("sql error: {0}")] - SqlError(#[from] sqlx::Error), -} - impl Heartbeat { + fn is_valid(&self) -> bool { + self.validity == proto::HeartbeatValidity::Valid + } + pub fn truncated_timestamp(&self) -> Result, RoundingError> { self.timestamp.duration_trunc(Duration::hours(1)) } - pub async fn validate_heartbeats<'a>( + pub fn validate_heartbeats<'a>( gateway_client: &'a GatewayClient, + covered_hex_cache: &'a CoveredHexCache, heartbeats: impl Stream + 'a, epoch: &'a Range>, - ) -> impl Stream> + 'a { - heartbeats.then(move |heartbeat_report| { - let mut gateway_client = gateway_client.clone(); - async move { - let (cell_type, validity) = - validate_heartbeat(&heartbeat_report, &mut gateway_client, epoch).await?; - Ok(Heartbeat { - hotspot_key: heartbeat_report.report.pubkey, - cbsd_id: heartbeat_report.report.cbsd_id, - timestamp: heartbeat_report.received_timestamp, - cell_type, - validity, - }) - } + max_distance: f64, + ) -> impl Stream> + 'a { + heartbeats.then(move |heartbeat_report| async move { + let (cell_type, validity) = validate_heartbeat( + &heartbeat_report, + gateway_client, + covered_hex_cache, + epoch, + max_distance, + ) + .await?; + Ok(Heartbeat { + coverage_object: heartbeat_report.report.coverage_object(), + hotspot_key: heartbeat_report.report.pubkey, + cbsd_id: heartbeat_report.report.cbsd_id, + timestamp: heartbeat_report.received_timestamp, + lat: heartbeat_report.report.lat, + lon: heartbeat_report.report.lon, + cell_type, + validity, + }) }) } @@ -212,9 +274,12 @@ impl Heartbeat { cell_type: self.cell_type.unwrap_or(CellType::Neutrino430) as i32, // Is this the right default? validity: self.validity as i32, timestamp: self.timestamp.timestamp() as u64, - coverage_object: Vec::with_capacity(0), // Placeholder so the project compiles - lat: 0.0, - lon: 0.0, + lat: self.lat, + lon: self.lon, + coverage_object: self + .coverage_object + .map(|x| Vec::from(x.into_bytes())) + .unwrap_or_default(), }, &[("validity", self.validity.as_str_name())], ) @@ -222,10 +287,100 @@ impl Heartbeat { Ok(()) } - pub async fn save( - self, + pub async fn update_seniority( + &self, + coverage_claim_time: DateTime, + seniorities: &FileSinkClient, exec: &mut Transaction<'_, Postgres>, - ) -> Result { + ) -> anyhow::Result<()> { + enum InsertOrUpdate { + Insert(proto::SeniorityUpdateReason), + Update(DateTime), + } + + let (seniority_ts, update_reason) = if let Some(prev_seniority) = + sqlx::query_as::<_, crate::coverage::Seniority>( + "SELECT * FROM seniority WHERE cbsd_id = $1 ORDER BY last_heartbeat DESC LIMIT 1", + ) + .bind(&self.cbsd_id) + .fetch_optional(&mut *exec) + .await? + { + if self.coverage_object != Some(prev_seniority.uuid) { + ( + coverage_claim_time, + InsertOrUpdate::Insert(proto::SeniorityUpdateReason::NewCoverageClaimTime), + ) + } else if self.timestamp - prev_seniority.last_heartbeat > Duration::days(3) + && coverage_claim_time < self.timestamp + { + ( + self.timestamp, + InsertOrUpdate::Insert(proto::SeniorityUpdateReason::HeartbeatNotSeen), + ) + } else { + ( + coverage_claim_time, + InsertOrUpdate::Update(prev_seniority.seniority_ts), + ) + } + } else { + ( + coverage_claim_time, + InsertOrUpdate::Insert(proto::SeniorityUpdateReason::NewCoverageClaimTime), + ) + }; + + match update_reason { + InsertOrUpdate::Insert(update_reason) => { + sqlx::query( + r#" + INSERT INTO seniority + (cbsd_id, last_heartbeat, uuid, seniority_ts, inserted_at) + VALUES + ($1, $2, $3, $4, $5) + "#, + ) + .bind(&self.cbsd_id) + .bind(self.timestamp) + .bind(self.coverage_object) + .bind(seniority_ts) + .bind(self.timestamp) + .execute(&mut *exec) + .await?; + seniorities + .write( + proto::SeniorityUpdate { + cbsd_id: self.cbsd_id.to_string(), + new_seniority_timestamp: seniority_ts.timestamp() as u64, + reason: update_reason as i32, + }, + [], + ) + .await?; + } + InsertOrUpdate::Update(seniority_ts) => { + sqlx::query( + r#" + UPDATE seniority + SET last_heartbeat = $1 + WHERE + cbsd_id = $2 AND + seniority_ts = $3 + "#, + ) + .bind(self.timestamp) + .bind(&self.cbsd_id) + .bind(seniority_ts) + .execute(&mut *exec) + .await?; + } + } + + Ok(()) + } + + pub async fn save(self, exec: &mut Transaction<'_, Postgres>) -> anyhow::Result { // If the heartbeat is not valid, do not save it if self.validity != proto::HeartbeatValidity::Valid { return Ok(false); @@ -239,12 +394,13 @@ impl Heartbeat { let truncated_timestamp = self.truncated_timestamp()?; Ok( - sqlx::query_as::<_, HeartbeatSaveResult>( + sqlx::query_scalar( r#" - INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) - VALUES ($1, $2, $3, $4, $5) + INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object) + VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (cbsd_id, truncated_timestamp) DO UPDATE SET - latest_timestamp = EXCLUDED.latest_timestamp + latest_timestamp = EXCLUDED.latest_timestamp, + coverage_object = EXCLUDED.coverage_object RETURNING (xmax = 0) as inserted "# ) @@ -253,9 +409,9 @@ impl Heartbeat { .bind(self.cell_type.unwrap()) .bind(self.timestamp) .bind(truncated_timestamp) + .bind(self.coverage_object) .fetch_one(&mut *exec) .await? - .inserted ) } } @@ -263,9 +419,11 @@ impl Heartbeat { /// Validate a heartbeat in the given epoch. async fn validate_heartbeat( heartbeat: &CellHeartbeatIngestReport, - gateway_client: &mut GatewayClient, + gateway_client: &GatewayClient, + _coverage_cache: &CoveredHexCache, epoch: &Range>, -) -> Result<(Option, proto::HeartbeatValidity), ClientError> { + _max_distance: f64, +) -> anyhow::Result<(Option, proto::HeartbeatValidity)> { let cell_type = match CellType::from_cbsd_id(&heartbeat.report.cbsd_id) { Some(ty) => Some(ty), _ => return Ok((None, proto::HeartbeatValidity::BadCbsdId)), @@ -287,5 +445,27 @@ async fn validate_heartbeat( return Ok((cell_type, proto::HeartbeatValidity::GatewayOwnerNotFound)); } + /* + let Some(coverage_object) = heartbeat.report.coverage_object() else { + return Ok((cell_type, proto::HeartbeatValidity::BadCoverageObject)); + }; + + let Some(coverage) = coverage_cache.fetch_coverage(&coverage_object).await? else { + return Ok((cell_type, proto::HeartbeatValidity::NoSuchCoverageObject)); + }; + + if coverage.cbsd_id != heartbeat.report.cbsd_id { + return Ok((cell_type, proto::HeartbeatValidity::BadCoverageObject)); + } + + let Ok(latlng) = LatLng::new(heartbeat.report.lat, heartbeat.report.lon) else { + return Ok((cell_type, proto::HeartbeatValidity::InvalidLatLon)); + }; + + if coverage.max_distance_km(latlng) > max_distance { + return Ok((cell_type, proto::HeartbeatValidity::TooFarFromCoverage)); + } + */ + Ok((cell_type, proto::HeartbeatValidity::Valid)) } diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index 84d891206..52ccc7567 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -1,4 +1,5 @@ mod cell_type; +mod coverage; mod data_session; mod heartbeats; mod reward_shares; diff --git a/mobile_verifier/src/settings.rs b/mobile_verifier/src/settings.rs index c506f4cc1..65bc3c3de 100644 --- a/mobile_verifier/src/settings.rs +++ b/mobile_verifier/src/settings.rs @@ -25,6 +25,8 @@ pub struct Settings { pub config_client: mobile_config::ClientSettings, #[serde(default = "default_start_after")] pub start_after: u64, + #[serde(default = "default_max_heartbeat_distance_from_coverage_km")] + pub max_heartbeat_distance_from_coverage_km: f64, } pub fn default_log() -> String { @@ -43,6 +45,10 @@ pub fn default_reward_offset_minutes() -> i64 { 30 } +pub fn default_max_heartbeat_distance_from_coverage_km() -> f64 { + 2.5 +} + impl Settings { /// Load Settings from a given path. Settings are loaded from a given /// optional path and can be overriden with environment variables.