diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index a99314f76..21500e3a7 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -92,7 +92,7 @@ jobs: strategy: fail-fast: false matrix: - package: [boost-manager,file-store,iot-config,iot-packet-verifier,iot-verifier,mobile-config,mobile-packet-verifier,mobile-verifier] + package: [boost-manager,file-store,iot-config,iot-packet-verifier,iot-verifier,mobile-config,mobile-verifier] concurrency: group: ${{ github.workflow }}-${{ github.ref }}-tests-postgres-${{ matrix.package }} cancel-in-progress: true @@ -142,7 +142,7 @@ jobs: strategy: fail-fast: false matrix: - package: [coverage-map,coverage-point-calculator,ingest,reward-scheduler,task-manager] + package: [coverage-map,coverage-point-calculator,ingest,mobile-packet-verifier,reward-scheduler,task-manager] concurrency: group: ${{ github.workflow }}-${{ github.ref }}-tests-${{ matrix.package }} cancel-in-progress: true diff --git a/Cargo.lock b/Cargo.lock index 2058e861d..cfed0a862 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,7 +1617,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#80e529d0e64951554cb4ca8e71c3cd9d30a91bc4" +source = "git+https://github.com/helium/proto?branch=master#b45858ce482f1e831d3dad07261770c7c53a1024" dependencies = [ "base64 0.21.7", "byteorder", @@ -1627,7 +1627,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", ] @@ -3801,7 +3801,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#80e529d0e64951554cb4ca8e71c3cd9d30a91bc4" +source = "git+https://github.com/helium/proto?branch=master#b45858ce482f1e831d3dad07261770c7c53a1024" dependencies = [ "bytes", "prost", @@ -5094,7 +5094,6 @@ dependencies = [ "mobile-config", "poc-metrics", "prost", - "rand 0.8.5", "serde", "sha2 0.10.8", "solana", @@ -9852,7 +9851,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", "twox-hash", "xorf", diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index f0190c4bd..100154d6c 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -146,7 +146,6 @@ pub const DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "data_transfer_session_ing pub const INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "invalid_data_transfer_session_ingest_report"; pub const VALID_DATA_TRANSFER_SESSION: &str = "valid_data_transfer_session"; -pub const PENDING_DATA_TRANSFER_SESION: &str = "pending_data_transfer_sesion"; pub const PRICE_REPORT: &str = "price_report"; pub const MOBILE_REWARD_SHARE: &str = "mobile_reward_share"; pub const MAPPER_MSG: &str = "mapper_msg"; @@ -194,7 +193,6 @@ pub enum FileType { DataTransferSessionIngestReport, InvalidDataTransferSessionIngestReport, ValidDataTransferSession, - PendingDataTransferSession, PriceReport, MobileRewardShare, SubscriberLocationReq, @@ -272,7 +270,6 @@ impl fmt::Display for FileType { INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT } Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION, - Self::PendingDataTransferSession => PENDING_DATA_TRANSFER_SESION, Self::PriceReport => PRICE_REPORT, Self::MobileRewardShare => MOBILE_REWARD_SHARE, Self::MapperMsg => MAPPER_MSG, @@ -347,7 +344,6 @@ impl FileType { INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT } Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION, - Self::PendingDataTransferSession => PENDING_DATA_TRANSFER_SESION, Self::PriceReport => PRICE_REPORT, Self::MobileRewardShare => MOBILE_REWARD_SHARE, Self::MapperMsg => MAPPER_MSG, @@ -422,7 +418,6 @@ impl FromStr for FileType { Self::InvalidDataTransferSessionIngestReport } VALID_DATA_TRANSFER_SESSION => Self::ValidDataTransferSession, - PENDING_DATA_TRANSFER_SESION => Self::PendingDataTransferSession, PRICE_REPORT => Self::PriceReport, MOBILE_REWARD_SHARE => Self::MobileRewardShare, MAPPER_MSG => Self::MapperMsg, diff --git a/file_store/src/mobile_session.rs b/file_store/src/mobile_session.rs index a0f5dc1f3..d5f1eda6c 100644 --- a/file_store/src/mobile_session.rs +++ b/file_store/src/mobile_session.rs @@ -8,9 +8,7 @@ use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ invalid_data_transfer_ingest_report_v1::DataTransferIngestReportStatus, DataTransferEvent as DataTransferEventProto, DataTransferRadioAccessTechnology, - DataTransferSessionIngestReportV1, DataTransferSessionReqV1, - DataTransferSessionSettlementStatus, InvalidDataTransferIngestReportV1, - PendingDataTransferSessionV1, + DataTransferSessionIngestReportV1, DataTransferSessionReqV1, InvalidDataTransferIngestReportV1, }; use serde::Serialize; @@ -183,7 +181,6 @@ pub struct DataTransferSessionReq { pub rewardable_bytes: u64, pub pub_key: PublicKeyBinary, pub signature: Vec, - pub status: DataTransferSessionSettlementStatus, } impl MsgDecode for DataTransferSessionReq { @@ -194,7 +191,6 @@ impl TryFrom for DataTransferSessionReq { type Error = Error; fn try_from(v: DataTransferSessionReqV1) -> Result { - let status = v.status(); Ok(Self { rewardable_bytes: v.rewardable_bytes, signature: v.signature, @@ -203,7 +199,6 @@ impl TryFrom for DataTransferSessionReq { .ok_or_else(|| Error::not_found("data transfer usage"))? .try_into()?, pub_key: v.pub_key.into(), - status, }) } } @@ -217,28 +212,7 @@ impl From for DataTransferSessionReqV1 { rewardable_bytes: v.rewardable_bytes, pub_key: v.pub_key.into(), signature: v.signature, - reward_cancelled: false, - status: v.status as i32, - } - } -} - -impl DataTransferSessionReq { - pub fn to_pending_proto( - self, - received_timestamp: DateTime, - ) -> PendingDataTransferSessionV1 { - let event_timestamp = self.data_transfer_usage.timestamp.encode_timestamp_millis(); - let received_timestamp = received_timestamp.encode_timestamp_millis(); - - PendingDataTransferSessionV1 { - pub_key: self.pub_key.into(), - payer: self.data_transfer_usage.payer.into(), - upload_bytes: self.data_transfer_usage.upload_bytes, - download_bytes: self.data_transfer_usage.download_bytes, - rewardable_bytes: self.rewardable_bytes, - event_timestamp, - received_timestamp, + ..Default::default() } } } diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index fae6108d1..baf598fc6 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -178,11 +178,6 @@ impl_file_sink!( FileType::OracleBoostingReport.to_str(), "oracle_boosting_report" ); -impl_file_sink!( - poc_mobile::PendingDataTransferSessionV1, - FileType::PendingDataTransferSession.to_str(), - "pending_data_transfer_session" -); impl_file_sink!( poc_mobile::RadioThresholdIngestReportV1, FileType::RadioThresholdIngestReport.to_str(), diff --git a/mobile_packet_verifier/Cargo.toml b/mobile_packet_verifier/Cargo.toml index 07bb91055..b936682dd 100644 --- a/mobile_packet_verifier/Cargo.toml +++ b/mobile_packet_verifier/Cargo.toml @@ -41,6 +41,3 @@ http-serde = { workspace = true } sha2 = { workspace = true } humantime-serde = { workspace = true } custom-tracing = { path = "../custom_tracing" } - -[dev-dependencies] -rand = { workspace = true } diff --git a/mobile_packet_verifier/migrations/7_pending_data_transfer_sessions.sql b/mobile_packet_verifier/migrations/7_pending_data_transfer_sessions.sql deleted file mode 100644 index 38df82101..000000000 --- a/mobile_packet_verifier/migrations/7_pending_data_transfer_sessions.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE pending_data_transfer_sessions ( - pub_key TEXT NOT NULL, - payer TEXT NOT NULL, - event_id TEXT NOT NULL, - uploaded_bytes BIGINT NOT NULL, - downloaded_bytes BIGINT NOT NULL, - rewardable_bytes BIGINT NOT NULL, - recv_timestamp TIMESTAMPTZ NOT NULL, - inserted_at TIMESTAMPTZ NOT NULL, - PRIMARY KEY(pub_key, payer, recv_timestamp) -); diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index db944e4a0..e00570b61 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -1,15 +1,14 @@ use chrono::{DateTime, Utc}; use file_store::file_sink::FileSinkClient; use file_store::mobile_session::{ - DataTransferEvent, DataTransferSessionIngestReport, InvalidDataTransferIngestReport, + DataTransferSessionIngestReport, InvalidDataTransferIngestReport, }; use futures::{Stream, StreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::services::mobile_config::NetworkKeyRole; -use helium_proto::services::poc_mobile::DataTransferSessionSettlementStatus; use helium_proto::services::poc_mobile::{ invalid_data_transfer_ingest_report_v1::DataTransferIngestReportStatus, - InvalidDataTransferIngestReportV1, PendingDataTransferSessionV1, + InvalidDataTransferIngestReportV1, }; use mobile_config::client::{ authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver, @@ -23,7 +22,6 @@ pub async fn accumulate_sessions( authorization_verifier: &impl AuthorizationVerifier, conn: &mut Transaction<'_, Postgres>, invalid_data_session_report_sink: &FileSinkClient, - pending_data_session_report_sink: &FileSinkClient, curr_file_ts: DateTime, reports: impl Stream, ) -> anyhow::Result<()> { @@ -48,75 +46,9 @@ pub async fn accumulate_sessions( write_invalid_report(invalid_data_session_report_sink, report_validity, report).await?; continue; } - - let status = report.report.status; - match status { - DataTransferSessionSettlementStatus::Settled => { - save_settled_data_transfer_session( - conn, - report.report.data_transfer_usage, - report.report.rewardable_bytes, - curr_file_ts, - ) - .await?; - } - DataTransferSessionSettlementStatus::Pending => { - save_pending_data_transfer_session( - conn, - &report.report.data_transfer_usage, - report.report.rewardable_bytes, - curr_file_ts, - Utc::now(), - ) - .await?; - pending_data_session_report_sink - .write(report.report.to_pending_proto(curr_file_ts), []) - .await?; - } - } - } - - Ok(()) -} - -async fn save_pending_data_transfer_session( - conn: &mut Transaction<'_, Postgres>, - data_transfer_event: &DataTransferEvent, - rewardable_bytes: u64, - file_ts: DateTime, - curr_ts: DateTime, -) -> anyhow::Result<()> { - sqlx::query( - r#" - INSERT INTO pending_data_transfer_sessions (pub_key, event_id, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, recv_timestamp, inserted_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (pub_key, payer, recv_timestamp) DO UPDATE SET - uploaded_bytes = pending_data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes, - downloaded_bytes = pending_data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes, - rewardable_bytes = pending_data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes - "#, - ) - .bind(&data_transfer_event.pub_key) - .bind(&data_transfer_event.event_id) - .bind(&data_transfer_event.payer) - .bind(data_transfer_event.upload_bytes as i64) - .bind(data_transfer_event.download_bytes as i64) - .bind(rewardable_bytes as i64) - .bind(file_ts) - .bind(curr_ts) - .execute(conn) - .await?; - Ok(()) -} - -async fn save_settled_data_transfer_session( - conn: &mut Transaction<'_, Postgres>, - data_transfer_event: DataTransferEvent, - rewardable_bytes: u64, - curr_file_ts: DateTime, -) -> Result<(), anyhow::Error> { - sqlx::query( - r#" + let event = report.report.data_transfer_usage; + sqlx::query( + r#" INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp) VALUES ($1, $2, $3, $4, $5, $6, $6) ON CONFLICT (pub_key, payer) DO UPDATE SET @@ -125,15 +57,17 @@ async fn save_settled_data_transfer_session( rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes, last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp) "# - ) - .bind(data_transfer_event.pub_key) - .bind(data_transfer_event.payer) - .bind(data_transfer_event.upload_bytes as i64) - .bind(data_transfer_event.download_bytes as i64) - .bind(rewardable_bytes as i64) - .bind(curr_file_ts) - .execute(&mut *conn) - .await?; + ) + .bind(event.pub_key) + .bind(event.payer) + .bind(event.upload_bytes as i64) + .bind(event.download_bytes as i64) + .bind(report.report.rewardable_bytes as i64) + .bind(curr_file_ts) + .execute(&mut *conn) + .await?; + } + Ok(()) } @@ -213,127 +147,3 @@ async fn write_invalid_report( .await?; Ok(()) } - -#[cfg(test)] -mod tests { - - use file_store::mobile_session::DataTransferSessionReq; - use helium_crypto::{KeyTag, Keypair}; - use mobile_config::gateway_info::{DeviceType, GatewayInfo, GatewayInfoStream}; - use sqlx::PgPool; - - use super::*; - - #[derive(thiserror::Error, Debug)] - enum MockError {} - - #[derive(Clone)] - struct MockGatewayInfoResolver; - - #[async_trait::async_trait] - impl GatewayInfoResolver for MockGatewayInfoResolver { - type Error = MockError; - - async fn resolve_gateway_info( - &self, - address: &PublicKeyBinary, - ) -> Result, Self::Error> { - Ok(Some(GatewayInfo { - address: address.clone(), - metadata: None, - device_type: DeviceType::Cbrs, - })) - } - - async fn stream_gateways_info(&mut self) -> Result { - todo!() - } - } - - struct AllVerified; - - #[async_trait::async_trait] - impl AuthorizationVerifier for AllVerified { - type Error = MockError; - - async fn verify_authorized_key( - &self, - _pubkey: &PublicKeyBinary, - _role: helium_proto::services::mobile_config::NetworkKeyRole, - ) -> Result { - Ok(true) - } - } - - #[sqlx::test] - async fn pending_data_transfer_sessions_not_stored_for_burning( - pool: PgPool, - ) -> anyhow::Result<()> { - use rand::rngs::OsRng; - let radio_pubkey = Keypair::generate(KeyTag::default(), &mut OsRng); - let signing_pubkey = Keypair::generate(KeyTag::default(), &mut OsRng); - let payer_pubkey = Keypair::generate(KeyTag::default(), &mut OsRng); - - let reports = futures::stream::iter(vec![DataTransferSessionIngestReport { - received_timestamp: Utc::now(), - report: DataTransferSessionReq { - data_transfer_usage: DataTransferEvent { - pub_key: radio_pubkey.public_key().to_owned().into(), - upload_bytes: 50, - download_bytes: 50, - radio_access_technology: helium_proto::services::poc_mobile::DataTransferRadioAccessTechnology::default(), - event_id: "event-id".to_string(), - payer: payer_pubkey.public_key().to_owned().into(), - timestamp: Utc::now(), - signature: vec![] - }, - rewardable_bytes: 100, - pub_key: signing_pubkey.public_key().to_owned().into(), - signature: vec![], - status: DataTransferSessionSettlementStatus::Pending, - }, - }]); - - let (invalid_report_tx, mut invalid_report_rx) = tokio::sync::mpsc::channel(1); - let invalid_data_session_report_sink = FileSinkClient::new(invalid_report_tx, "testing"); - - let (pending_report_tx, mut pending_report_rx) = tokio::sync::mpsc::channel(1); - let pending_data_session_report_sink = FileSinkClient::new(pending_report_tx, "testing"); - - let mut conn = pool.begin().await?; - accumulate_sessions( - &MockGatewayInfoResolver, - &AllVerified, - &mut conn, - &invalid_data_session_report_sink, - &pending_data_session_report_sink, - Utc::now(), - reports, - ) - .await?; - conn.commit().await?; - - let pending_rows = sqlx::query("SELECT * FROM pending_data_transfer_sessions") - .fetch_all(&pool) - .await - .unwrap(); - assert_eq!(pending_rows.len(), 1); - - let settled_rows = sqlx::query("SELECT * FROM data_transfer_sessions") - .fetch_all(&pool) - .await - .unwrap(); - assert_eq!(settled_rows.len(), 0); - - assert!( - invalid_report_rx.try_recv().is_err(), - "expected invalid report sink to be empty" - ); - assert!( - pending_report_rx.try_recv().is_ok(), - "expected pending report sink to have a record" - ); - - Ok(()) - } -} diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 86d639a54..c526025c8 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -1,6 +1,4 @@ -use crate::{ - accumulate::accumulate_sessions, burner::Burner, event_ids::EventIdPurger, settings::Settings, -}; +use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings}; use anyhow::{bail, Result}; use chrono::{TimeZone, Utc}; use file_store::{ @@ -13,8 +11,7 @@ use file_store::{ }; use helium_proto::services::{ - packet_verifier::ValidDataTransferSession, - poc_mobile::{InvalidDataTransferIngestReportV1, PendingDataTransferSessionV1}, + packet_verifier::ValidDataTransferSession, poc_mobile::InvalidDataTransferIngestReportV1, }; use mobile_config::client::{ authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver, @@ -37,11 +34,9 @@ pub struct Daemon { gateway_info_resolver: GIR, authorization_verifier: AV, invalid_data_session_report_sink: FileSinkClient, - pending_data_session_report_sink: FileSinkClient, } impl Daemon { - #[allow(clippy::too_many_arguments)] pub fn new( settings: &Settings, pool: Pool, @@ -50,7 +45,6 @@ impl Daemon { gateway_info_resolver: GIR, authorization_verifier: AV, invalid_data_session_report_sink: FileSinkClient, - pending_data_session_report_sink: FileSinkClient, ) -> Self { Self { pool, @@ -61,7 +55,6 @@ impl Daemon { gateway_info_resolver, authorization_verifier, invalid_data_session_report_sink, - pending_data_session_report_sink, } } } @@ -95,8 +88,13 @@ where let Some(file) = file else { anyhow::bail!("FileInfoPoller sender was dropped unexpectedly"); }; - - self.process_file(file).await?; + tracing::info!("Verifying file: {}", file.file_info); + let ts = file.file_info.timestamp; + let mut transaction = self.pool.begin().await?; + let reports = file.into_stream(&mut transaction).await?; + crate::accumulate::accumulate_sessions(&self.gateway_info_resolver, &self.authorization_verifier, &mut transaction, &self.invalid_data_session_report_sink, ts, reports).await?; + transaction.commit().await?; + self.invalid_data_session_report_sink.commit().await?; }, _ = sleep_until(burn_time) => { // It's time to burn @@ -114,34 +112,6 @@ where } } } - - async fn process_file( - &self, - file: FileInfoStream, - ) -> Result<()> { - tracing::info!("Verifying file: {}", file.file_info); - - let ts = file.file_info.timestamp; - let mut transaction = self.pool.begin().await?; - let reports = file.into_stream(&mut transaction).await?; - - accumulate_sessions( - &self.gateway_info_resolver, - &self.authorization_verifier, - &mut transaction, - &self.invalid_data_session_report_sink, - &self.pending_data_session_report_sink, - ts, - reports, - ) - .await?; - - transaction.commit().await?; - self.invalid_data_session_report_sink.commit().await?; - self.pending_data_session_report_sink.commit().await?; - - Ok(()) - } } #[derive(Debug, clap::Args)] @@ -190,15 +160,6 @@ impl Cmd { ) .await?; - let (pending_sessions, pending_sessions_server) = PendingDataTransferSessionV1::file_sink( - store_base_path, - file_upload.clone(), - FileSinkCommitStrategy::Manual, - FileSinkRollTime::Default, - env!("CARGO_PKG_NAME"), - ) - .await?; - let burner = Burner::new(valid_sessions, solana); let file_store = FileStore::from_settings(&settings.ingest).await?; @@ -226,7 +187,6 @@ impl Cmd { gateway_client, auth_client, invalid_sessions, - pending_sessions, ); let event_id_purger = EventIdPurger::from_settings(pool, settings); @@ -235,7 +195,6 @@ impl Cmd { .add_task(file_upload_server) .add_task(valid_sessions_server) .add_task(invalid_sessions_server) - .add_task(pending_sessions_server) .add_task(reports_server) .add_task(event_id_purger) .add_task(daemon)