diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 21500e3a7..a99314f76 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -92,7 +92,7 @@ jobs: strategy: fail-fast: false matrix: - package: [boost-manager,file-store,iot-config,iot-packet-verifier,iot-verifier,mobile-config,mobile-verifier] + package: [boost-manager,file-store,iot-config,iot-packet-verifier,iot-verifier,mobile-config,mobile-packet-verifier,mobile-verifier] concurrency: group: ${{ github.workflow }}-${{ github.ref }}-tests-postgres-${{ matrix.package }} cancel-in-progress: true @@ -142,7 +142,7 @@ jobs: strategy: fail-fast: false matrix: - package: [coverage-map,coverage-point-calculator,ingest,mobile-packet-verifier,reward-scheduler,task-manager] + package: [coverage-map,coverage-point-calculator,ingest,reward-scheduler,task-manager] concurrency: group: ${{ github.workflow }}-${{ github.ref }}-tests-${{ matrix.package }} cancel-in-progress: true diff --git a/Cargo.lock b/Cargo.lock index 8c921c004..2fa28164d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,7 +1617,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#9d1785d58c5db85a28ca511c02f01a26b58811a7" +source = "git+https://github.com/helium/proto?branch=master#197ff9c6cde7dc0d8334d6b4e27c58779e6a7ce0" dependencies = [ "base64 0.21.7", "byteorder", @@ -2117,7 +2117,16 @@ dependencies = [ [[package]] name = "circuit-breaker" version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "circuit-breaker" +version = "0.1.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -2759,7 +2768,16 @@ dependencies = [ [[package]] name = "data-credits" version = "0.2.2" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "data-credits" +version = "0.2.2" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3137,7 +3155,16 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "fanout" version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "fanout" +version = "0.1.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3713,23 +3740,45 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "helium-anchor-gen" version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", - "circuit-breaker", - "data-credits", - "fanout", - "helium-entity-manager", - "helium-sub-daos", - "hexboosting", - "lazy-distributor", - "lazy-transactions", - "mobile-entity-manager", - "price-oracle", - "rewards-oracle", - "treasury-management", - "voter-stake-registry", + "circuit-breaker 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "data-credits 0.2.2 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "fanout 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "helium-entity-manager 0.2.11 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "helium-sub-daos 0.1.8 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "hexboosting 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "lazy-distributor 0.2.0 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "lazy-transactions 0.2.0 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "mobile-entity-manager 0.1.3 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "price-oracle 0.2.1 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "rewards-oracle 0.2.0 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "treasury-management 0.2.0 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", + "voter-stake-registry 0.3.3 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", +] + +[[package]] +name = "helium-anchor-gen" +version = "0.1.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", + "circuit-breaker 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git)", + "data-credits 0.2.2 (git+https://github.com/helium/helium-anchor-gen.git)", + "fanout 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git)", + "helium-entity-manager 0.2.11 (git+https://github.com/helium/helium-anchor-gen.git)", + "helium-sub-daos 0.1.8 (git+https://github.com/helium/helium-anchor-gen.git)", + "hexboosting 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git)", + "lazy-distributor 0.2.0 (git+https://github.com/helium/helium-anchor-gen.git)", + "lazy-transactions 0.2.0 (git+https://github.com/helium/helium-anchor-gen.git)", + "mobile-entity-manager 0.1.3 (git+https://github.com/helium/helium-anchor-gen.git)", + "price-oracle 0.2.1 (git+https://github.com/helium/helium-anchor-gen.git)", + "rewards-oracle 0.2.0 (git+https://github.com/helium/helium-anchor-gen.git)", + "treasury-management 0.2.0 (git+https://github.com/helium/helium-anchor-gen.git)", + "voter-stake-registry 0.3.3 (git+https://github.com/helium/helium-anchor-gen.git)", ] [[package]] @@ -3742,7 +3791,7 @@ dependencies = [ "bs58 0.5.0", "byteorder", "ed25519-compact", - "getrandom 0.1.16", + "getrandom 0.2.10", "k256", "lazy_static", "multihash", @@ -3759,8 +3808,17 @@ dependencies = [ [[package]] name = "helium-entity-manager" -version = "0.3.1" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +version = "0.2.11" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "helium-entity-manager" +version = "0.2.11" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3769,7 +3827,7 @@ dependencies = [ [[package]] name = "helium-lib" version = "0.0.0" -source = "git+https://github.com/helium/helium-wallet-rs.git?branch=master#b54819ac4c4bd73be37d25d7d6d48842bbc95ea9" +source = "git+https://github.com/helium/helium-wallet-rs.git?branch=master#4acf688beac3c507c33843a745516839e1f814b2" dependencies = [ "anchor-client", "anchor-spl", @@ -3780,7 +3838,7 @@ dependencies = [ "chrono", "futures", "h3o", - "helium-anchor-gen", + "helium-anchor-gen 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", "helium-crypto", "helium-proto", "hex", @@ -3809,7 +3867,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#9d1785d58c5db85a28ca511c02f01a26b58811a7" +source = "git+https://github.com/helium/proto?branch=master#197ff9c6cde7dc0d8334d6b4e27c58779e6a7ce0" dependencies = [ "bytes", "prost", @@ -3825,7 +3883,16 @@ dependencies = [ [[package]] name = "helium-sub-daos" version = "0.1.8" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "helium-sub-daos" +version = "0.1.8" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3879,7 +3946,16 @@ checksum = "7ebdb29d2ea9ed0083cd8cece49bbd968021bd99b0849edb4a9a7ee0fdf6a4e0" [[package]] name = "hexboosting" version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "hexboosting" +version = "0.1.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -4690,7 +4766,25 @@ dependencies = [ [[package]] name = "lazy-distributor" version = "0.2.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "lazy-distributor" +version = "0.2.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "lazy-transactions" +version = "0.2.0" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -4699,7 +4793,7 @@ dependencies = [ [[package]] name = "lazy-transactions" version = "0.2.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -5075,7 +5169,16 @@ dependencies = [ [[package]] name = "mobile-entity-manager" version = "0.1.3" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "mobile-entity-manager" +version = "0.1.3" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -5507,7 +5610,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ - "proc-macro-crate 1.1.3", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.58", @@ -5904,7 +6007,7 @@ dependencies = [ "file-store", "futures", "futures-util", - "helium-anchor-gen", + "helium-anchor-gen 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git)", "helium-lib", "helium-proto", "humantime-serde", @@ -5930,7 +6033,16 @@ dependencies = [ [[package]] name = "price-oracle" version = "0.2.1" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "price-oracle" +version = "0.2.1" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -6053,7 +6165,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.4.0", + "heck 0.5.0", "itertools", "log", "multimap", @@ -6573,7 +6685,16 @@ dependencies = [ [[package]] name = "rewards-oracle" version = "0.2.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "rewards-oracle" +version = "0.2.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -7286,7 +7407,7 @@ dependencies = [ "clap 4.4.8", "file-store", "futures", - "helium-anchor-gen", + "helium-anchor-gen 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git)", "helium-crypto", "itertools", "metrics", @@ -9208,7 +9329,16 @@ dependencies = [ [[package]] name = "treasury-management" version = "0.2.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "treasury-management" +version = "0.2.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -9254,7 +9384,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.7.3", + "rand 0.8.5", "static_assertions", ] @@ -9450,7 +9580,16 @@ dependencies = [ [[package]] name = "voter-stake-registry" version = "0.3.3" -source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" +source = "git+https://github.com/helium/helium-anchor-gen.git?branch=main#761b839a71cc6d2aecf4be994af9d8206aeae0e1" +dependencies = [ + "anchor-gen", + "anchor-lang 0.30.1", +] + +[[package]] +name = "voter-stake-registry" +version = "0.3.3" +source = "git+https://github.com/helium/helium-anchor-gen.git#761b839a71cc6d2aecf4be994af9d8206aeae0e1" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 3ce054c88..a9ada7896 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -19,6 +19,12 @@ pub struct GatewayInfo { pub device_type: DeviceType, } +impl GatewayInfo { + pub fn is_data_only(&self) -> bool { + matches!(self.device_type, DeviceType::WifiDataOnly) + } +} + impl TryFrom for GatewayInfo { type Error = std::num::ParseIntError; @@ -64,6 +70,7 @@ pub enum DeviceType { Cbrs, WifiIndoor, WifiOutdoor, + WifiDataOnly, } impl From for DeviceType { @@ -72,6 +79,7 @@ impl From for DeviceType { DeviceTypeProto::Cbrs => DeviceType::Cbrs, DeviceTypeProto::WifiIndoor => DeviceType::WifiIndoor, DeviceTypeProto::WifiOutdoor => DeviceType::WifiOutdoor, + DeviceTypeProto::WifiDataOnly => DeviceType::WifiDataOnly, } } } @@ -88,6 +96,7 @@ impl std::str::FromStr for DeviceType { "cbrs" => Self::Cbrs, "wifiIndoor" => Self::WifiIndoor, "wifiOutdoor" => Self::WifiOutdoor, + "wifiDataOnly" => Self::WifiDataOnly, _ => return Err(DeviceTypeParseError), }; Ok(result) diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index 9e66f1547..68697b945 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -4,22 +4,16 @@ use file_store::mobile_session::{ DataTransferSessionIngestReport, InvalidDataTransferIngestReport, }; use futures::{Stream, StreamExt}; -use helium_crypto::PublicKeyBinary; -use helium_proto::services::mobile_config::NetworkKeyRole; use helium_proto::services::poc_mobile::{ invalid_data_transfer_ingest_report_v1::DataTransferIngestReportStatus, InvalidDataTransferIngestReportV1, }; -use mobile_config::client::{ - authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver, -}; use sqlx::{Postgres, Transaction}; -use crate::event_ids; +use crate::{event_ids, MobileConfigResolverExt}; pub async fn accumulate_sessions( - gateway_info_resolver: &impl GatewayInfoResolver, - authorization_verifier: &impl AuthorizationVerifier, + mobile_config: &impl MobileConfigResolverExt, conn: &mut Transaction<'_, Postgres>, invalid_data_session_report_sink: &FileSinkClient, curr_file_ts: DateTime, @@ -28,21 +22,10 @@ pub async fn accumulate_sessions( tokio::pin!(reports); while let Some(report) = reports.next().await { - // If the reward has been cancelled or it fails verification checks then skip - // the report and write it out to s3 as invalid - if report.report.rewardable_bytes == 0 { - write_invalid_report( - invalid_data_session_report_sink, - DataTransferIngestReportStatus::Cancelled, - report, - ) - .await?; - continue; - } - - let report_validity = - verify_report(conn, gateway_info_resolver, authorization_verifier, &report).await?; + let report_validity = verify_report(conn, mobile_config, &report).await?; if report_validity != DataTransferIngestReportStatus::Valid { + // If the reward has been cancelled or it fails verification checks then skip + // the report and write it out to s3 as invalid write_invalid_report(invalid_data_session_report_sink, report_validity, report).await?; continue; } @@ -73,46 +56,29 @@ pub async fn accumulate_sessions( async fn verify_report( txn: &mut Transaction<'_, Postgres>, - gateway_info_resolver: &impl GatewayInfoResolver, - authorization_verifier: &impl AuthorizationVerifier, + mobile_config: &impl MobileConfigResolverExt, report: &DataTransferSessionIngestReport, ) -> anyhow::Result { + if report.report.rewardable_bytes == 0 { + return Ok(DataTransferIngestReportStatus::Cancelled); + } + if is_duplicate(txn, report).await? { return Ok(DataTransferIngestReportStatus::Duplicate); } - if !verify_gateway( - gateway_info_resolver, - &report.report.data_transfer_usage.pub_key, - ) - .await - { + let gw_pub_key = &report.report.data_transfer_usage.pub_key; + let routing_pub_key = &report.report.pub_key; + + if !mobile_config.is_gateway_known(gw_pub_key).await { return Ok(DataTransferIngestReportStatus::InvalidGatewayKey); - }; - if !verify_known_routing_key(authorization_verifier, &report.report.pub_key).await { - return Ok(DataTransferIngestReportStatus::InvalidRoutingKey); - }; - Ok(DataTransferIngestReportStatus::Valid) -} + } -async fn verify_gateway( - gateway_info_resolver: &impl GatewayInfoResolver, - public_key: &PublicKeyBinary, -) -> bool { - match gateway_info_resolver.resolve_gateway_info(public_key).await { - Ok(res) => res.is_some(), - Err(_err) => false, + if !mobile_config.is_routing_key_known(routing_pub_key).await { + return Ok(DataTransferIngestReportStatus::InvalidRoutingKey); } -} -async fn verify_known_routing_key( - authorization_verifier: &impl AuthorizationVerifier, - public_key: &PublicKeyBinary, -) -> bool { - authorization_verifier - .verify_authorized_key(public_key, NetworkKeyRole::MobileRouter) - .await - .unwrap_or_default() + Ok(DataTransferIngestReportStatus::Valid) } async fn is_duplicate( @@ -144,3 +110,170 @@ async fn write_invalid_report( .await?; Ok(()) } + +#[cfg(test)] +mod tests { + use file_store::{ + file_sink::FileSinkClient, + mobile_session::{DataTransferEvent, DataTransferSessionReq}, + }; + use helium_crypto::PublicKeyBinary; + use helium_proto::services::poc_mobile::DataTransferRadioAccessTechnology; + use sqlx::PgPool; + + use crate::burner::DataTransferSession; + + use super::*; + + struct MockResolver; + + #[async_trait::async_trait] + impl MobileConfigResolverExt for MockResolver { + async fn is_gateway_known(&self, _public_key: &PublicKeyBinary) -> bool { + true + } + + async fn is_routing_key_known(&self, _public_key: &PublicKeyBinary) -> bool { + true + } + } + + #[sqlx::test] + async fn accumulate_no_reports(pool: PgPool) -> anyhow::Result<()> { + let mut txn = pool.begin().await?; + + let (tx, mut rx) = tokio::sync::mpsc::channel(10); + let invalid_data_session_report_sink = FileSinkClient::new(tx, "test"); + + accumulate_sessions( + &MockResolver, + &mut txn, + &invalid_data_session_report_sink, + Utc::now(), + futures::stream::iter(vec![]), + ) + .await?; + + txn.commit().await?; + + // channel is empty + rx.assert_is_empty()?; + + let sessions: Vec = + sqlx::query_as("SELECT * from data_transfer_sessions") + .fetch_all(&pool) + .await?; + assert!(sessions.is_empty()); + + Ok(()) + } + + #[sqlx::test] + async fn accumulate_writes_zero_data_event_as_invalid(pool: PgPool) -> anyhow::Result<()> { + let mut txn = pool.begin().await?; + + let (tx, mut rx) = tokio::sync::mpsc::channel(10); + let invalid_data_session_report_sink = FileSinkClient::new(tx, "test"); + + let report = DataTransferSessionIngestReport { + report: DataTransferSessionReq { + data_transfer_usage: DataTransferEvent { + pub_key: vec![0].into(), + upload_bytes: 0, + download_bytes: 0, + radio_access_technology: DataTransferRadioAccessTechnology::Wlan, + event_id: "test".to_string(), + payer: vec![0].into(), + timestamp: Utc::now(), + signature: vec![], + }, + rewardable_bytes: 0, + pub_key: vec![0].into(), + signature: vec![], + }, + received_timestamp: Utc::now(), + }; + + accumulate_sessions( + &MockResolver, + &mut txn, + &invalid_data_session_report_sink, + Utc::now(), + futures::stream::iter(vec![report]), + ) + .await?; + + txn.commit().await?; + + // single record written to invalid sink + match rx.try_recv() { + Ok(_) => (), + other => panic!("unexpected: {other:?}"), + } + + Ok(()) + } + + #[sqlx::test] + async fn write_valid_event_to_db(pool: PgPool) -> anyhow::Result<()> { + let mut txn = pool.begin().await?; + + let (tx, mut rx) = tokio::sync::mpsc::channel(10); + let invalid_data_session_report_sink = FileSinkClient::new(tx, "test"); + + let report = DataTransferSessionIngestReport { + report: DataTransferSessionReq { + data_transfer_usage: DataTransferEvent { + pub_key: vec![0].into(), + upload_bytes: 1, + download_bytes: 2, + radio_access_technology: DataTransferRadioAccessTechnology::Wlan, + event_id: "test".to_string(), + payer: vec![0].into(), + timestamp: Utc::now(), + signature: vec![], + }, + rewardable_bytes: 3, + pub_key: vec![0].into(), + signature: vec![], + }, + received_timestamp: Utc::now(), + }; + + accumulate_sessions( + &MockResolver, + &mut txn, + &invalid_data_session_report_sink, + Utc::now(), + futures::stream::iter(vec![report]), + ) + .await?; + + txn.commit().await?; + + // no records written to invalid sink + rx.assert_is_empty()?; + + let sessions: Vec = + sqlx::query_as("SELECT * from data_transfer_sessions") + .fetch_all(&pool) + .await?; + assert_eq!(sessions.len(), 1); + + Ok(()) + } + + trait ChannelExt { + fn assert_is_empty(&mut self) -> anyhow::Result<()>; + } + + impl ChannelExt for tokio::sync::mpsc::Receiver { + fn assert_is_empty(&mut self) -> anyhow::Result<()> { + match self.try_recv() { + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (), + other => panic!("unexpected message: {other:?}"), + } + Ok(()) + } + } +} diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index c526025c8..271dc8f7b 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -1,4 +1,7 @@ -use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings}; +use crate::{ + burner::Burner, event_ids::EventIdPurger, settings::Settings, MobileConfigClients, + MobileConfigResolverExt, +}; use anyhow::{bail, Result}; use chrono::{TimeZone, Utc}; use file_store::{ @@ -13,10 +16,6 @@ use file_store::{ use helium_proto::services::{ packet_verifier::ValidDataTransferSession, poc_mobile::InvalidDataTransferIngestReportV1, }; -use mobile_config::client::{ - authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver, - AuthorizationClient, GatewayClient, -}; use solana::burn::{SolanaNetwork, SolanaRpc}; use sqlx::{Pool, Postgres}; use task_manager::{ManagedTask, TaskManager}; @@ -25,25 +24,23 @@ use tokio::{ time::{sleep_until, Duration, Instant}, }; -pub struct Daemon { +pub struct Daemon { pool: Pool, burner: Burner, reports: Receiver>, burn_period: Duration, min_burn_period: Duration, - gateway_info_resolver: GIR, - authorization_verifier: AV, + mobile_config_resolver: MCR, invalid_data_session_report_sink: FileSinkClient, } -impl Daemon { +impl Daemon { pub fn new( settings: &Settings, pool: Pool, reports: Receiver>, burner: Burner, - gateway_info_resolver: GIR, - authorization_verifier: AV, + mobile_config_resolver: MCR, invalid_data_session_report_sink: FileSinkClient, ) -> Self { Self { @@ -52,18 +49,16 @@ impl Daemon { reports, burn_period: settings.burn_period, min_burn_period: settings.min_burn_period, - gateway_info_resolver, - authorization_verifier, + mobile_config_resolver, invalid_data_session_report_sink, } } } -impl ManagedTask for Daemon +impl ManagedTask for Daemon where S: SolanaNetwork, - GIR: GatewayInfoResolver, - AV: AuthorizationVerifier + 'static, + MCR: MobileConfigResolverExt + 'static, { fn start_task( self: Box, @@ -73,11 +68,10 @@ where } } -impl Daemon +impl Daemon where S: SolanaNetwork, - GIR: GatewayInfoResolver, - AV: AuthorizationVerifier, + MCR: MobileConfigResolverExt, { pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> { // Set the initial burn period to one minute @@ -92,7 +86,7 @@ where let ts = file.file_info.timestamp; let mut transaction = self.pool.begin().await?; let reports = file.into_stream(&mut transaction).await?; - crate::accumulate::accumulate_sessions(&self.gateway_info_resolver, &self.authorization_verifier, &mut transaction, &self.invalid_data_session_report_sink, ts, reports).await?; + crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.invalid_data_session_report_sink, ts, reports).await?; transaction.commit().await?; self.invalid_data_session_report_sink.commit().await?; }, @@ -176,16 +170,14 @@ impl Cmd { .create() .await?; - let gateway_client = GatewayClient::from_settings(&settings.config_client)?; - let auth_client = AuthorizationClient::from_settings(&settings.config_client)?; + let resolver = MobileConfigClients::new(&settings.config_client)?; let daemon = Daemon::new( settings, pool.clone(), reports, burner, - gateway_client, - auth_client, + resolver, invalid_sessions, ); diff --git a/mobile_packet_verifier/src/lib.rs b/mobile_packet_verifier/src/lib.rs index 4d6c71332..9d80a855e 100644 --- a/mobile_packet_verifier/src/lib.rs +++ b/mobile_packet_verifier/src/lib.rs @@ -1,5 +1,48 @@ +use helium_crypto::PublicKeyBinary; +use helium_proto::services::mobile_config::NetworkKeyRole; +use mobile_config::client::{ + self, authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver, +}; + pub mod accumulate; pub mod burner; pub mod daemon; pub mod event_ids; pub mod settings; + +pub struct MobileConfigClients { + gateway_client: client::GatewayClient, + auth_client: client::AuthorizationClient, +} + +impl MobileConfigClients { + pub fn new(settings: &client::Settings) -> anyhow::Result { + Ok(Self { + gateway_client: client::GatewayClient::from_settings(settings)?, + auth_client: client::AuthorizationClient::from_settings(settings)?, + }) + } +} + +#[async_trait::async_trait] +pub trait MobileConfigResolverExt { + async fn is_gateway_known(&self, public_key: &PublicKeyBinary) -> bool; + async fn is_routing_key_known(&self, public_key: &PublicKeyBinary) -> bool; +} + +#[async_trait::async_trait] +impl MobileConfigResolverExt for MobileConfigClients { + async fn is_gateway_known(&self, public_key: &PublicKeyBinary) -> bool { + match self.gateway_client.resolve_gateway_info(public_key).await { + Ok(res) => res.is_some(), + Err(_err) => false, + } + } + + async fn is_routing_key_known(&self, public_key: &PublicKeyBinary) -> bool { + self.auth_client + .verify_authorized_key(public_key, NetworkKeyRole::MobileRouter) + .await + .unwrap_or_default() + } +} diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index c3399f58d..b72b47526 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -488,6 +488,14 @@ impl ValidatedHeartbeat { .resolve_gateway(&heartbeat.hotspot_key) .await? { + GatewayResolution::DataOnly => Ok(Self::new( + heartbeat, + cell_type, + dec!(0), + None, + Some(coverage_object.meta), + proto::HeartbeatValidity::InvalidDeviceType, + )), GatewayResolution::GatewayNotFound => Ok(Self::new( heartbeat, cell_type, diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index 4855f8a63..61cb44a9f 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -26,6 +26,7 @@ pub enum GatewayResolution { GatewayNotFound, GatewayNotAsserted, AssertedLocation(u64), + DataOnly, } #[async_trait::async_trait] @@ -47,9 +48,14 @@ impl GatewayResolver for mobile_config::GatewayClient { address: &helium_crypto::PublicKeyBinary, ) -> Result { use mobile_config::client::gateway_client::GatewayInfoResolver; - use mobile_config::gateway_info::GatewayInfo; + use mobile_config::gateway_info::{DeviceType, GatewayInfo}; + match self.resolve_gateway_info(address).await? { None => Ok(GatewayResolution::GatewayNotFound), + Some(GatewayInfo { + device_type: DeviceType::WifiDataOnly, + .. + }) => Ok(GatewayResolution::DataOnly), Some(GatewayInfo { metadata: Some(metadata), .. diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 07a116dd3..5ed549c47 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -18,8 +18,8 @@ use futures::{ }; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ - SpeedtestAvg as SpeedtestAvgProto, SpeedtestIngestReportV1, SpeedtestVerificationResult, - VerifiedSpeedtest as VerifiedSpeedtestProto, + SpeedtestAvg as SpeedtestAvgProto, SpeedtestIngestReportV1, + SpeedtestVerificationResult as SpeedtestResult, VerifiedSpeedtest as VerifiedSpeedtestProto, }; use mobile_config::client::gateway_client::GatewayInfoResolver; use sqlx::{postgres::PgRow, FromRow, Pool, Postgres, Row, Transaction}; @@ -152,7 +152,7 @@ where let mut speedtests = file.into_stream(&mut transaction).await?; while let Some(speedtest_report) = speedtests.next().await { let result = self.validate_speedtest(&speedtest_report).await?; - if result == SpeedtestVerificationResult::SpeedtestValid { + if result == SpeedtestResult::SpeedtestValid { save_speedtest(&speedtest_report.report, &mut transaction).await?; let latest_speedtests = get_latest_speedtests_for_pubkey( &speedtest_report.report.pubkey, @@ -176,24 +176,26 @@ where pub async fn validate_speedtest( &self, speedtest: &CellSpeedtestIngestReport, - ) -> anyhow::Result { + ) -> anyhow::Result { let pubkey = speedtest.report.pubkey.clone(); - if self + + match self .gateway_info_resolver .resolve_gateway_info(&pubkey) .await? - .is_some() { - Ok(SpeedtestVerificationResult::SpeedtestValid) - } else { - Ok(SpeedtestVerificationResult::SpeedtestGatewayNotFound) + Some(gw_info) if gw_info.is_data_only() => { + Ok(SpeedtestResult::SpeedtestInvalidDeviceType) + } + Some(_) => Ok(SpeedtestResult::SpeedtestValid), + None => Ok(SpeedtestResult::SpeedtestGatewayNotFound), } } pub async fn write_verified_speedtest( &self, speedtest_report: CellSpeedtestIngestReport, - result: SpeedtestVerificationResult, + result: SpeedtestResult, ) -> anyhow::Result<()> { let ingest_report: SpeedtestIngestReportV1 = speedtest_report.into(); let timestamp: u64 = Utc::now().timestamp_millis() as u64; diff --git a/mobile_verifier/tests/integrations/boosting_oracles.rs b/mobile_verifier/tests/integrations/boosting_oracles.rs index f9f03e351..55f1e58eb 100644 --- a/mobile_verifier/tests/integrations/boosting_oracles.rs +++ b/mobile_verifier/tests/integrations/boosting_oracles.rs @@ -24,7 +24,6 @@ use mobile_verifier::{ sp_boosted_rewards_bans::BannedRadios, speedtests::Speedtest, speedtests_average::{SpeedtestAverage, SpeedtestAverages}, - GatewayResolution, GatewayResolver, }; use rust_decimal::Decimal; use rust_decimal_macros::dec; @@ -32,7 +31,7 @@ use sqlx::PgPool; use std::{collections::HashMap, pin::pin}; use uuid::Uuid; -use crate::common; +use crate::common::{self, GatewayClientAllOwnersValid}; #[derive(Clone)] struct MockGeofence; @@ -43,21 +42,6 @@ impl GeofenceValidator for MockGeofence { } } -#[derive(Copy, Clone)] -struct AllOwnersValid; - -#[async_trait::async_trait] -impl GatewayResolver for AllOwnersValid { - type Error = std::convert::Infallible; - - async fn resolve_gateway( - &self, - _address: &PublicKeyBinary, - ) -> Result { - Ok(GatewayResolution::AssertedLocation(0x8c2681a3064d9ff)) - } -} - fn heartbeats<'a>( num: usize, start: DateTime, @@ -359,7 +343,7 @@ async fn test_footfall_and_urbanization_and_landtype(pool: PgPool) -> anyhow::Re let epoch = start..end; let mut heartbeats = pin!(ValidatedHeartbeat::validate_heartbeats( stream::iter(heartbeats.map(Heartbeat::from)), - &AllOwnersValid, + &GatewayClientAllOwnersValid, &coverage_objects, &location_cache, 2000, diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 178bda55d..47fe1cc30 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -22,7 +22,9 @@ use mobile_config::{ }, }; -use mobile_verifier::boosting_oracles::AssignedCoverageObjects; +use mobile_verifier::{ + boosting_oracles::AssignedCoverageObjects, GatewayResolution, GatewayResolver, +}; use rust_decimal::{prelude::ToPrimitive, Decimal}; use rust_decimal_macros::dec; use sqlx::PgPool; @@ -347,3 +349,18 @@ impl EntityVerifier for MockEntityClient { Ok(true) } } + +#[derive(Debug, Copy, Clone)] +pub struct GatewayClientAllOwnersValid; + +#[async_trait] +impl GatewayResolver for GatewayClientAllOwnersValid { + type Error = std::convert::Infallible; + + async fn resolve_gateway( + &self, + _address: &PublicKeyBinary, + ) -> Result { + Ok(GatewayResolution::AssertedLocation(0x8c2681a3064d9ff)) + } +} diff --git a/mobile_verifier/tests/integrations/last_location.rs b/mobile_verifier/tests/integrations/last_location.rs index 34b9f7b27..c04d0678d 100644 --- a/mobile_verifier/tests/integrations/last_location.rs +++ b/mobile_verifier/tests/integrations/last_location.rs @@ -9,12 +9,13 @@ use mobile_verifier::{ coverage::{CoverageObject, CoverageObjectCache}, geofence::GeofenceValidator, heartbeats::{last_location::LocationCache, HbType, Heartbeat, ValidatedHeartbeat}, - GatewayResolution, GatewayResolver, }; use rust_decimal_macros::dec; use sqlx::{PgPool, Postgres, Transaction}; use uuid::Uuid; +use crate::common::GatewayClientAllOwnersValid; + const PUB_KEY: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; #[derive(Clone)] @@ -26,21 +27,6 @@ impl GeofenceValidator for MockGeofence { } } -#[derive(Copy, Clone)] -struct AllOwnersValid; - -#[async_trait::async_trait] -impl GatewayResolver for AllOwnersValid { - type Error = std::convert::Infallible; - - async fn resolve_gateway( - &self, - _address: &PublicKeyBinary, - ) -> Result { - Ok(GatewayResolution::AssertedLocation(0x8c2681a3064d9ff)) - } -} - #[sqlx::test] async fn heartbeat_uses_last_good_location_when_invalid_location( pool: PgPool, @@ -60,7 +46,7 @@ async fn heartbeat_uses_last_good_location_when_invalid_location( heartbeat(&hotspot, &coverage_object) .location_validation_timestamp(Utc::now()) .build(), - &AllOwnersValid, + &GatewayClientAllOwnersValid, &coverage_objects, &location_cache, u32::MAX, @@ -78,7 +64,7 @@ async fn heartbeat_uses_last_good_location_when_invalid_location( heartbeat(&hotspot, &coverage_object) .latlng((0.0, 0.0)) .build(), - &AllOwnersValid, + &GatewayClientAllOwnersValid, &coverage_objects, &location_cache, u32::MAX, @@ -122,7 +108,7 @@ async fn heartbeat_will_use_last_good_location_from_db(pool: PgPool) -> anyhow:: heartbeat(&hotspot, &coverage_object) .location_validation_timestamp(Utc::now()) .build(), - &AllOwnersValid, + &GatewayClientAllOwnersValid, &coverage_objects, &location_cache, u32::MAX, @@ -145,7 +131,7 @@ async fn heartbeat_will_use_last_good_location_from_db(pool: PgPool) -> anyhow:: heartbeat(&hotspot, &coverage_object) .latlng((0.0, 0.0)) .build(), - &AllOwnersValid, + &GatewayClientAllOwnersValid, &coverage_objects, &location_cache, u32::MAX, @@ -192,7 +178,7 @@ async fn heartbeat_does_not_use_last_good_location_when_more_than_12_hours( .location_validation_timestamp(Utc::now()) .timestamp(Utc::now() - Duration::hours(12) - Duration::seconds(1)) .build(), - &AllOwnersValid, + &GatewayClientAllOwnersValid, &coverage_objects, &location_cache, u32::MAX, @@ -210,7 +196,7 @@ async fn heartbeat_does_not_use_last_good_location_when_more_than_12_hours( heartbeat(&hotspot, &coverage_object) .latlng((0.0, 0.0)) .build(), - &AllOwnersValid, + &GatewayClientAllOwnersValid, &coverage_objects, &location_cache, u32::MAX, diff --git a/mobile_verifier/tests/integrations/modeled_coverage.rs b/mobile_verifier/tests/integrations/modeled_coverage.rs index bb9c406f9..a4e7d8224 100644 --- a/mobile_verifier/tests/integrations/modeled_coverage.rs +++ b/mobile_verifier/tests/integrations/modeled_coverage.rs @@ -27,7 +27,7 @@ use mobile_verifier::{ sp_boosted_rewards_bans::BannedRadios, speedtests::Speedtest, speedtests_average::{SpeedtestAverage, SpeedtestAverages}, - GatewayResolution, GatewayResolver, IsAuthorized, + IsAuthorized, }; use rust_decimal_macros::dec; use solana_sdk::pubkey::Pubkey; @@ -35,7 +35,7 @@ use sqlx::PgPool; use std::{collections::HashMap, num::NonZeroU32, ops::Range, pin::pin, str::FromStr}; use uuid::Uuid; -use crate::common; +use crate::common::{self, GatewayClientAllOwnersValid}; #[derive(Clone)] struct MockGeofence; @@ -248,21 +248,6 @@ async fn test_coverage_object_save_updates(pool: PgPool) -> anyhow::Result<()> { Ok(()) } -#[derive(Copy, Clone)] -struct AllOwnersValid; - -#[async_trait::async_trait] -impl GatewayResolver for AllOwnersValid { - type Error = std::convert::Infallible; - - async fn resolve_gateway( - &self, - _address: &PublicKeyBinary, - ) -> Result { - Ok(GatewayResolution::AssertedLocation(0x8c2681a3064d9ff)) - } -} - #[derive(Copy, Clone)] struct AllPubKeysAuthed; @@ -414,7 +399,7 @@ async fn process_input( let mut transaction = pool.begin().await?; let mut heartbeats = pin!(ValidatedHeartbeat::validate_heartbeats( stream::iter(heartbeats.map(Heartbeat::from)), - &AllOwnersValid, + &GatewayClientAllOwnersValid, &coverage_objects, &location_cache, 2000, @@ -1410,7 +1395,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow let validate = |latlng: LatLng| { ValidatedHeartbeat::validate( mk_heartbeat(latlng).into(), - &AllOwnersValid, + &GatewayClientAllOwnersValid, &coverage_object_cache, &location_cache, max_covered_distance,