From 60f2542d56af4c50dea58a5df58eda36235fb880 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Wed, 30 Aug 2023 18:25:32 +0100 Subject: [PATCH] add support for wifi heartbeats --- Cargo.lock | 13 +- file_store/src/cli/bucket.rs | 8 +- file_store/src/cli/dump.rs | 22 +- file_store/src/cli/info.rs | 4 +- file_store/src/file_info.rs | 28 +- file_store/src/heartbeat.rs | 24 +- file_store/src/lib.rs | 1 + file_store/src/traits/msg_verify.rs | 3 +- file_store/src/wifi_heartbeat.rs | 147 ++++ ingest/src/server_mobile.rs | 41 +- mobile_verifier/Cargo.toml | 3 +- .../migrations/16_wifi_heartbeat.sql | 14 + mobile_verifier/src/cell_type.rs | 48 ++ mobile_verifier/src/cli/reward_from_db.rs | 6 +- mobile_verifier/src/cli/server.rs | 53 +- mobile_verifier/src/heartbeats.rs | 320 --------- mobile_verifier/src/heartbeats/cbrs.rs | 91 +++ mobile_verifier/src/heartbeats/mod.rs | 465 ++++++++++++ mobile_verifier/src/heartbeats/wifi.rs | 90 +++ mobile_verifier/src/lib.rs | 7 +- mobile_verifier/src/reward_shares.rs | 670 +++++++++++++++--- mobile_verifier/src/rewarder.rs | 21 +- mobile_verifier/src/settings.rs | 15 + mobile_verifier/tests/heartbeats.rs | 50 +- 24 files changed, 1640 insertions(+), 504 deletions(-) create mode 100644 file_store/src/wifi_heartbeat.rs create mode 100644 mobile_verifier/migrations/16_wifi_heartbeat.sql delete mode 100644 mobile_verifier/src/heartbeats.rs create mode 100644 mobile_verifier/src/heartbeats/cbrs.rs create mode 100644 mobile_verifier/src/heartbeats/mod.rs create mode 100644 mobile_verifier/src/heartbeats/wifi.rs diff --git a/Cargo.lock b/Cargo.lock index fee226b09..92dd0b935 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1130,7 +1130,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#6cfd6dc952349e31187c7071dbbcd86780eb97e6" +source = "git+https://github.com/helium/proto?branch=master#d94ed4b4046263eb78003d484d94ad3cbff7a55f" dependencies = [ "base64 0.21.0", "byteorder", @@ -1140,7 +1140,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.6", + "sha2 0.9.9", "thiserror", ] @@ -2972,7 +2972,7 @@ dependencies = [ "bs58 0.5.0", "byteorder", "ed25519-compact", - "getrandom 0.2.8", + "getrandom 0.1.16", "k256", "lazy_static", "multihash", @@ -2990,7 +2990,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#6cfd6dc952349e31187c7071dbbcd86780eb97e6" +source = "git+https://github.com/helium/proto?branch=master#d94ed4b4046263eb78003d484d94ad3cbff7a55f" dependencies = [ "bytes", "prost", @@ -4132,6 +4132,7 @@ dependencies = [ "file-store", "futures", "futures-util", + "h3o", "helium-crypto", "helium-proto", "http-serde", @@ -7587,7 +7588,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.8.5", + "rand 0.7.3", "static_assertions", ] @@ -8198,7 +8199,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.6", + "sha2 0.9.9", "thiserror", "twox-hash", "xorf", diff --git a/file_store/src/cli/bucket.rs b/file_store/src/cli/bucket.rs index fbb0da4fa..2e950ef3f 100644 --- a/file_store/src/cli/bucket.rs +++ b/file_store/src/cli/bucket.rs @@ -1,5 +1,5 @@ use crate::{ - heartbeat::CellHeartbeat, iot_beacon_report::IotBeaconIngestReport, iot_valid_poc::IotPoc, + heartbeat::CbrsHeartbeat, iot_beacon_report::IotBeaconIngestReport, iot_valid_poc::IotPoc, iot_witness_report::IotWitnessIngestReport, speedtest::CellSpeedtest, traits::MsgDecode, Error, FileInfoStream, FileStore, FileType, Result, Settings, }; @@ -204,8 +204,8 @@ impl Locate { fn locate(prefix: &str, gateway: &PublicKey, buf: &[u8]) -> Result> { let pub_key = gateway.to_vec(); match FileType::from_str(prefix)? { - FileType::CellHeartbeat => { - CellHeartbeat::decode(buf).and_then(|event| event.to_value_if(pub_key)) + FileType::CbrsHeartbeat => { + CbrsHeartbeat::decode(buf).and_then(|event| event.to_value_if(pub_key)) } FileType::CellSpeedtest => { CellSpeedtest::decode(buf).and_then(|event| event.to_value_if(pub_key)) @@ -254,7 +254,7 @@ where } } -impl Gateway for CellHeartbeat { +impl Gateway for CbrsHeartbeat { fn has_pubkey(&self, pub_key: &[u8]) -> bool { self.pubkey.as_ref() == pub_key } diff --git a/file_store/src/cli/dump.rs b/file_store/src/cli/dump.rs index 0622f3fea..a18e8152a 100644 --- a/file_store/src/cli/dump.rs +++ b/file_store/src/cli/dump.rs @@ -1,12 +1,13 @@ use crate::{ cli::print_json, file_source, - heartbeat::{CellHeartbeat, CellHeartbeatIngestReport}, + heartbeat::{CbrsHeartbeat, CbrsHeartbeatIngestReport}, iot_packet::IotValidPacket, mobile_session::{DataTransferSessionIngestReport, InvalidDataTransferIngestReport}, mobile_subscriber::{SubscriberLocationIngestReport, VerifiedSubscriberLocationIngestReport}, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, traits::MsgDecode, + wifi_heartbeat::WifiHeartbeatIngestReport, FileType, Result, Settings, }; use base64::Engine; @@ -50,17 +51,27 @@ impl Cmd { while let Some(result) = file_stream.next().await { let msg = result?; match self.file_type { - FileType::CellHeartbeat => { + FileType::CbrsHeartbeat => { let dec_msg = CellHeartbeatReqV1::decode(msg)?; - wtr.serialize(CellHeartbeat::try_from(dec_msg)?)?; + wtr.serialize(CbrsHeartbeat::try_from(dec_msg)?)?; + } + FileType::WifiHeartbeatIngestReport => { + let msg = WifiHeartbeatIngestReport::decode(msg)?; + let json = json!({ + "received_timestamp": msg.received_timestamp, + "pubkey": msg.report.pubkey, + "operation_mode": msg.report.operation_mode, + "location_validation_timestamp": msg.report.location_validation_timestamp, + }); + print_json(&json)?; } FileType::CellSpeedtest => { let dec_msg = SpeedtestReqV1::decode(msg)?; wtr.serialize(CellSpeedtest::try_from(dec_msg)?)?; } - FileType::CellHeartbeatIngestReport => { + FileType::CbrsHeartbeatIngestReport => { let dec_msg = CellHeartbeatIngestReportV1::decode(msg)?; - let ingest_report = CellHeartbeatIngestReport::try_from(dec_msg)?; + let ingest_report = CbrsHeartbeatIngestReport::try_from(dec_msg)?; print_json(&ingest_report)?; } FileType::CellSpeedtestIngestReport => { @@ -207,6 +218,7 @@ impl Cmd { "dc_transfer_reward": reward.dc_transfer_reward, }))?, Some(Reward::RadioReward(reward)) => print_json(&json!({ + "hotspot_key": PublicKey::try_from(reward.hotspot_key)?, "cbsd_id": reward.cbsd_id, "poc_reward": reward.poc_reward, }))?, diff --git a/file_store/src/cli/info.rs b/file_store/src/cli/info.rs index ecf8d4769..ad9301815 100644 --- a/file_store/src/cli/info.rs +++ b/file_store/src/cli/info.rs @@ -74,13 +74,13 @@ impl MsgTimestamp>> for PriceReportV1 { fn get_timestamp(file_type: &str, buf: &[u8]) -> Result> { let result = match FileType::from_str(file_type)? { - FileType::CellHeartbeat => CellHeartbeatReqV1::decode(buf) + FileType::CbrsHeartbeat => CellHeartbeatReqV1::decode(buf) .map_err(Error::from) .and_then(|entry| entry.timestamp())?, FileType::CellSpeedtest => SpeedtestReqV1::decode(buf) .map_err(Error::from) .and_then(|entry| entry.timestamp())?, - FileType::CellHeartbeatIngestReport => CellHeartbeatIngestReportV1::decode(buf) + FileType::CbrsHeartbeatIngestReport => CellHeartbeatIngestReportV1::decode(buf) .map_err(Error::from) .and_then(|ingest_report| { ingest_report.report.ok_or_else(|| { diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index ff06df88a..dee15429d 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -109,10 +109,12 @@ impl FileInfo { pub const SUBSCRIBER_LOCATION_REQ: &str = "subscriber_location_req"; pub const SUBSCRIBER_LOCATION_INGEST_REPORT: &str = "subscriber_location_report"; pub const VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT: &str = "verified_subscriber_location_report"; -pub const CELL_HEARTBEAT: &str = "cell_heartbeat"; +pub const CBRS_HEARTBEAT: &str = "cbrs_heartbeat"; +pub const WIFI_HEARTBEAT: &str = "wifi_heartbeat"; pub const CELL_SPEEDTEST: &str = "cell_speedtest"; pub const VERIFIED_SPEEDTEST: &str = "verified_speedtest"; pub const CELL_HEARTBEAT_INGEST_REPORT: &str = "heartbeat_report"; +pub const WIFI_HEARTBEAT_INGEST_REPORT: &str = "wifi_heartbeat_report"; pub const CELL_SPEEDTEST_INGEST_REPORT: &str = "speedtest_report"; pub const ENTROPY: &str = "entropy"; pub const SUBNETWORK_REWARDS: &str = "subnetwork_rewards"; @@ -144,11 +146,11 @@ pub const COVERAGE_OBJECT_INGEST_REPORT: &str = "coverage_object_ingest_report"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] pub enum FileType { - CellHeartbeat = 0, + CbrsHeartbeat = 0, CellSpeedtest = 1, Entropy = 2, SubnetworkRewards = 3, - CellHeartbeatIngestReport, + CbrsHeartbeatIngestReport, CellSpeedtestIngestReport, EntropyReport, IotBeaconIngestReport, @@ -177,6 +179,8 @@ pub enum FileType { MapperMsg, CoverageObjectIngestReport, VerifiedSpeedtest, + WifiHeartbeat, + WifiHeartbeatIngestReport, } impl fmt::Display for FileType { @@ -187,10 +191,12 @@ impl fmt::Display for FileType { Self::VerifiedSubscriberLocationIngestReport => { VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT } - Self::CellHeartbeat => CELL_HEARTBEAT, + Self::CbrsHeartbeat => CBRS_HEARTBEAT, + Self::WifiHeartbeat => WIFI_HEARTBEAT, Self::CellSpeedtest => CELL_SPEEDTEST, Self::VerifiedSpeedtest => VERIFIED_SPEEDTEST, - Self::CellHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT, + Self::CbrsHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT, + Self::WifiHeartbeatIngestReport => WIFI_HEARTBEAT_INGEST_REPORT, Self::CellSpeedtestIngestReport => CELL_SPEEDTEST_INGEST_REPORT, Self::Entropy => ENTROPY, Self::SubnetworkRewards => SUBNETWORK_REWARDS, @@ -232,10 +238,12 @@ impl FileType { Self::VerifiedSubscriberLocationIngestReport => { VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT } - Self::CellHeartbeat => CELL_HEARTBEAT, + Self::CbrsHeartbeat => CBRS_HEARTBEAT, + Self::WifiHeartbeat => WIFI_HEARTBEAT, Self::CellSpeedtest => CELL_SPEEDTEST, Self::VerifiedSpeedtest => VERIFIED_SPEEDTEST, - Self::CellHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT, + Self::CbrsHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT, + Self::WifiHeartbeatIngestReport => WIFI_HEARTBEAT_INGEST_REPORT, Self::CellSpeedtestIngestReport => CELL_SPEEDTEST_INGEST_REPORT, Self::Entropy => ENTROPY, Self::SubnetworkRewards => SUBNETWORK_REWARDS, @@ -277,10 +285,12 @@ impl FromStr for FileType { VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT => { Self::VerifiedSubscriberLocationIngestReport } - CELL_HEARTBEAT => Self::CellHeartbeat, + CBRS_HEARTBEAT => Self::CbrsHeartbeat, + WIFI_HEARTBEAT => Self::WifiHeartbeat, CELL_SPEEDTEST => Self::CellSpeedtest, VERIFIED_SPEEDTEST => Self::VerifiedSpeedtest, - CELL_HEARTBEAT_INGEST_REPORT => Self::CellHeartbeatIngestReport, + CELL_HEARTBEAT_INGEST_REPORT => Self::CbrsHeartbeatIngestReport, + WIFI_HEARTBEAT_INGEST_REPORT => Self::WifiHeartbeatIngestReport, CELL_SPEEDTEST_INGEST_REPORT => Self::CellSpeedtestIngestReport, ENTROPY => Self::Entropy, SUBNETWORK_REWARDS => Self::SubnetworkRewards, diff --git a/file_store/src/heartbeat.rs b/file_store/src/heartbeat.rs index 422fc2564..94b334f3a 100644 --- a/file_store/src/heartbeat.rs +++ b/file_store/src/heartbeat.rs @@ -8,7 +8,7 @@ use helium_proto::services::poc_mobile::{CellHeartbeatIngestReportV1, CellHeartb use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct CellHeartbeat { +pub struct CbrsHeartbeat { pub pubkey: PublicKeyBinary, pub hotspot_type: String, pub cell_id: u32, @@ -21,20 +21,20 @@ pub struct CellHeartbeat { } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct CellHeartbeatIngestReport { +pub struct CbrsHeartbeatIngestReport { pub received_timestamp: DateTime, - pub report: CellHeartbeat, + pub report: CbrsHeartbeat, } -impl MsgDecode for CellHeartbeat { +impl MsgDecode for CbrsHeartbeat { type Msg = CellHeartbeatReqV1; } -impl MsgDecode for CellHeartbeatIngestReport { +impl MsgDecode for CbrsHeartbeatIngestReport { type Msg = CellHeartbeatIngestReportV1; } -impl TryFrom for CellHeartbeat { +impl TryFrom for CbrsHeartbeat { type Error = Error; fn try_from(v: CellHeartbeatReqV1) -> Result { Ok(Self { @@ -57,7 +57,7 @@ impl MsgTimestamp>> for CellHeartbeatReqV1 { } } -impl TryFrom for CellHeartbeatIngestReport { +impl TryFrom for CbrsHeartbeatIngestReport { type Error = Error; fn try_from(v: CellHeartbeatIngestReportV1) -> Result { Ok(Self { @@ -108,17 +108,17 @@ mod tests { let buffer = report.encode_to_vec(); - let cellheartbeatreport = CellHeartbeatIngestReport::decode(buffer.as_slice()) - .expect("unable to decode into CellHeartbeat"); + let heartbeatreport = CbrsHeartbeatIngestReport::decode(buffer.as_slice()) + .expect("unable to decode into CbrsHeartbeat"); assert_eq!( - cellheartbeatreport.received_timestamp, + heartbeatreport.received_timestamp, Utc.timestamp_millis_opt(now).unwrap() ); assert_eq!( report.timestamp().expect("timestamp"), - cellheartbeatreport.received_timestamp + heartbeatreport.received_timestamp ); - assert_eq!(cellheartbeatreport.report.cell_id, 123); + assert_eq!(heartbeatreport.report.cell_id, 123); } } diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index 338ef9f68..4e01bfc06 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -20,6 +20,7 @@ pub mod reward_manifest; mod settings; pub mod speedtest; pub mod traits; +pub mod wifi_heartbeat; pub use crate::file_store::FileStore; pub use cli::bucket::FileFilter; diff --git a/file_store/src/traits/msg_verify.rs b/file_store/src/traits/msg_verify.rs index 6499a9929..295a02f6c 100644 --- a/file_store/src/traits/msg_verify.rs +++ b/file_store/src/traits/msg_verify.rs @@ -7,7 +7,7 @@ use helium_proto::services::{ use helium_proto::{ services::poc_mobile::{ CellHeartbeatReqV1, CoverageObjectReqV1, DataTransferSessionReqV1, SpeedtestReqV1, - SubscriberLocationReqV1, + SubscriberLocationReqV1, WifiHeartbeatReqV1, }, Message, }; @@ -31,6 +31,7 @@ macro_rules! impl_msg_verify { } impl_msg_verify!(SubscriberLocationReqV1, signature); impl_msg_verify!(CellHeartbeatReqV1, signature); +impl_msg_verify!(WifiHeartbeatReqV1, signature); impl_msg_verify!(SpeedtestReqV1, signature); impl_msg_verify!(LoraBeaconReportReqV1, signature); impl_msg_verify!(LoraWitnessReportReqV1, signature); diff --git a/file_store/src/wifi_heartbeat.rs b/file_store/src/wifi_heartbeat.rs new file mode 100644 index 000000000..7e9ad66f1 --- /dev/null +++ b/file_store/src/wifi_heartbeat.rs @@ -0,0 +1,147 @@ +use crate::{ + traits::{MsgDecode, MsgTimestamp, TimestampDecode}, + Error, Result, +}; +use chrono::{DateTime, Utc}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::{WifiHeartbeatIngestReportV1, WifiHeartbeatReqV1}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct WifiHeartbeat { + pub pubkey: PublicKeyBinary, + pub lat: f64, + pub lon: f64, + pub operation_mode: bool, + pub location_validation_timestamp: Option>, + pub coverage_object: Vec, + pub timestamp: DateTime, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct WifiHeartbeatIngestReport { + pub received_timestamp: DateTime, + pub report: WifiHeartbeat, +} + +impl MsgDecode for WifiHeartbeat { + type Msg = WifiHeartbeatReqV1; +} + +impl MsgDecode for WifiHeartbeatIngestReport { + type Msg = WifiHeartbeatIngestReportV1; +} + +impl TryFrom for WifiHeartbeat { + type Error = Error; + fn try_from(v: WifiHeartbeatReqV1) -> Result { + let location_validation_timestamp = if v.location_validation_timestamp == 0 { + None + } else { + v.location_validation_timestamp.to_timestamp().ok() + }; + Ok(Self { + pubkey: v.pub_key.into(), + lat: v.lat, + lon: v.lon, + operation_mode: v.operation_mode, + coverage_object: v.coverage_object, + timestamp: v.timestamp.to_timestamp()?, + location_validation_timestamp, + }) + } +} + +impl MsgTimestamp>> for WifiHeartbeatReqV1 { + fn timestamp(&self) -> Result> { + self.timestamp.to_timestamp() + } +} + +impl TryFrom for WifiHeartbeatIngestReport { + type Error = Error; + fn try_from(v: WifiHeartbeatIngestReportV1) -> Result { + Ok(Self { + received_timestamp: v.timestamp()?, + report: v + .report + .ok_or_else(|| Error::not_found("ingest wifi heartbeat report"))? + .try_into()?, + }) + } +} + +impl MsgTimestamp>> for WifiHeartbeatIngestReportV1 { + fn timestamp(&self) -> Result> { + self.received_timestamp.to_timestamp_millis() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + use hex_literal::hex; + use prost::Message; + + const PK_BYTES: [u8; 33] = + hex!("008f23e96ab6bbff48c8923cac831dc97111bcf33dba9f5a8539c00f9d93551af1"); + + #[test] + fn decode_proto_heartbeat_ingest_report_to_internal_struct() { + let now = Utc::now().timestamp_millis(); + let report1 = WifiHeartbeatIngestReportV1 { + received_timestamp: now as u64, + report: Some(WifiHeartbeatReqV1 { + pub_key: PK_BYTES.to_vec(), + lat: 72.63, + lon: 72.53, + operation_mode: true, + coverage_object: vec![], + timestamp: Utc::now().timestamp() as u64, + location_validation_timestamp: now as u64, + signature: vec![], + }), + }; + let report2 = WifiHeartbeatIngestReportV1 { + received_timestamp: now as u64, + report: Some(WifiHeartbeatReqV1 { + pub_key: PK_BYTES.to_vec(), + lat: 72.63, + lon: 72.53, + operation_mode: true, + coverage_object: vec![], + timestamp: Utc::now().timestamp() as u64, + location_validation_timestamp: 0, + signature: vec![], + }), + }; + + let buffer1 = report1.encode_to_vec(); + let buffer2 = report2.encode_to_vec(); + + let wifiheartbeatreport1 = WifiHeartbeatIngestReport::decode(buffer1.as_slice()) + .expect("unable to decode into WifiHeartbeat"); + + assert_eq!( + wifiheartbeatreport1.received_timestamp, + Utc.timestamp_millis_opt(now).unwrap() + ); + assert_eq!( + report1.timestamp().expect("timestamp"), + wifiheartbeatreport1.received_timestamp + ); + assert!(wifiheartbeatreport1 + .report + .location_validation_timestamp + .is_some()); + + let wifiheartbeatreport2 = WifiHeartbeatIngestReport::decode(buffer2.as_slice()) + .expect("unable to decode into WifiHeartbeat"); + + assert!(wifiheartbeatreport2 + .report + .location_validation_timestamp + .is_none()); + } +} diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 07232d2fb..bfe6920a8 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -15,7 +15,8 @@ use helium_proto::services::poc_mobile::{ CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1, SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1, - SubscriberLocationReqV1, SubscriberLocationRespV1, + SubscriberLocationReqV1, SubscriberLocationRespV1, WifiHeartbeatIngestReportV1, + WifiHeartbeatReqV1, WifiHeartbeatRespV1, }; use std::{net::SocketAddr, path::Path}; use task_manager::{ManagedTask, TaskManager}; @@ -31,6 +32,7 @@ pub type VerifyResult = std::result::Result; pub struct GrpcServer { heartbeat_report_sink: FileSinkClient, + wifi_heartbeat_report_sink: FileSinkClient, speedtest_report_sink: FileSinkClient, data_transfer_session_sink: FileSinkClient, subscriber_location_report_sink: FileSinkClient, @@ -134,6 +136,28 @@ impl poc_mobile::PocMobile for GrpcServer { Ok(Response::new(CellHeartbeatRespV1 { id })) } + async fn submit_wifi_heartbeat( + &self, + request: Request, + ) -> GrpcResult { + let timestamp: u64 = Utc::now().timestamp_millis() as u64; + let event = request.into_inner(); + + let report = self + .verify_public_key(event.pub_key.as_ref()) + .and_then(|public_key| self.verify_network(public_key)) + .and_then(|public_key| self.verify_signature(public_key, event)) + .map(|(_, event)| WifiHeartbeatIngestReportV1 { + received_timestamp: timestamp, + report: Some(event), + })?; + + _ = self.wifi_heartbeat_report_sink.write(report, []).await; + + let id = timestamp.to_string(); + Ok(Response::new(WifiHeartbeatRespV1 { id })) + } + async fn submit_data_transfer_session( &self, request: Request, @@ -223,7 +247,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let store_base_path = Path::new(&settings.cache); let (heartbeat_report_sink, heartbeat_report_sink_server) = file_sink::FileSinkBuilder::new( - FileType::CellHeartbeatIngestReport, + FileType::CbrsHeartbeatIngestReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_heartbeat_report"), ) @@ -232,6 +256,17 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .create() .await?; + let (wifi_heartbeat_report_sink, wifi_heartbeat_report_sink_server) = + file_sink::FileSinkBuilder::new( + FileType::WifiHeartbeatIngestReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_wifi_heartbeat_report"), + ) + .file_upload(Some(file_upload.clone())) + .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .create() + .await?; + // speedtests let (speedtest_report_sink, speedtest_report_sink_server) = file_sink::FileSinkBuilder::new( FileType::CellSpeedtestIngestReport, @@ -289,6 +324,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let grpc_server = GrpcServer { heartbeat_report_sink, + wifi_heartbeat_report_sink, speedtest_report_sink, data_transfer_session_sink, subscriber_location_report_sink, @@ -306,6 +342,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { TaskManager::builder() .add_task(file_upload_server) .add_task(heartbeat_report_sink_server) + .add_task(wifi_heartbeat_report_sink_server) .add_task(speedtest_report_sink_server) .add_task(data_transfer_session_sink_server) .add_task(subscriber_location_report_sink_server) diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index a7fa49bee..f1664e3ab 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -12,6 +12,7 @@ config = {workspace = true} thiserror = {workspace = true} serde = {workspace = true} serde_json = {workspace = true} +h3o = {workspace = true, features = ["geo"]} http-serde = {workspace = true} clap = {workspace = true} sqlx = {workspace = true} @@ -43,4 +44,4 @@ reward-scheduler = {path = "../reward_scheduler"} price = {path = "../price"} rand = {workspace = true} async-trait = {workspace = true} -retainer = {workspace = true} \ No newline at end of file +retainer = {workspace = true} diff --git a/mobile_verifier/migrations/16_wifi_heartbeat.sql b/mobile_verifier/migrations/16_wifi_heartbeat.sql new file mode 100644 index 000000000..27dea551b --- /dev/null +++ b/mobile_verifier/migrations/16_wifi_heartbeat.sql @@ -0,0 +1,14 @@ +ALTER TYPE cell_type ADD VALUE 'celltypenone' AFTER 'sercommoutdoor'; +ALTER TYPE cell_type ADD VALUE 'novagenericwifiindoor' AFTER 'celltypenone'; + +CREATE TABLE wifi_heartbeats ( + hotspot_key TEXT NOT NULL, + cell_type cell_type NOT NULL, + truncated_timestamp TIMESTAMPTZ NOT NULL CHECK (truncated_timestamp = date_trunc('hour', truncated_timestamp)), + latest_timestamp TIMESTAMPTZ NOT NULL, + location_validation_timestamp TIMESTAMPTZ, + distance_to_asserted BIGINT, + PRIMARY KEY(hotspot_key, truncated_timestamp) +); + +ALTER TABLE heartbeats RENAME TO cbrs_heartbeats; diff --git a/mobile_verifier/src/cell_type.rs b/mobile_verifier/src/cell_type.rs index c9bf38f87..8ac2db40a 100644 --- a/mobile_verifier/src/cell_type.rs +++ b/mobile_verifier/src/cell_type.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Utc}; use helium_proto::services::poc_mobile::CellType as CellTypeProto; use rust_decimal::Decimal; use rust_decimal_macros::dec; @@ -12,12 +13,21 @@ pub const CELLTYPE_SERCCOMM_OUTDOOR: &str = "P27-SCO4255PA10"; #[derive(Debug, Eq, Hash, PartialEq, Copy, Clone, Serialize, sqlx::Type)] #[sqlx(type_name = "cell_type")] #[sqlx(rename_all = "lowercase")] + pub enum CellType { Nova436H = 0, Nova430I = 1, Neutrino430 = 2, SercommIndoor = 3, SercommOutdoor = 4, + CellTypeNone = 5, + NovaGenericWifiIndoor = 6, +} +#[derive(PartialEq)] +pub enum CellTypeLabel { + CellTypeLabelNone = 0, + CBRS = 1, + Wifi = 2, } impl CellType { @@ -32,6 +42,18 @@ impl CellType { } } + pub fn to_label(self) -> CellTypeLabel { + match self { + Self::Nova436H => CellTypeLabel::CBRS, + Self::Nova430I => CellTypeLabel::CBRS, + Self::Neutrino430 => CellTypeLabel::CBRS, + Self::SercommIndoor => CellTypeLabel::CBRS, + Self::SercommOutdoor => CellTypeLabel::CBRS, + Self::CellTypeNone => CellTypeLabel::CellTypeLabelNone, + Self::NovaGenericWifiIndoor => CellTypeLabel::Wifi, + } + } + pub fn reward_weight(&self) -> Decimal { match self { Self::Nova436H => dec!(4.0), @@ -39,6 +61,30 @@ impl CellType { Self::Neutrino430 => dec!(1.0), Self::SercommIndoor => dec!(1.0), Self::SercommOutdoor => dec!(2.5), + Self::CellTypeNone => dec!(0.0), + Self::NovaGenericWifiIndoor => dec!(0.4), + } + } + + pub fn location_weight( + &self, + location_validation_timestamp: Option>, + distance: Option, + max_distance_to_asserted: u32, + ) -> Decimal { + match (self, distance, location_validation_timestamp.is_some()) { + (Self::NovaGenericWifiIndoor, Some(dist), true) + if dist <= max_distance_to_asserted as i64 => + { + dec!(1.0) + } + (Self::NovaGenericWifiIndoor, Some(dist), true) + if dist > max_distance_to_asserted as i64 => + { + dec!(0.25) + } + (Self::NovaGenericWifiIndoor, _, _) => dec!(0.25), + _ => dec!(1.0), } } } @@ -51,6 +97,8 @@ impl From for CellTypeProto { CellType::Neutrino430 => Self::Neutrino430, CellType::SercommIndoor => Self::SercommIndoor, CellType::SercommOutdoor => Self::SercommOutdoor, + CellType::CellTypeNone => Self::None, + CellType::NovaGenericWifiIndoor => Self::NovaGenericWifiIndoor, } } } diff --git a/mobile_verifier/src/cli/reward_from_db.rs b/mobile_verifier/src/cli/reward_from_db.rs index 2f07e9576..0d69e7cbb 100644 --- a/mobile_verifier/src/cli/reward_from_db.rs +++ b/mobile_verifier/src/cli/reward_from_db.rs @@ -35,10 +35,12 @@ impl Cmd { let (shutdown_trigger, _shutdown_listener) = triggered::trigger(); let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?; - let heartbeats = HeartbeatReward::validated(&pool, &epoch); + let heartbeats = + HeartbeatReward::validated(&pool, &epoch, settings.max_asserted_distance_deviation); let speedtest_averages = SpeedtestAverages::aggregate_epoch_averages(epoch.end, &pool).await?; - let reward_shares = PocShares::aggregate(heartbeats, &speedtest_averages).await?; + let reward_shares = + PocShares::aggregate(heartbeats, &speedtest_averages, settings.reward_wifi_hbs).await?; let mut total_rewards = 0_u64; let mut owner_rewards = HashMap::<_, u64>::new(); diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index b145c36ef..585fdda38 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -1,5 +1,6 @@ use crate::{ - data_session::DataSessionIngestor, heartbeats::HeartbeatDaemon, rewarder::Rewarder, + data_session::DataSessionIngestor, heartbeats::cbrs::HeartbeatDaemon as CellHeartbeatDaemon, + heartbeats::wifi::HeartbeatDaemon as WifiHeartbeatDaemon, rewarder::Rewarder, speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, telemetry, Settings, }; @@ -7,9 +8,9 @@ use anyhow::{Error, Result}; use chrono::Duration; use file_store::{ file_info_poller::LookbackBehavior, file_sink, file_source, file_upload, - heartbeat::CellHeartbeatIngestReport, mobile_subscriber::SubscriberLocationIngestReport, - mobile_transfer::ValidDataTransferSession, speedtest::CellSpeedtestIngestReport, FileStore, - FileType, + heartbeat::CbrsHeartbeatIngestReport, mobile_subscriber::SubscriberLocationIngestReport, + mobile_transfer::ValidDataTransferSession, speedtest::CellSpeedtestIngestReport, + wifi_heartbeat::WifiHeartbeatIngestReport, FileStore, FileType, }; use futures_util::TryFutureExt; use mobile_config::client::{AuthorizationClient, EntityClient, GatewayClient}; @@ -55,15 +56,29 @@ impl Cmd { let (price_tracker, tracker_process) = PriceTracker::start(&settings.price_tracker, shutdown_listener.clone()).await?; - // Heartbeats - let (heartbeats, heartbeats_server) = - file_source::continuous_source::() + // CBRS Heartbeats + let (cbrs_heartbeats, cbrs_heartbeats_server) = + file_source::continuous_source::() .db(pool.clone()) .store(report_ingest.clone()) .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .file_type(FileType::CellHeartbeatIngestReport) + .file_type(FileType::CbrsHeartbeatIngestReport) .create()?; - let heartbeats_join_handle = heartbeats_server.start(shutdown_listener.clone()).await?; + let cbrs_heartbeats_join_handle = cbrs_heartbeats_server + .start(shutdown_listener.clone()) + .await?; + + // Wifi Heartbeats + let (wifi_heartbeats, wifi_heartbeats_server) = + file_source::continuous_source::() + .db(pool.clone()) + .store(report_ingest.clone()) + .lookback(LookbackBehavior::StartAfter(settings.start_after())) + .file_type(FileType::WifiHeartbeatIngestReport) + .create()?; + let wifi_heartbeats_join_handle = wifi_heartbeats_server + .start(shutdown_listener.clone()) + .await?; let (valid_heartbeats, valid_heartbeats_server) = file_sink::FileSinkBuilder::new( FileType::ValidatedHeartbeat, @@ -76,13 +91,19 @@ impl Cmd { .create() .await?; - let heartbeat_daemon = HeartbeatDaemon::new( + let cbrs_heartbeat_daemon = CellHeartbeatDaemon::new( pool.clone(), gateway_client.clone(), - heartbeats, - valid_heartbeats, + cbrs_heartbeats, + valid_heartbeats.clone(), ); + let wifi_heartbeat_daemon = WifiHeartbeatDaemon::new( + pool.clone(), + gateway_client.clone(), + wifi_heartbeats, + valid_heartbeats, + ); // Speedtests let (speedtests, speedtests_server) = file_source::continuous_source::() @@ -152,6 +173,8 @@ impl Cmd { mobile_rewards, reward_manifests, price_tracker, + settings.reward_wifi_hbs, + settings.max_asserted_distance_deviation, ); // subscriber location @@ -228,9 +251,11 @@ impl Cmd { .run(data_session_ingest, shutdown_listener.clone()) .map_err(Error::from), tracker_process.map_err(Error::from), - heartbeats_join_handle.map_err(Error::from), + cbrs_heartbeats_join_handle.map_err(Error::from), + wifi_heartbeats_join_handle.map_err(Error::from), speedtests_join_handle.map_err(Error::from), - heartbeat_daemon.run(shutdown_listener.clone()), + cbrs_heartbeat_daemon.run(shutdown_listener.clone()), + wifi_heartbeat_daemon.run(shutdown_listener.clone()), speedtest_daemon.run(shutdown_listener.clone()), rewarder.run(shutdown_listener.clone()), subscriber_location_ingest_join_handle.map_err(anyhow::Error::from), diff --git a/mobile_verifier/src/heartbeats.rs b/mobile_verifier/src/heartbeats.rs deleted file mode 100644 index 313a42a75..000000000 --- a/mobile_verifier/src/heartbeats.rs +++ /dev/null @@ -1,320 +0,0 @@ -//! Heartbeat storage - -use crate::cell_type::CellType; -use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc}; -use file_store::{ - file_info_poller::FileInfoStream, file_sink::FileSinkClient, - heartbeat::CellHeartbeatIngestReport, -}; -use futures::{ - stream::{Stream, StreamExt, TryStreamExt}, - TryFutureExt, -}; -use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_mobile as proto; -use mobile_config::{client::ClientError, gateway_info::GatewayInfoResolver, GatewayClient}; -use retainer::Cache; -use rust_decimal::{prelude::ToPrimitive, Decimal}; -use sqlx::{Postgres, Transaction}; -use std::{ops::Range, pin::pin, sync::Arc, time}; -use tokio::sync::mpsc::Receiver; - -#[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::FromRow)] -pub struct HeartbeatKey { - hotspot_key: PublicKeyBinary, - cbsd_id: String, - cell_type: CellType, -} - -#[derive(Debug, Clone, PartialEq)] -pub struct HeartbeatReward { - pub hotspot_key: PublicKeyBinary, - pub cbsd_id: String, - pub reward_weight: Decimal, -} - -impl From for HeartbeatReward { - fn from(value: HeartbeatKey) -> Self { - Self { - hotspot_key: value.hotspot_key, - cbsd_id: value.cbsd_id, - reward_weight: value.cell_type.reward_weight(), - } - } -} - -pub struct HeartbeatDaemon { - pool: sqlx::Pool, - gateway_client: GatewayClient, - heartbeats: Receiver>, - file_sink: FileSinkClient, -} - -impl HeartbeatDaemon { - pub fn new( - pool: sqlx::Pool, - gateway_client: GatewayClient, - heartbeats: Receiver>, - file_sink: FileSinkClient, - ) -> Self { - Self { - pool, - gateway_client, - heartbeats, - file_sink, - } - } - - pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { - tokio::spawn(async move { - let cache = Arc::new(Cache::<(String, DateTime), ()>::new()); - - let cache_clone = cache.clone(); - tokio::spawn(async move { - cache_clone - .monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 3)) - .await - }); - - loop { - tokio::select! { - biased; - _ = shutdown.clone() => { - tracing::info!("HeartbeatDaemon shutting down"); - break; - } - Some(file) = self.heartbeats.recv() => self.process_file(file, &cache).await?, - } - } - - Ok(()) - }) - .map_err(anyhow::Error::from) - .and_then(|result| async move { result }) - .await - } - - async fn process_file( - &self, - file: FileInfoStream, - cache: &Cache<(String, DateTime), ()>, - ) -> anyhow::Result<()> { - tracing::info!("Processing heartbeat file {}", file.file_info.key); - - let epoch = (file.file_info.timestamp - Duration::hours(3)) - ..(file.file_info.timestamp + Duration::minutes(30)); - let mut transaction = self.pool.begin().await?; - let reports = file.into_stream(&mut transaction).await?; - - let mut validated_heartbeats = - pin!(Heartbeat::validate_heartbeats(&self.gateway_client, reports, &epoch).await); - - while let Some(heartbeat) = validated_heartbeats.next().await.transpose()? { - heartbeat.write(&self.file_sink).await?; - - if !heartbeat.is_valid() { - continue; - } - - let key = (heartbeat.cbsd_id.clone(), heartbeat.truncated_timestamp()?); - - if cache.get(&key).await.is_none() { - heartbeat.save(&mut transaction).await?; - cache - .insert(key, (), time::Duration::from_secs(60 * 60 * 2)) - .await; - } - } - - self.file_sink.commit().await?; - transaction.commit().await?; - - Ok(()) - } -} - -/// Minimum number of heartbeats required to give a reward to the hotspot. -pub const MINIMUM_HEARTBEAT_COUNT: i64 = 12; - -impl HeartbeatReward { - pub fn validated<'a>( - exec: impl sqlx::PgExecutor<'a> + Copy + 'a, - epoch: &'a Range>, - ) -> impl Stream> + 'a { - sqlx::query_as::<_, HeartbeatKey>( - r#" - WITH latest_hotspots AS ( - SELECT t1.cbsd_id, t1.hotspot_key, t1.latest_timestamp - FROM heartbeats t1 - WHERE t1.latest_timestamp = ( - SELECT MAX(t2.latest_timestamp) - FROM heartbeats t2 - WHERE t2.cbsd_id = t1.cbsd_id - AND truncated_timestamp >= $1 - AND truncated_timestamp < $2 - ) - ) - SELECT - latest_hotspots.hotspot_key, - heartbeats.cbsd_id, - cell_type - FROM heartbeats - JOIN latest_hotspots ON heartbeats.cbsd_id = latest_hotspots.cbsd_id - WHERE truncated_timestamp >= $1 - AND truncated_timestamp < $2 - GROUP BY - heartbeats.cbsd_id, - latest_hotspots.hotspot_key, - cell_type - HAVING count(*) >= $3 - "#, - ) - .bind(epoch.start) - .bind(epoch.end) - .bind(MINIMUM_HEARTBEAT_COUNT) - .fetch(exec) - .map_ok(HeartbeatReward::from) - } -} - -#[derive(Clone)] -pub struct Heartbeat { - pub cbsd_id: String, - pub cell_type: Option, - pub hotspot_key: PublicKeyBinary, - pub timestamp: DateTime, - pub validity: proto::HeartbeatValidity, -} - -#[derive(sqlx::FromRow)] -struct HeartbeatSaveResult { - inserted: bool, -} - -#[derive(thiserror::Error, Debug)] -pub enum SaveHeartbeatError { - #[error("rounding error: {0}")] - RoundingError(#[from] RoundingError), - #[error("sql error: {0}")] - SqlError(#[from] sqlx::Error), -} - -impl Heartbeat { - pub fn is_valid(&self) -> bool { - self.validity == proto::HeartbeatValidity::Valid - } - - pub fn truncated_timestamp(&self) -> Result, RoundingError> { - self.timestamp.duration_trunc(Duration::hours(1)) - } - - pub async fn validate_heartbeats<'a>( - gateway_client: &'a GatewayClient, - heartbeats: impl Stream + 'a, - epoch: &'a Range>, - ) -> impl Stream> + 'a { - heartbeats.then(move |heartbeat_report| { - let mut gateway_client = gateway_client.clone(); - async move { - let (cell_type, validity) = - validate_heartbeat(&heartbeat_report, &mut gateway_client, epoch).await?; - Ok(Heartbeat { - hotspot_key: heartbeat_report.report.pubkey, - cbsd_id: heartbeat_report.report.cbsd_id, - timestamp: heartbeat_report.received_timestamp, - cell_type, - validity, - }) - } - }) - } - - pub async fn write(&self, heartbeats: &FileSinkClient) -> file_store::Result { - heartbeats - .write( - proto::Heartbeat { - cbsd_id: self.cbsd_id.clone(), - pub_key: self.hotspot_key.clone().into(), - reward_multiplier: self - .cell_type - .map_or(0.0, |ct| ct.reward_weight().to_f32().unwrap_or(0.0)), - cell_type: self.cell_type.unwrap_or(CellType::Neutrino430) as i32, // Is this the right default? - validity: self.validity as i32, - timestamp: self.timestamp.timestamp() as u64, - coverage_object: Vec::with_capacity(0), // Placeholder so the project compiles - lat: 0.0, - lon: 0.0, - }, - &[("validity", self.validity.as_str_name())], - ) - .await?; - Ok(()) - } - - pub async fn save( - self, - exec: &mut Transaction<'_, Postgres>, - ) -> Result { - let truncated_timestamp = self.truncated_timestamp()?; - Ok( - sqlx::query_as::<_, HeartbeatSaveResult>( - r#" - INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (cbsd_id, truncated_timestamp) DO UPDATE SET - latest_timestamp = EXCLUDED.latest_timestamp - RETURNING (xmax = 0) as inserted - "# - ) - .bind(self.cbsd_id) - .bind(self.hotspot_key) - .bind(self.cell_type.unwrap()) - .bind(self.timestamp) - .bind(truncated_timestamp) - .fetch_one(&mut *exec) - .await? - .inserted - ) - } -} - -/// Validate a heartbeat in the given epoch. -async fn validate_heartbeat( - heartbeat: &CellHeartbeatIngestReport, - gateway_client: &mut GatewayClient, - epoch: &Range>, -) -> Result<(Option, proto::HeartbeatValidity), ClientError> { - let cell_type = match CellType::from_cbsd_id(&heartbeat.report.cbsd_id) { - Some(ty) => Some(ty), - _ => return Ok((None, proto::HeartbeatValidity::BadCbsdId)), - }; - - if !heartbeat.report.operation_mode { - return Ok((cell_type, proto::HeartbeatValidity::NotOperational)); - } - - if !epoch.contains(&heartbeat.received_timestamp) { - return Ok((cell_type, proto::HeartbeatValidity::HeartbeatOutsideRange)); - } - - if gateway_client - .resolve_gateway_info(&heartbeat.report.pubkey) - .await? - .is_none() - { - return Ok((cell_type, proto::HeartbeatValidity::GatewayOwnerNotFound)); - } - - Ok((cell_type, proto::HeartbeatValidity::Valid)) -} - -pub async fn clear_heartbeats( - tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, - timestamp: &DateTime, -) -> Result<(), sqlx::Error> { - sqlx::query("DELETE FROM heartbeats WHERE truncated_timestamp < $1") - .bind(timestamp) - .execute(&mut *tx) - .await?; - Ok(()) -} diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs new file mode 100644 index 000000000..f63f7338e --- /dev/null +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -0,0 +1,91 @@ +use super::{process_heartbeat_stream, Heartbeat}; + +use chrono::{DateTime, Duration, Utc}; +use file_store::{ + file_info_poller::FileInfoStream, file_sink::FileSinkClient, + heartbeat::CbrsHeartbeatIngestReport, +}; +use futures::{stream::StreamExt, TryFutureExt}; +use mobile_config::GatewayClient; +use retainer::Cache; + +use std::{sync::Arc, time}; +use tokio::sync::mpsc::Receiver; + +pub struct HeartbeatDaemon { + pool: sqlx::Pool, + gateway_client: GatewayClient, + heartbeats: Receiver>, + file_sink: FileSinkClient, +} + +impl HeartbeatDaemon { + pub fn new( + pool: sqlx::Pool, + gateway_client: GatewayClient, + heartbeats: Receiver>, + file_sink: FileSinkClient, + ) -> Self { + Self { + pool, + gateway_client, + heartbeats, + file_sink, + } + } + + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + tokio::spawn(async move { + tracing::info!("Starting CBRS HeartbeatDaemon"); + let cache = Arc::new(Cache::<(String, DateTime), ()>::new()); + + let cache_clone = cache.clone(); + tokio::spawn(async move { + cache_clone + .monitor(4, 0.25, time::Duration::from_secs(60 * 60 * 3)) + .await + }); + + loop { + tokio::select! { + biased; + _ = shutdown.clone() => { + tracing::info!("CBRS HeartbeatDaemon shutting down"); + break; + } + Some(file) = self.heartbeats.recv() => self.process_file(file, &cache).await?, + } + } + + Ok(()) + }) + .map_err(anyhow::Error::from) + .and_then(|result| async move { result }) + .await + } + + async fn process_file( + &self, + file: FileInfoStream, + cache: &Cache<(String, DateTime), ()>, + ) -> anyhow::Result<()> { + tracing::info!("Processing CBRS heartbeat file {}", file.file_info.key); + let mut transaction = self.pool.begin().await?; + let epoch = (file.file_info.timestamp - Duration::hours(3)) + ..(file.file_info.timestamp + Duration::minutes(30)); + // map the ingest reports to our generic heartbeat type + let reports = file + .into_stream(&mut transaction) + .await? + .map(Heartbeat::from); + process_heartbeat_stream( + reports, + &self.gateway_client, + &self.file_sink, + cache, + transaction, + &epoch, + ) + .await + } +} diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs new file mode 100644 index 000000000..7cb4ab7c0 --- /dev/null +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -0,0 +1,465 @@ +pub mod cbrs; +pub mod wifi; + +use crate::cell_type::{CellType, CellTypeLabel}; +use anyhow::anyhow; +use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc}; +use file_store::{ + file_sink::FileSinkClient, heartbeat::CbrsHeartbeatIngestReport, + wifi_heartbeat::WifiHeartbeatIngestReport, +}; +use futures::stream::{Stream, StreamExt, TryStreamExt}; +use h3o::{CellIndex, LatLng}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile as proto; +use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient}; +use retainer::Cache; +use rust_decimal::{prelude::ToPrimitive, Decimal}; +use sqlx::{Postgres, Transaction}; +use std::{ops::Range, pin::pin, time}; + +/// Minimum number of heartbeats required to give a reward to the hotspot. +const MINIMUM_HEARTBEAT_COUNT: i64 = 12; + +#[derive(Clone, PartialEq)] +pub enum HBType { + Cbrs = 0, + Wifi = 1, +} + +#[derive(Clone)] +pub struct Heartbeat { + hb_type: HBType, + hotspot_key: PublicKeyBinary, + cbsd_id: Option, + operation_mode: bool, + lat: f64, + lon: f64, + location_validation_timestamp: Option>, + timestamp: DateTime, +} + +impl Heartbeat { + pub fn truncated_timestamp(&self) -> Result, RoundingError> { + self.timestamp.duration_trunc(Duration::hours(1)) + } + + pub fn id(&self) -> anyhow::Result<(String, DateTime)> { + let ts = self.truncated_timestamp()?; + match self.hb_type { + HBType::Cbrs => { + let cbsd_id = self + .cbsd_id + .clone() + .ok_or_else(|| anyhow!("expected cbsd_id, found none"))?; + Ok((cbsd_id, ts)) + } + HBType::Wifi => Ok((self.hotspot_key.to_string(), ts)), + } + } + + pub fn asserted_distance(&self, asserted_location: u64) -> anyhow::Result { + let asserted_latlng: LatLng = CellIndex::try_from(asserted_location)?.into(); + let hb_latlng = LatLng::new(self.lat, self.lon)?; + Ok(asserted_latlng.distance_m(hb_latlng).round() as i64) + } +} + +impl From for Heartbeat { + fn from(value: CbrsHeartbeatIngestReport) -> Self { + Self { + hb_type: HBType::Cbrs, + hotspot_key: value.report.pubkey, + cbsd_id: Some(value.report.cbsd_id), + operation_mode: value.report.operation_mode, + lat: value.report.lat, + lon: value.report.lon, + location_validation_timestamp: None, + timestamp: value.received_timestamp, + } + } +} + +impl From for Heartbeat { + fn from(value: WifiHeartbeatIngestReport) -> Self { + Self { + hb_type: HBType::Wifi, + hotspot_key: value.report.pubkey, + cbsd_id: None, + operation_mode: value.report.operation_mode, + lat: value.report.lat, + lon: value.report.lon, + location_validation_timestamp: value.report.location_validation_timestamp, + timestamp: value.received_timestamp, + } + } +} + +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct HeartbeatRow { + pub hotspot_key: PublicKeyBinary, + // cell hb only + pub cbsd_id: Option, + pub cell_type: CellType, + // wifi hb only + pub location_validation_timestamp: Option>, + pub distance_to_asserted: Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct HeartbeatReward { + pub hotspot_key: PublicKeyBinary, + pub cell_type: CellType, + // cell hb only + pub cbsd_id: Option, + pub reward_weight: Decimal, +} + +impl HeartbeatReward { + pub fn id(&self) -> anyhow::Result { + match self.cell_type.to_label() { + CellTypeLabel::CBRS => Ok(self + .cbsd_id + .clone() + .ok_or_else(|| anyhow!("expected cbsd_id, found none"))?), + CellTypeLabel::Wifi => Ok(self.hotspot_key.to_string()), + _ => Err(anyhow!("failed to derive label from cell type")), + } + } + + pub fn reward_weight(&self, reward_wifi_hbs: bool) -> Decimal { + if !reward_wifi_hbs && self.cell_type.to_label() == CellTypeLabel::Wifi { + return Decimal::ZERO; + } + self.reward_weight + } + + pub fn validated<'a>( + exec: impl sqlx::PgExecutor<'a> + Copy + 'a, + epoch: &'a Range>, + max_distance_to_asserted: u32, + ) -> impl Stream> + 'a { + sqlx::query_as::<_, HeartbeatRow>( + r#" + (WITH latest_hotspots AS ( + SELECT t1.cbsd_id, t1.hotspot_key, t1.latest_timestamp + FROM cbrs_heartbeats t1 + WHERE t1.latest_timestamp = ( + SELECT MAX(t2.latest_timestamp) + FROM cbrs_heartbeats t2 + WHERE t2.cbsd_id = t1.cbsd_id + AND truncated_timestamp >= $1 + AND truncated_timestamp < $2 + ) + ) + SELECT + latest_hotspots.hotspot_key, + cbrs_heartbeats.cbsd_id, + cell_type, + NULL as location_validation_timestamp, + NULL as distance_to_asserted + FROM cbrs_heartbeats + JOIN latest_hotspots ON cbrs_heartbeats.cbsd_id = latest_hotspots.cbsd_id + WHERE truncated_timestamp >= $1 + AND truncated_timestamp < $2 + GROUP BY + cbrs_heartbeats.cbsd_id, + latest_hotspots.hotspot_key, + cell_type + HAVING count(*) >= $3) + UNION + SELECT + grouped.hotspot_key, + NULL as cbsd_id, + grouped.cell_type, + b.location_validation_timestamp, + b.distance_to_asserted + FROM + ( + SELECT + hotspot_key, + cell_type + FROM wifi_heartbeats + WHERE truncated_timestamp >= $1 + AND truncated_timestamp < $2 + GROUP BY hotspot_key, cell_type + HAVING count(*) >= $3 + ) as grouped + LEFT JOIN ( + select hotspot_key, + location_validation_timestamp, + distance_to_asserted + from wifi_heartbeats + WHERE wifi_heartbeats.truncated_timestamp >= $1 + AND wifi_heartbeats.truncated_timestamp < $2 + ) as b on b.hotspot_key = grouped.hotspot_key + "#, + ) + .bind(epoch.start) + .bind(epoch.end) + .bind(MINIMUM_HEARTBEAT_COUNT) + .fetch(exec) + .map_ok(move |row| Self::from_heartbeat_row(row, max_distance_to_asserted)) + } + + pub fn from_heartbeat_row(value: HeartbeatRow, max_distance_to_asserted: u32) -> Self { + Self { + hotspot_key: value.hotspot_key, + cell_type: value.cell_type, + cbsd_id: value.cbsd_id, + reward_weight: value.cell_type.reward_weight() + * value.cell_type.location_weight( + value.location_validation_timestamp, + value.distance_to_asserted, + max_distance_to_asserted, + ), + } + } +} + +#[derive(sqlx::FromRow)] +pub struct HeartbeatSaveResult { + inserted: bool, +} + +#[derive(Clone)] +pub struct ValidatedHeartbeat { + pub report: Heartbeat, + cell_type: CellType, + validity: proto::HeartbeatValidity, + distance_to_asserted: Option, +} + +impl ValidatedHeartbeat { + pub fn is_valid(&self) -> bool { + self.validity == proto::HeartbeatValidity::Valid + } + + pub fn truncated_timestamp(&self) -> Result, RoundingError> { + self.report.timestamp.duration_trunc(Duration::hours(1)) + } + + pub async fn validate_heartbeats<'a>( + gateway_client: &'a GatewayClient, + heartbeats: impl Stream + 'a, + epoch: &'a Range>, + ) -> impl Stream> + 'a { + heartbeats.then(move |report| { + let mut gateway_client = gateway_client.clone(); + async move { + let (cell_type, validity, distance_to_asserted) = + validate_heartbeat(&report, &mut gateway_client, epoch).await?; + + Ok(Self { + report, + cell_type, + validity, + distance_to_asserted, + }) + } + }) + } + + pub async fn write(&self, heartbeats: &FileSinkClient) -> file_store::Result { + heartbeats + .write( + proto::Heartbeat { + cbsd_id: self.report.cbsd_id.clone().unwrap_or_default(), + pub_key: self.report.hotspot_key.as_ref().into(), + reward_multiplier: self.cell_type.reward_weight().to_f32().unwrap_or_default(), + cell_type: self.cell_type as i32, + validity: self.validity as i32, + timestamp: self.report.timestamp.timestamp() as u64, + coverage_object: Vec::with_capacity(0), // Placeholder so the project compiles + lat: 0.0, + lon: 0.0, + location_validation_timestamp: self + .report + .location_validation_timestamp + .map_or(0, |v| v.timestamp() as u64), + distance_to_asserted: self.distance_to_asserted.map_or(0, |v| v as u64), + }, + &[("validity", self.validity.as_str_name())], + ) + .await?; + Ok(()) + } + + pub async fn save(self, exec: &mut Transaction<'_, Postgres>) -> anyhow::Result { + match self.report.hb_type { + HBType::Cbrs => self.save_cbrs_hb(exec).await, + HBType::Wifi => self.save_wifi_hb(exec).await, + } + } + + async fn save_cbrs_hb(self, exec: &mut Transaction<'_, Postgres>) -> anyhow::Result { + let cbsd_id = self + .report + .cbsd_id + .as_ref() + .ok_or_else(|| anyhow!("failed to save cbrs heartbeat, invalid cbsd_id"))?; + + let truncated_timestamp = self.truncated_timestamp()?; + Ok( + sqlx::query_as::<_, HeartbeatSaveResult>( + r#" + INSERT INTO cbrs_heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (cbsd_id, truncated_timestamp) DO UPDATE SET + latest_timestamp = EXCLUDED.latest_timestamp + RETURNING (xmax = 0) as inserted + "# + ) + .bind(cbsd_id) + .bind(self.report.hotspot_key) + .bind(self.cell_type) + .bind(self.report.timestamp) + .bind(truncated_timestamp) + .fetch_one(&mut *exec) + .await? + .inserted + ) + } + + async fn save_wifi_hb(self, exec: &mut Transaction<'_, Postgres>) -> anyhow::Result { + let truncated_timestamp = self.truncated_timestamp()?; + Ok(sqlx::query_as::<_, HeartbeatSaveResult>( + r#" + INSERT INTO wifi_heartbeats (hotspot_key, cell_type, location_validation_timestamp, distance_to_asserted, + latest_timestamp, truncated_timestamp) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (hotspot_key, truncated_timestamp) DO UPDATE SET + latest_timestamp = EXCLUDED.latest_timestamp + RETURNING (xmax = 0) as inserted + "#, + ) + .bind(self.report.hotspot_key) + .bind(self.cell_type) + .bind(self.report.location_validation_timestamp) + .bind(self.distance_to_asserted) + .bind(self.report.timestamp) + .bind(truncated_timestamp) + .fetch_one(&mut *exec) + .await? + .inserted) + } +} + +/// Validate a heartbeat in the given epoch. +pub async fn validate_heartbeat( + heartbeat: &Heartbeat, + gateway_client: &mut GatewayClient, + epoch: &Range>, +) -> anyhow::Result<(CellType, proto::HeartbeatValidity, Option)> { + let cell_type = match heartbeat.hb_type { + HBType::Cbrs => match heartbeat.cbsd_id.as_ref() { + Some(cbsd_id) => match CellType::from_cbsd_id(cbsd_id) { + Some(ty) => ty, + _ => { + return Ok(( + CellType::CellTypeNone, + proto::HeartbeatValidity::BadCbsdId, + None, + )) + } + }, + None => { + return Ok(( + CellType::CellTypeNone, + proto::HeartbeatValidity::BadCbsdId, + None, + )) + } + }, + // for wifi HBs temporary assume we have an indoor wifi spot + // this will be better/properly handled when coverage reports are live + HBType::Wifi => CellType::NovaGenericWifiIndoor, + }; + + if !heartbeat.operation_mode { + return Ok((cell_type, proto::HeartbeatValidity::NotOperational, None)); + } + + if !epoch.contains(&heartbeat.timestamp) { + return Ok(( + cell_type, + proto::HeartbeatValidity::HeartbeatOutsideRange, + None, + )); + } + + let Some(gateway_info) = gateway_client + .resolve_gateway_info(&heartbeat.hotspot_key) + .await? + else { + return Ok((cell_type, proto::HeartbeatValidity::GatewayNotFound, None)); + }; + + let Some(metadata) = gateway_info.metadata else { + return Ok(( + cell_type, + proto::HeartbeatValidity::GatewayNotAsserted, + None, + )); + }; + + let distance_to_asserted = if heartbeat.hb_type == HBType::Wifi { + Some(heartbeat.asserted_distance(metadata.location)?) + } else { + None + }; + + Ok(( + cell_type, + proto::HeartbeatValidity::Valid, + distance_to_asserted, + )) +} + +pub(crate) async fn process_heartbeat_stream<'a>( + reports: impl Stream + 'a, + gateway_client: &'a GatewayClient, + file_sink: &FileSinkClient, + cache: &Cache<(String, DateTime), ()>, + mut transaction: Transaction<'_, Postgres>, + epoch: &'a Range>, +) -> anyhow::Result<()> { + let mut validated_heartbeats = + pin!(ValidatedHeartbeat::validate_heartbeats(gateway_client, reports, epoch).await); + + while let Some(validated_heartbeat) = validated_heartbeats.next().await.transpose()? { + validated_heartbeat.write(file_sink).await?; + + if !validated_heartbeat.is_valid() { + continue; + } + + let key = validated_heartbeat.report.id()?; + if cache.get(&key).await.is_none() { + validated_heartbeat.save(&mut transaction).await?; + cache + .insert(key, (), time::Duration::from_secs(60 * 60 * 2)) + .await; + } + } + file_sink.commit().await?; + transaction.commit().await?; + Ok(()) +} + +pub async fn clear_heartbeats( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + timestamp: &DateTime, +) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM cbrs_heartbeats WHERE truncated_timestamp < $1;") + .bind(timestamp) + .execute(&mut *tx) + .await?; + + sqlx::query("DELETE FROM wifi_heartbeats WHERE truncated_timestamp < $1;") + .bind(timestamp) + .execute(&mut *tx) + .await?; + + Ok(()) +} diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs new file mode 100644 index 000000000..f5e4b1f1d --- /dev/null +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -0,0 +1,90 @@ +use super::{process_heartbeat_stream, Heartbeat}; +use chrono::{DateTime, Duration, Utc}; +use file_store::{ + file_info_poller::FileInfoStream, file_sink::FileSinkClient, + wifi_heartbeat::WifiHeartbeatIngestReport, +}; +use futures::{stream::StreamExt, TryFutureExt}; +use mobile_config::GatewayClient; +use retainer::Cache; + +use std::{sync::Arc, time}; +use tokio::sync::mpsc::Receiver; + +pub struct HeartbeatDaemon { + pool: sqlx::Pool, + gateway_client: GatewayClient, + heartbeats: Receiver>, + file_sink: FileSinkClient, +} + +impl HeartbeatDaemon { + pub fn new( + pool: sqlx::Pool, + gateway_client: GatewayClient, + heartbeats: Receiver>, + file_sink: FileSinkClient, + ) -> Self { + Self { + pool, + gateway_client, + heartbeats, + file_sink, + } + } + + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + tokio::spawn(async move { + tracing::info!("Starting Wifi HeartbeatDaemon"); + let cache = Arc::new(Cache::<(String, DateTime), ()>::new()); + + let cache_clone = cache.clone(); + tokio::spawn(async move { + cache_clone + .monitor(4, 0.25, time::Duration::from_secs(60 * 60 * 3)) + .await + }); + + loop { + tokio::select! { + biased; + _ = shutdown.clone() => { + tracing::info!("Wifi HeartbeatDaemon shutting down"); + break; + } + Some(file) = self.heartbeats.recv() => self.process_file(file, &cache).await?, + } + } + + Ok(()) + }) + .map_err(anyhow::Error::from) + .and_then(|result| async move { result }) + .await + } + + async fn process_file( + &self, + file: FileInfoStream, + cache: &Cache<(String, DateTime), ()>, + ) -> anyhow::Result<()> { + tracing::info!("Processing WIFI heartbeat file {}", file.file_info.key); + let mut transaction = self.pool.begin().await?; + let epoch = (file.file_info.timestamp - Duration::hours(3)) + ..(file.file_info.timestamp + Duration::minutes(30)); + // map the ingest reports to our generic heartbeat type + let reports = file + .into_stream(&mut transaction) + .await? + .map(Heartbeat::from); + process_heartbeat_stream( + reports, + &self.gateway_client, + &self.file_sink, + cache, + transaction, + &epoch, + ) + .await + } +} diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index f6d65df6b..5c6a74c34 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -1,14 +1,13 @@ -mod cell_type; +pub mod cell_type; +pub mod cli; mod data_session; pub mod heartbeats; mod reward_shares; +pub mod rewarder; mod settings; mod speedtests; mod speedtests_average; mod subscriber_location; mod telemetry; -pub mod cli; -pub mod rewarder; - pub use settings::Settings; diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index 24a7887fe..821686a59 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -31,7 +31,7 @@ const DEFAULT_PREC: u32 = 15; // Percent of total emissions allocated for mapper rewards const MAPPERS_REWARDS_PERCENT: Decimal = dec!(0.2); -/// shares of the mappers pool allocated per eligble subscriber for discovery mapping +/// shares of the mappers pool allocated per eligible subscriber for discovery mapping const DISCOVERY_MAPPING_SHARES: Decimal = dec!(30); pub struct TransferRewards { @@ -155,12 +155,12 @@ impl MapperShares { let duration: Duration = reward_period.end - reward_period.start; let total_mappers_pool = get_scheduled_tokens_for_mappers(duration); - // the number of subscribers eligible for discovery location rewards hihofe + // the number of subscribers eligible for discovery location rewards let discovery_mappers_count = Decimal::from(self.discovery_mapping_shares.len()); // calculate the total eligible mapping shares for the epoch // this could be simplified as every subscriber is awarded the same share - // however the fuction is setup to allow the verification mapper shares to be easily + // however the function is setup to allow the verification mapper shares to be easily // added without impacting code structure ( the per share value for those will be different ) let total_mapper_shares = discovery_mappers_count * DISCOVERY_MAPPING_SHARES; let res = total_mappers_pool @@ -201,7 +201,7 @@ pub fn dc_to_mobile_bones(dc_amount: Decimal, mobile_bone_price: Decimal) -> Dec #[derive(Default)] pub struct RadioShares { - radio_shares: HashMap, + radio_shares: HashMap, Decimal>, } impl RadioShares { @@ -219,23 +219,24 @@ pub struct PocShares { impl PocShares { pub async fn aggregate( - heartbeats: impl Stream>, + heartbeat_rewards: impl Stream>, speedtest_averages: &SpeedtestAverages, - ) -> Result { + reward_wifi_hbs: bool, + ) -> anyhow::Result { let mut poc_shares = Self::default(); - let mut heartbeats = std::pin::pin!(heartbeats); - while let Some(heartbeat) = heartbeats.next().await.transpose()? { + let mut heartbeat_rewards = std::pin::pin!(heartbeat_rewards); + while let Some(heartbeat_reward) = heartbeat_rewards.next().await.transpose()? { let speedmultiplier = speedtest_averages - .get_average(&heartbeat.hotspot_key) + .get_average(&heartbeat_reward.hotspot_key) .as_ref() .map_or(Decimal::ZERO, SpeedtestAverage::reward_multiplier); *poc_shares .hotspot_shares - .entry(heartbeat.hotspot_key) + .entry(heartbeat_reward.hotspot_key.clone()) .or_default() .radio_shares - .entry(heartbeat.cbsd_id) - .or_default() += heartbeat.reward_weight * speedmultiplier; + .entry(heartbeat_reward.cbsd_id.clone()) + .or_default() += heartbeat_reward.reward_weight(reward_wifi_hbs) * speedmultiplier; } Ok(poc_shares) } @@ -280,7 +281,7 @@ impl PocShares { reward: Some(proto::mobile_reward_share::Reward::RadioReward( proto::RadioReward { hotspot_key, - cbsd_id, + cbsd_id: cbsd_id.unwrap_or_default(), poc_reward: poc_reward .round_dp_with_strategy(0, RoundingStrategy::ToZero) .to_u64() @@ -321,8 +322,11 @@ pub fn get_scheduled_tokens_for_mappers(duration: Duration) -> Decimal { mod test { use super::*; use crate::{ - cell_type::CellType, data_session, data_session::HotspotDataSession, - heartbeats::HeartbeatReward, speedtests::Speedtest, + cell_type::CellType, + data_session, + data_session::HotspotDataSession, + heartbeats::{HeartbeatReward, HeartbeatRow}, + speedtests::Speedtest, subscriber_location::SubscriberValidatedLocations, }; use chrono::{Duration, Utc}; @@ -333,8 +337,8 @@ mod test { use std::collections::HashMap; fn valid_shares() -> RadioShares { - let mut radio_shares: HashMap = Default::default(); - radio_shares.insert(String::new(), Decimal::ONE); + let mut radio_shares: HashMap, Decimal> = Default::default(); + radio_shares.insert(Some(String::new()), Decimal::ONE); RadioShares { radio_shares } } @@ -527,12 +531,6 @@ mod test { mbps * 125000 } - fn cell_type_weight(cbsd_id: &str) -> Decimal { - CellType::from_cbsd_id(cbsd_id) - .expect("unable to get cell_type") - .reward_weight() - } - fn acceptable_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime) -> Speedtest { Speedtest { report: CellSpeedtest { @@ -593,34 +591,84 @@ mod test { let g2: PublicKeyBinary = "118SPA16MX8WrUKcuXxsg6SH8u5dWszAySiUAJX6tTVoQVy7nWc" .parse() .expect("unable to construct pubkey"); + let g3: PublicKeyBinary = "112bUuQaE7j73THS9ABShHGokm46Miip9L361FSyWv7zSYn8hZWf" + .parse() + .expect("unable to construct pubkey"); + let g4: PublicKeyBinary = "11z69eJ3czc92k6snrfR9ek7g2uRWXosFbnG9v4bXgwhfUCivUo" + .parse() + .expect("unable to construct pubkey"); + let g5: PublicKeyBinary = "113HRxtzxFbFUjDEJJpyeMRZRtdAW38LAUnB5mshRwi6jt7uFbt" + .parse() + .expect("unable to construct pubkey"); + + let max_asserted_distance_deviation: u32 = 300; let c1 = "P27-SCE4255W2107CW5000014".to_string(); let c2 = "2AG32PBS3101S1202000464223GY0153".to_string(); + let c1ct = CellType::from_cbsd_id(&c1).expect("unable to get cell_type"); + let c2ct = CellType::from_cbsd_id(&c2).expect("unable to get cell_type"); + + let g3ct = CellType::NovaGenericWifiIndoor; + let g4ct = CellType::NovaGenericWifiIndoor; + let g5ct = CellType::NovaGenericWifiIndoor; let timestamp = Utc::now(); - let heartbeats = vec![ - HeartbeatReward { - cbsd_id: c1.clone(), + let heartbeat_keys = vec![ + HeartbeatRow { + cbsd_id: Some(c1.clone()), hotspot_key: g1.clone(), - reward_weight: cell_type_weight(&c1), + cell_type: c1ct, + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c2.clone(), + HeartbeatRow { + cbsd_id: Some(c2.clone()), hotspot_key: g1.clone(), - reward_weight: cell_type_weight(&c2), + cell_type: c2ct, + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c1.clone(), + HeartbeatRow { + cbsd_id: Some(c1.clone()), hotspot_key: g2.clone(), - reward_weight: cell_type_weight(&c1), + cell_type: c1ct, + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c1.clone(), + HeartbeatRow { + cbsd_id: Some(c1.clone()), hotspot_key: g2.clone(), - reward_weight: cell_type_weight(&c1), + cell_type: c1ct, + location_validation_timestamp: None, + distance_to_asserted: Some(1), + }, + HeartbeatRow { + cbsd_id: None, + hotspot_key: g3.clone(), + cell_type: g3ct, + location_validation_timestamp: Some(timestamp), + distance_to_asserted: Some(1), + }, + HeartbeatRow { + cbsd_id: None, + hotspot_key: g4.clone(), + cell_type: g4ct, + location_validation_timestamp: None, + distance_to_asserted: Some(1), + }, + HeartbeatRow { + cbsd_id: None, + hotspot_key: g5.clone(), + cell_type: g5ct, + location_validation_timestamp: Some(timestamp), + distance_to_asserted: Some(100000), }, ]; + let heartbeat_rewards: Vec = heartbeat_keys + .into_iter() + .map(|row| HeartbeatReward::from_heartbeat_row(row, max_asserted_distance_deviation)) + .collect(); let last_timestamp = timestamp - Duration::hours(12); let g1_speedtests = vec![ @@ -631,30 +679,80 @@ mod test { acceptable_speedtest(g2.clone(), last_timestamp), acceptable_speedtest(g2.clone(), timestamp), ]; + let g3_speedtests = vec![ + acceptable_speedtest(g3.clone(), last_timestamp), + acceptable_speedtest(g3.clone(), timestamp), + ]; + let g4_speedtests = vec![ + acceptable_speedtest(g4.clone(), last_timestamp), + acceptable_speedtest(g4.clone(), timestamp), + ]; + let g5_speedtests = vec![ + acceptable_speedtest(g5.clone(), last_timestamp), + acceptable_speedtest(g5.clone(), timestamp), + ]; let g1_average = SpeedtestAverage::from(&g1_speedtests); let g2_average = SpeedtestAverage::from(&g2_speedtests); + let g3_average = SpeedtestAverage::from(&g3_speedtests); + let g4_average = SpeedtestAverage::from(&g4_speedtests); + let g5_average = SpeedtestAverage::from(&g5_speedtests); let mut averages = HashMap::new(); averages.insert(g1.clone(), g1_average); averages.insert(g2.clone(), g2_average); + averages.insert(g3.clone(), g3_average); + averages.insert(g4.clone(), g4_average); + averages.insert(g5.clone(), g5_average); let speedtest_avgs = SpeedtestAverages { averages }; - let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs) - .await - .unwrap(); + let rewards = PocShares::aggregate( + stream::iter(heartbeat_rewards).map(Ok), + &speedtest_avgs, + true, + ) + .await + .unwrap(); + + let gw1_shares = rewards + .hotspot_shares + .get(&g1) + .expect("Could not fetch gateway1 shares") + .total_shares(); + let gw2_shares = rewards + .hotspot_shares + .get(&g2) + .expect("Could not fetch gateway1 shares") + .total_shares(); + let gw3_shares = rewards + .hotspot_shares + .get(&g3) + .expect("Could not fetch gateway3 shares") + .total_shares(); + let gw4_shares = rewards + .hotspot_shares + .get(&g4) + .expect("Could not fetch gateway4 shares") + .total_shares(); + let gw5_shares = rewards + .hotspot_shares + .get(&g5) + .expect("Could not fetch gateway5 shares") + .total_shares(); // The owner with two hotspots gets more rewards - assert!( - rewards - .hotspot_shares - .get(&g1) - .expect("Could not fetch gateway1 shares") - .total_shares() - > rewards - .hotspot_shares - .get(&g2) - .expect("Could not fetch gateway2 shares") - .total_shares() - ); + assert_eq!(gw1_shares, dec!(3.50)); + assert_eq!(gw2_shares, dec!(2.00)); + assert!(gw1_shares > gw2_shares); + + // gw3 has wifi HBs and has location validation timestamp + // gets the full 0.4 reward weight + assert_eq!(gw3_shares, dec!(0.40)); + // gw4 has wifi HBs and DOES NOT have a location validation timestamp + // gets 0.25 of the full reward weight + assert_eq!(gw4_shares, dec!(0.1)); + // gw4 has wifi HBs and does have a location validation timestamp + // but the HB distance is too far from the asserted location + // gets 0.25 of the full reward weight + assert_eq!(gw5_shares, dec!(0.1)); } #[tokio::test] @@ -672,6 +770,15 @@ mod test { let owner4: PublicKeyBinary = "112p1GbUtRLyfFaJr1XF8fH7yz9cSZ4exbrSpVDeu67DeGb31QUL" .parse() .expect("failed owner4 parse"); + let owner5: PublicKeyBinary = "112bUGwooPd1dCDd3h3yZwskjxCzBsQNKeaJTuUF4hSgYedcsFa9" + .parse() + .expect("failed owner5 parse"); + let owner6: PublicKeyBinary = "112WqD16uH8GLmCMhyRUrp6Rw5MTELzBdx7pSepySYUoSjixQoxJ" + .parse() + .expect("failed owner6 parse"); + let owner7: PublicKeyBinary = "112WnYhq4qX3wdw6JTZT3w3A9FNGxeescJwJffcBN5jiZvovWRkQ" + .parse() + .expect("failed owner7 parse"); // init hotspots let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" @@ -698,6 +805,16 @@ mod test { let gw8: PublicKeyBinary = "112qDCKek7fePg6wTpEnbLp3uD7TTn8MBH7PGKtmAaUcG1vKQ9eZ" .parse() .expect("failed gw8 parse"); + // include a couple of wifi spots in the mix + let gw9: PublicKeyBinary = "112bUuQaE7j73THS9ABShHGokm46Miip9L361FSyWv7zSYn8hZWf" + .parse() + .expect("failed gw9 parse"); + let gw10: PublicKeyBinary = "11z69eJ3czc92k6snrfR9ek7g2uRWXosFbnG9v4bXgwhfUCivUo" + .parse() + .expect("failed gw10 parse"); + let gw11: PublicKeyBinary = "112WnYhq4qX3wdw6JTZT3w3A9FNGxeescJwJffcBN5jiZvovWRkQ" + .parse() + .expect("failed gw11 parse"); // link gws to owners let mut owners = HashMap::new(); @@ -709,6 +826,9 @@ mod test { owners.insert(gw6.clone(), owner3.clone()); owners.insert(gw7.clone(), owner3.clone()); owners.insert(gw8.clone(), owner4.clone()); + owners.insert(gw9.clone(), owner5.clone()); + owners.insert(gw10.clone(), owner6.clone()); + owners.insert(gw11.clone(), owner7.clone()); // init cells and cell_types let c2 = "P27-SCE4255W2107CW5000015".to_string(); @@ -726,71 +846,122 @@ mod test { let now = Utc::now(); let timestamp = now - Duration::minutes(20); + let max_asserted_distance_deviation: u32 = 300; // setup heartbeats - let heartbeats = vec![ - HeartbeatReward { - cbsd_id: c2.clone(), + let heartbeat_keys = vec![ + HeartbeatRow { + cbsd_id: Some(c2.clone()), hotspot_key: gw2.clone(), - reward_weight: cell_type_weight(&c2), + cell_type: CellType::from_cbsd_id(&c2).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c4.clone(), + HeartbeatRow { + cbsd_id: Some(c4.clone()), hotspot_key: gw3.clone(), - reward_weight: cell_type_weight(&c4), + cell_type: CellType::from_cbsd_id(&c4).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c5.clone(), + HeartbeatRow { + cbsd_id: Some(c5.clone()), hotspot_key: gw4.clone(), - reward_weight: cell_type_weight(&c5), + cell_type: CellType::from_cbsd_id(&c5).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c6.clone(), + HeartbeatRow { + cbsd_id: Some(c6.clone()), hotspot_key: gw4.clone(), - reward_weight: cell_type_weight(&c6), + cell_type: CellType::from_cbsd_id(&c6).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c7.clone(), + HeartbeatRow { + cbsd_id: Some(c7.clone()), hotspot_key: gw4.clone(), - reward_weight: cell_type_weight(&c7), + cell_type: CellType::from_cbsd_id(&c7).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c8.clone(), + HeartbeatRow { + cbsd_id: Some(c8.clone()), hotspot_key: gw4.clone(), - reward_weight: cell_type_weight(&c8), + cell_type: CellType::from_cbsd_id(&c8).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c9.clone(), + HeartbeatRow { + cbsd_id: Some(c9.clone()), hotspot_key: gw4.clone(), - reward_weight: cell_type_weight(&c9), + cell_type: CellType::from_cbsd_id(&c9).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c10.clone(), + HeartbeatRow { + cbsd_id: Some(c10.clone()), hotspot_key: gw4.clone(), - reward_weight: cell_type_weight(&c10), + cell_type: CellType::from_cbsd_id(&c10).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c11.clone(), + HeartbeatRow { + cbsd_id: Some(c11.clone()), hotspot_key: gw4.clone(), - reward_weight: cell_type_weight(&c11), + cell_type: CellType::from_cbsd_id(&c11).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c12.clone(), + HeartbeatRow { + cbsd_id: Some(c12.clone()), hotspot_key: gw5.clone(), - reward_weight: cell_type_weight(&c12), + cell_type: CellType::from_cbsd_id(&c12).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c13.clone(), + HeartbeatRow { + cbsd_id: Some(c13.clone()), hotspot_key: gw6.clone(), - reward_weight: cell_type_weight(&c13), + cell_type: CellType::from_cbsd_id(&c13).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), }, - HeartbeatReward { - cbsd_id: c14.clone(), + HeartbeatRow { + cbsd_id: Some(c14.clone()), hotspot_key: gw7.clone(), - reward_weight: cell_type_weight(&c14), + cell_type: CellType::from_cbsd_id(&c14).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), + }, + HeartbeatRow { + cbsd_id: None, + hotspot_key: gw9.clone(), + cell_type: CellType::NovaGenericWifiIndoor, + location_validation_timestamp: Some(timestamp), + distance_to_asserted: Some(1), + }, + HeartbeatRow { + cbsd_id: None, + hotspot_key: gw10.clone(), + cell_type: CellType::NovaGenericWifiIndoor, + location_validation_timestamp: None, + distance_to_asserted: Some(1), + }, + HeartbeatRow { + cbsd_id: None, + hotspot_key: gw11.clone(), + cell_type: CellType::NovaGenericWifiIndoor, + location_validation_timestamp: Some(timestamp), + distance_to_asserted: Some(10000), }, ]; + let heartbeat_rewards: Vec = heartbeat_keys + .into_iter() + .map(|row| HeartbeatReward::from_heartbeat_row(row, max_asserted_distance_deviation)) + .collect(); + // setup speedtests let last_speedtest = timestamp - Duration::hours(12); let gw1_speedtests = vec![ @@ -821,6 +992,18 @@ mod test { poor_speedtest(gw7.clone(), last_speedtest), poor_speedtest(gw7.clone(), timestamp), ]; + let gw9_speedtests = vec![ + acceptable_speedtest(gw9.clone(), last_speedtest), + acceptable_speedtest(gw9.clone(), timestamp), + ]; + let gw10_speedtests = vec![ + acceptable_speedtest(gw10.clone(), last_speedtest), + acceptable_speedtest(gw10.clone(), timestamp), + ]; + let gw11_speedtests = vec![ + acceptable_speedtest(gw11.clone(), last_speedtest), + acceptable_speedtest(gw11.clone(), timestamp), + ]; let gw1_average = SpeedtestAverage::from(&gw1_speedtests); let gw2_average = SpeedtestAverage::from(&gw2_speedtests); @@ -829,6 +1012,9 @@ mod test { let gw5_average = SpeedtestAverage::from(&gw5_speedtests); let gw6_average = SpeedtestAverage::from(&gw6_speedtests); let gw7_average = SpeedtestAverage::from(&gw7_speedtests); + let gw9_average = SpeedtestAverage::from(&gw9_speedtests); + let gw10_average = SpeedtestAverage::from(&gw10_speedtests); + let gw11_average = SpeedtestAverage::from(&gw11_speedtests); let mut averages = HashMap::new(); averages.insert(gw1.clone(), gw1_average); averages.insert(gw2.clone(), gw2_average); @@ -837,17 +1023,25 @@ mod test { averages.insert(gw5.clone(), gw5_average); averages.insert(gw6.clone(), gw6_average); averages.insert(gw7.clone(), gw7_average); + averages.insert(gw9.clone(), gw9_average); + averages.insert(gw10.clone(), gw10_average); + averages.insert(gw11.clone(), gw11_average); let speedtest_avgs = SpeedtestAverages { averages }; // calculate the rewards for the sample group let mut owner_rewards = HashMap::::new(); - let epoch = (now - Duration::hours(1))..now; - for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs) - .await - .unwrap() - .into_rewards(Decimal::ZERO, &epoch) - .unwrap() + let duration = Duration::hours(1); + let epoch = (now - duration)..now; + for mobile_reward in PocShares::aggregate( + stream::iter(heartbeat_rewards).map(Ok), + &speedtest_avgs, + true, + ) + .await + .unwrap() + .into_rewards(Decimal::ZERO, &epoch) + .unwrap() { let radio_reward = match mobile_reward.reward { Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward, @@ -865,29 +1059,301 @@ mod test { *owner_rewards .get(&owner1) .expect("Could not fetch owner1 rewards"), - 490_402_129_746 + 471_075_937_440 ); assert_eq!( *owner_rewards .get(&owner2) .expect("Could not fetch owner2 rewards"), - 1_471_206_389_237 + 1_413_227_812_320 ); assert_eq!( *owner_rewards .get(&owner3) .expect("Could not fetch owner3 rewards"), - 87_571_808_883 + 84_120_703_114 ); assert_eq!(owner_rewards.get(&owner4), None); - let mut total = 0; + let owner5_reward = *owner_rewards + .get(&owner5) + .expect("Could not fetch owner5 rewards"); + assert_eq!(owner5_reward, 53_837_249_993); + + let owner6_reward = *owner_rewards + .get(&owner6) + .expect("Could not fetch owner6 rewards"); + assert_eq!(owner6_reward, 13_459_312_498); + + // confirm owner 6 reward is 0.25 of owner 5's reward + // this is due to owner 6's hotspot not having a validation location timestamp + // and thus its reward scale is reduced + assert_eq!((owner5_reward as f64 * 0.25) as u64, owner6_reward); + + let owner7_reward = *owner_rewards + .get(&owner6) + .expect("Could not fetch owner7 rewards"); + assert_eq!(owner7_reward, 13_459_312_498); + + // confirm owner 7 reward is 0.25 of owner 5's reward + // owner 7's hotspot does have a validation location timestamp + // but its distance beyond the asserted location is too high + // and thus its reward scale is reduced + assert_eq!((owner5_reward as f64 * 0.25) as u64, owner7_reward); + + // total emissions for 1 hour + let expected_total_rewards = get_scheduled_tokens_for_poc_and_dc(duration) + .to_u64() + .unwrap(); + // the emissions actually distributed for the hour + let mut distributed_total_rewards = 0; for val in owner_rewards.values() { - total += *val + distributed_total_rewards += *val + } + assert_eq!(distributed_total_rewards, 2_049_180_327_863); + + let diff = expected_total_rewards - distributed_total_rewards; + // the sum of rewards distributed should not exceed the epoch amount + // but due to rounding whilst going to u64 when computing rewards, + // is permitted to be a few bones less + assert_eq!(diff, 5); + } + + #[tokio::test] + async fn full_wifi_indoor_vs_sercomm_indoor_reward_shares() { + // init owners + let owner1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .expect("failed owner1 parse"); + let owner2: PublicKeyBinary = "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp" + .parse() + .expect("failed owner2 parse"); + // init hotspots + let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .expect("failed gw1 parse"); + let gw2: PublicKeyBinary = "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp" + .parse() + .expect("failed gw2 parse"); + // link gws to owners + let mut owners = HashMap::new(); + owners.insert(gw1.clone(), owner1.clone()); + owners.insert(gw2.clone(), owner2.clone()); + + let now = Utc::now(); + let timestamp = now - Duration::minutes(20); + let max_asserted_distance_deviation: u32 = 300; + + // init cells and cell_types + let c2 = "P27-SCE4255W".to_string(); // sercom indoor + + // setup heartbeats + let heartbeat_keys = vec![ + // add wifi indoor HB + HeartbeatRow { + cbsd_id: None, + hotspot_key: gw1.clone(), + cell_type: CellType::NovaGenericWifiIndoor, + location_validation_timestamp: Some(timestamp), + distance_to_asserted: Some(1), + }, + // add sercomm indoor HB + HeartbeatRow { + cbsd_id: Some(c2.clone()), + hotspot_key: gw2.clone(), + cell_type: CellType::from_cbsd_id(&c2).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), + }, + ]; + + let heartbeat_rewards: Vec = heartbeat_keys + .into_iter() + .map(|row| HeartbeatReward::from_heartbeat_row(row, max_asserted_distance_deviation)) + .collect(); + + // setup speedtests + let last_speedtest = timestamp - Duration::hours(12); + let gw1_speedtests = vec![ + acceptable_speedtest(gw1.clone(), last_speedtest), + acceptable_speedtest(gw1.clone(), timestamp), + ]; + let gw2_speedtests = vec![ + acceptable_speedtest(gw2.clone(), last_speedtest), + acceptable_speedtest(gw2.clone(), timestamp), + ]; + + let gw1_average = SpeedtestAverage::from(&gw1_speedtests); + let gw2_average = SpeedtestAverage::from(&gw2_speedtests); + let mut averages = HashMap::new(); + averages.insert(gw1.clone(), gw1_average); + averages.insert(gw2.clone(), gw2_average); + + let speedtest_avgs = SpeedtestAverages { averages }; + + // calculate the rewards for the group + let mut owner_rewards = HashMap::::new(); + let duration = Duration::hours(1); + let epoch = (now - duration)..now; + for mobile_reward in PocShares::aggregate( + stream::iter(heartbeat_rewards).map(Ok), + &speedtest_avgs, + true, + ) + .await + .unwrap() + .into_rewards(Decimal::ZERO, &epoch) + .unwrap() + { + let radio_reward = match mobile_reward.reward { + Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward, + _ => unreachable!(), + }; + let owner = owners + .get(&PublicKeyBinary::from(radio_reward.hotspot_key)) + .expect("Could not find owner") + .clone(); + + *owner_rewards.entry(owner).or_default() += radio_reward.poc_reward; + } + + // wifi + let owner1_reward = *owner_rewards + .get(&owner1) + .expect("Could not fetch owner1 rewards"); + assert_eq!(owner1_reward, 585_480_093_676); + + //sercomm + let owner2_reward = *owner_rewards + .get(&owner2) + .expect("Could not fetch owner2 rewards"); + assert_eq!(owner2_reward, 1_463_700_234_192); + + // confirm owner 1 reward is 0.4 of owner 2's reward + // owner 1 is a wifi indoor with a distance_to_asserted < max + // and so gets the full reward scale of 0.4 + // owner 2 is a cbrs sercomm indoor which has a reward scale of 1.0 + assert_eq!(owner1_reward, (owner2_reward as f64 * 0.4) as u64); + } + + #[tokio::test] + async fn reduced_wifi_indoor_vs_sercomm_indoor_reward_shares() { + // init owners + let owner1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .expect("failed owner1 parse"); + let owner2: PublicKeyBinary = "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp" + .parse() + .expect("failed owner2 parse"); + // init hotspots + let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .expect("failed gw1 parse"); + let gw2: PublicKeyBinary = "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp" + .parse() + .expect("failed gw2 parse"); + // link gws to owners + let mut owners = HashMap::new(); + owners.insert(gw1.clone(), owner1.clone()); + owners.insert(gw2.clone(), owner2.clone()); + + let now = Utc::now(); + let timestamp = now - Duration::minutes(20); + let max_asserted_distance_deviation: u32 = 300; + + // init cells and cell_types + let c2 = "P27-SCE4255W".to_string(); // sercom indoor + + // setup heartbeats + let heartbeat_keys = vec![ + // add wifi indoor HB + // with distance to asserted > than max allowed + // this results in reward scale dropping to 0.25 + HeartbeatRow { + cbsd_id: None, + hotspot_key: gw1.clone(), + cell_type: CellType::NovaGenericWifiIndoor, + location_validation_timestamp: Some(timestamp), + distance_to_asserted: Some(1000), + }, + // add sercomm indoor HB + HeartbeatRow { + cbsd_id: Some(c2.clone()), + hotspot_key: gw2.clone(), + cell_type: CellType::from_cbsd_id(&c2).unwrap(), + location_validation_timestamp: None, + distance_to_asserted: Some(1), + }, + ]; + + let heartbeat_rewards: Vec = heartbeat_keys + .into_iter() + .map(|row| HeartbeatReward::from_heartbeat_row(row, max_asserted_distance_deviation)) + .collect(); + + // setup speedtests + let last_speedtest = timestamp - Duration::hours(12); + let gw1_speedtests = vec![ + acceptable_speedtest(gw1.clone(), last_speedtest), + acceptable_speedtest(gw1.clone(), timestamp), + ]; + let gw2_speedtests = vec![ + acceptable_speedtest(gw2.clone(), last_speedtest), + acceptable_speedtest(gw2.clone(), timestamp), + ]; + + let gw1_average = SpeedtestAverage::from(&gw1_speedtests); + let gw2_average = SpeedtestAverage::from(&gw2_speedtests); + let mut averages = HashMap::new(); + averages.insert(gw1.clone(), gw1_average); + averages.insert(gw2.clone(), gw2_average); + + let speedtest_avgs = SpeedtestAverages { averages }; + + // calculate the rewards for the group + let mut owner_rewards = HashMap::::new(); + let duration = Duration::hours(1); + let epoch = (now - duration)..now; + for mobile_reward in PocShares::aggregate( + stream::iter(heartbeat_rewards).map(Ok), + &speedtest_avgs, + true, + ) + .await + .unwrap() + .into_rewards(Decimal::ZERO, &epoch) + .unwrap() + { + let radio_reward = match mobile_reward.reward { + Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward, + _ => unreachable!(), + }; + let owner = owners + .get(&PublicKeyBinary::from(radio_reward.hotspot_key)) + .expect("Could not find owner") + .clone(); + + *owner_rewards.entry(owner).or_default() += radio_reward.poc_reward; } - assert_eq!(total, 2_049_180_327_866); // total emissions for 1 hour + // wifi + let owner1_reward = *owner_rewards + .get(&owner1) + .expect("Could not fetch owner1 rewards"); + assert_eq!(owner1_reward, 186_289_120_715); + + //sercomm + let owner2_reward = *owner_rewards + .get(&owner2) + .expect("Could not fetch owner2 rewards"); + assert_eq!(owner2_reward, 1_862_891_207_153); + + // confirm owner 1 reward is 0.1 of owner 2's reward + // owner 1 is a wifi indoor with a distance_to_asserted > max + // and so gets the reduced reward scale of 0.1 ( radio reward scale of 0.4 * location scale of 0.25) + // owner 2 is a cbrs sercomm indoor which has a reward scale of 1.0 + assert_eq!(owner1_reward, (owner2_reward as f64 * 0.1) as u64); } #[tokio::test] @@ -910,13 +1376,13 @@ mod test { hotspot_shares.insert( gw1.clone(), RadioShares { - radio_shares: vec![(c1, dec!(10.0))].into_iter().collect(), + radio_shares: vec![(Some(c1), dec!(10.0))].into_iter().collect(), }, ); hotspot_shares.insert( gw2, RadioShares { - radio_shares: vec![(c2, dec!(-1.0)), (c3, dec!(0.0))] + radio_shares: vec![(Some(c2), dec!(-1.0)), (Some(c3), dec!(0.0))] .into_iter() .collect(), }, diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 407e96916..9a4d4a50f 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -1,6 +1,6 @@ use crate::{ data_session, - heartbeats::{self, HeartbeatReward}, + heartbeats::{clear_heartbeats, HeartbeatReward}, reward_shares::{MapperShares, PocShares, TransferRewards}, speedtests, speedtests_average::SpeedtestAverages, @@ -28,9 +28,12 @@ pub struct Rewarder { mobile_rewards: FileSinkClient, reward_manifests: FileSinkClient, price_tracker: PriceTracker, + reward_wifi_hbs: bool, + max_distance_to_asserted: u32, } impl Rewarder { + #[allow(clippy::too_many_arguments)] pub fn new( pool: Pool, reward_period_duration: Duration, @@ -38,6 +41,8 @@ impl Rewarder { mobile_rewards: FileSinkClient, reward_manifests: FileSinkClient, price_tracker: PriceTracker, + reward_wifi_hbs: bool, + max_distance_to_asserted: u32, ) -> Self { Self { pool, @@ -46,6 +51,8 @@ impl Rewarder { mobile_rewards, reward_manifests, price_tracker, + reward_wifi_hbs, + max_distance_to_asserted, } } @@ -102,7 +109,7 @@ impl Rewarder { // Check if we have heartbeats and speedtests past the end of the reward period if reward_period.end >= self.disable_complete_data_checks_until().await? { if sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM heartbeats WHERE latest_timestamp >= $1", + "SELECT COUNT(*) FROM cbrs_heartbeats WHERE latest_timestamp >= $1", ) .bind(reward_period.end) .fetch_one(&self.pool) @@ -138,10 +145,12 @@ impl Rewarder { reward_period.end ); - let heartbeats = HeartbeatReward::validated(&self.pool, reward_period); + let heartbeats = + HeartbeatReward::validated(&self.pool, reward_period, self.max_distance_to_asserted); let speedtest_averages = SpeedtestAverages::aggregate_epoch_averages(reward_period.end, &self.pool).await?; - let poc_rewards = PocShares::aggregate(heartbeats, &speedtest_averages).await?; + let poc_rewards = + PocShares::aggregate(heartbeats, &speedtest_averages, self.reward_wifi_hbs).await?; let mobile_price = self .price_tracker .price(&helium_proto::BlockchainTokenTypeV1::Mobile) @@ -215,9 +224,9 @@ impl Rewarder { let mut transaction = self.pool.begin().await?; // clear out the various db tables - heartbeats::clear_heartbeats(&mut transaction, &reward_period.start).await?; + clear_heartbeats(&mut transaction, &reward_period.start).await?; speedtests::clear_speedtests(&mut transaction, &reward_period.start).await?; - data_session::clear_hotspot_data_sessions(&mut transaction, &reward_period.end).await?; + data_session::clear_hotspot_data_sessions(&mut transaction, &reward_period.start).await?; // subscriber_location::clear_location_shares(&mut transaction, &reward_period.end).await?; let next_reward_period = scheduler.next_reward_period(); diff --git a/mobile_verifier/src/settings.rs b/mobile_verifier/src/settings.rs index c506f4cc1..1d69a5d49 100644 --- a/mobile_verifier/src/settings.rs +++ b/mobile_verifier/src/settings.rs @@ -25,12 +25,27 @@ pub struct Settings { pub config_client: mobile_config::ClientSettings, #[serde(default = "default_start_after")] pub start_after: u64, + #[serde(default = "default_reward_wifi_hbs")] + pub reward_wifi_hbs: bool, + // Max distance in meters between the asserted location of a WIFI hotspot + // and the lat/lng defined in a heartbeat + // beyond which its location weight will be reduced + #[serde(default = "default_max_asserted_distance_deviation")] + pub max_asserted_distance_deviation: u32, +} + +pub fn default_max_asserted_distance_deviation() -> u32 { + 100 } pub fn default_log() -> String { "mobile_verifier=debug,poc_store=info".to_string() } +pub fn default_reward_wifi_hbs() -> bool { + false +} + pub fn default_start_after() -> u64 { 0 } diff --git a/mobile_verifier/tests/heartbeats.rs b/mobile_verifier/tests/heartbeats.rs index 5db0eff32..c3dd49718 100644 --- a/mobile_verifier/tests/heartbeats.rs +++ b/mobile_verifier/tests/heartbeats.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, Utc}; use futures_util::TryStreamExt; use helium_crypto::PublicKeyBinary; +use mobile_verifier::cell_type::CellType; use mobile_verifier::heartbeats::HeartbeatReward; use rust_decimal::Decimal; use sqlx::PgPool; @@ -9,13 +10,14 @@ use sqlx::PgPool; #[ignore] async fn only_fetch_latest_hotspot(pool: PgPool) -> anyhow::Result<()> { let cbsd_id = "P27-SCE4255W120200039521XGB0103".to_string(); + let cell_type = CellType::from_cbsd_id(&cbsd_id).expect("unable to get cell_type"); let hotspot_1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; let hotspot_2: PublicKeyBinary = "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp".parse()?; sqlx::query( r#" -INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) +INSERT INTO cbrs_heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) VALUES ($1, $2, 'sercommindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00'), ($1, $3, 'sercommindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00'), @@ -51,15 +53,22 @@ VALUES let start_period: DateTime = "2023-08-25 00:00:00.000000000 UTC".parse()?; let end_period: DateTime = "2023-08-26 00:00:00.000000000 UTC".parse()?; - let heartbeat_reward: Vec<_> = HeartbeatReward::validated(&pool, &(start_period..end_period)) - .try_collect() - .await?; + let max_asserted_distance_deviation: u32 = 300; + + let heartbeat_reward: Vec<_> = HeartbeatReward::validated( + &pool, + &(start_period..end_period), + max_asserted_distance_deviation, + ) + .try_collect() + .await?; assert_eq!( heartbeat_reward, vec![HeartbeatReward { hotspot_key: hotspot_2, - cbsd_id, + cell_type, + cbsd_id: Some(cbsd_id), reward_weight: Decimal::ONE }] ); @@ -71,13 +80,14 @@ VALUES #[ignore] async fn ensure_hotspot_does_not_affect_count(pool: PgPool) -> anyhow::Result<()> { let cbsd_id = "P27-SCE4255W120200039521XGB0103".to_string(); + let cell_type = CellType::from_cbsd_id(&cbsd_id).expect("unable to get cell_type"); let hotspot_1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; let hotspot_2: PublicKeyBinary = "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp".parse()?; sqlx::query( r#" -INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) +INSERT INTO cbrs_heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) VALUES ($1, $2, 'sercommindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00'), ($1, $2, 'sercommindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00'), @@ -101,15 +111,21 @@ VALUES let start_period: DateTime = "2023-08-25 00:00:00.000000000 UTC".parse()?; let end_period: DateTime = "2023-08-26 00:00:00.000000000 UTC".parse()?; - let heartbeat_reward: Vec<_> = HeartbeatReward::validated(&pool, &(start_period..end_period)) - .try_collect() - .await?; + let max_asserted_distance_deviation: u32 = 300; + let heartbeat_reward: Vec<_> = HeartbeatReward::validated( + &pool, + &(start_period..end_period), + max_asserted_distance_deviation, + ) + .try_collect() + .await?; assert_eq!( heartbeat_reward, vec![HeartbeatReward { hotspot_key: hotspot_2, - cbsd_id, + cell_type, + cbsd_id: Some(cbsd_id), reward_weight: Decimal::ONE }] ); @@ -125,7 +141,7 @@ async fn ensure_minimum_count(pool: PgPool) -> anyhow::Result<()> { "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; sqlx::query( r#" -INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) +INSERT INTO cbrs_heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) VALUES ($1, $2, 'sercommindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00'), ($1, $2, 'sercommindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00'), @@ -147,9 +163,15 @@ VALUES let start_period: DateTime = "2023-08-25 00:00:00.000000000 UTC".parse()?; let end_period: DateTime = "2023-08-26 00:00:00.000000000 UTC".parse()?; - let heartbeat_reward: Vec<_> = HeartbeatReward::validated(&pool, &(start_period..end_period)) - .try_collect() - .await?; + let max_asserted_distance_deviation: u32 = 300; + + let heartbeat_reward: Vec<_> = HeartbeatReward::validated( + &pool, + &(start_period..end_period), + max_asserted_distance_deviation, + ) + .try_collect() + .await?; assert!(heartbeat_reward.is_empty());