diff --git a/file_store/src/cli/bucket.rs b/file_store/src/cli/bucket.rs index 2e950ef3f..8b4cca9bd 100644 --- a/file_store/src/cli/bucket.rs +++ b/file_store/src/cli/bucket.rs @@ -1,7 +1,11 @@ use crate::{ - heartbeat::CbrsHeartbeat, iot_beacon_report::IotBeaconIngestReport, iot_valid_poc::IotPoc, - iot_witness_report::IotWitnessIngestReport, speedtest::CellSpeedtest, traits::MsgDecode, Error, - FileInfoStream, FileStore, FileType, Result, Settings, + heartbeat::{cli::ValidatedHeartbeat, CbrsHeartbeat}, + iot_beacon_report::IotBeaconIngestReport, + iot_valid_poc::IotPoc, + iot_witness_report::IotWitnessIngestReport, + speedtest::{cli::SpeedtestAverage, CellSpeedtest}, + traits::MsgDecode, + Error, FileInfoStream, FileStore, FileType, Result, Settings, }; use chrono::{NaiveDateTime, TimeZone, Utc}; use futures::{stream::TryStreamExt, StreamExt, TryFutureExt}; @@ -207,6 +211,12 @@ fn locate(prefix: &str, gateway: &PublicKey, buf: &[u8]) -> Result { CbrsHeartbeat::decode(buf).and_then(|event| event.to_value_if(pub_key)) } + FileType::ValidatedHeartbeat => { + ValidatedHeartbeat::decode(buf).and_then(|event| event.to_value_if(pub_key)) + } + FileType::SpeedtestAvg => { + SpeedtestAverage::decode(buf).and_then(|event| event.to_value_if(pub_key)) + } FileType::CellSpeedtest => { CellSpeedtest::decode(buf).and_then(|event| event.to_value_if(pub_key)) } @@ -283,3 +293,15 @@ impl Gateway for IotPoc { self.beacon_report.report.pub_key.as_ref() == pub_key } } + +impl Gateway for ValidatedHeartbeat { + fn has_pubkey(&self, pub_key: &[u8]) -> bool { + self.pub_key.as_ref() == pub_key + } +} + +impl Gateway for SpeedtestAverage { + fn has_pubkey(&self, pub_key: &[u8]) -> bool { + self.pub_key.as_ref() == pub_key + } +} diff --git a/file_store/src/heartbeat.rs b/file_store/src/heartbeat.rs index a4c1435e2..11510c39f 100644 --- a/file_store/src/heartbeat.rs +++ b/file_store/src/heartbeat.rs @@ -1,10 +1,13 @@ use crate::{ + error::DecodeError, traits::{MsgDecode, MsgTimestamp, TimestampDecode}, Error, Result, }; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_mobile::{CellHeartbeatIngestReportV1, CellHeartbeatReqV1}; +use helium_proto::services::poc_mobile::{ + CellHeartbeatIngestReportV1, CellHeartbeatReqV1, CellType, Heartbeat, HeartbeatValidity, +}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -85,6 +88,57 @@ impl MsgTimestamp>> for CellHeartbeatIngestReportV1 { } } +pub mod cli { + use super::*; + + #[derive(Serialize, Deserialize, Debug, Clone)] + pub struct ValidatedHeartbeat { + pub cbsd_id: String, + pub pub_key: PublicKeyBinary, + pub reward_multiplier: f32, + pub timestamp: DateTime, + pub cell_type: CellType, + pub validity: HeartbeatValidity, + pub lat: f64, + pub lon: f64, + pub coverage_object: Vec, + pub location_validation_timestamp: DateTime, + pub distance_to_asserted: u64, + } + + impl TryFrom for ValidatedHeartbeat { + type Error = Error; + + fn try_from(v: Heartbeat) -> Result { + Ok(Self { + cbsd_id: v.cbsd_id.clone(), + pub_key: v.pub_key.clone().into(), + reward_multiplier: v.reward_multiplier, + timestamp: Utc + .timestamp_opt(v.timestamp as i64, 0) + .single() + .ok_or_else(|| DecodeError::invalid_timestamp(v.timestamp))?, + cell_type: v.cell_type(), + validity: v.validity(), + lat: v.lat, + lon: v.lon, + coverage_object: v.coverage_object, + location_validation_timestamp: Utc + .timestamp_opt(v.location_validation_timestamp as i64, 0) + .single() + .ok_or_else(|| { + DecodeError::invalid_timestamp(v.location_validation_timestamp) + })?, + distance_to_asserted: v.distance_to_asserted, + }) + } + } + + impl MsgDecode for ValidatedHeartbeat { + type Msg = Heartbeat; + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/file_store/src/speedtest.rs b/file_store/src/speedtest.rs index 29a64b558..2c8fce4a6 100644 --- a/file_store/src/speedtest.rs +++ b/file_store/src/speedtest.rs @@ -1,10 +1,13 @@ use crate::{ + error::DecodeError, traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode}, Error, Result, }; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_mobile::{SpeedtestIngestReportV1, SpeedtestReqV1}; +use helium_proto::services::poc_mobile::{ + Speedtest, SpeedtestAvg, SpeedtestAvgValidity, SpeedtestIngestReportV1, SpeedtestReqV1, +}; use serde::{Deserialize, Serialize}; #[derive(Clone, Deserialize, Serialize, Debug)] @@ -109,6 +112,74 @@ impl From for SpeedtestIngestReportV1 { } } +pub mod cli { + use super::*; + + #[derive(Serialize, Deserialize, Debug, Clone)] + pub struct SpeedtestAverageEntry { + pub upload_speed_bps: u64, + pub download_speed_bps: u64, + pub latency_ms: u32, + pub timestamp: DateTime, + } + + impl TryFrom for SpeedtestAverageEntry { + type Error = Error; + + fn try_from(v: Speedtest) -> Result { + Ok(Self { + upload_speed_bps: v.upload_speed_bps, + download_speed_bps: v.download_speed_bps, + latency_ms: v.latency_ms, + timestamp: Utc + .timestamp_opt(v.timestamp as i64, 0) + .single() + .ok_or_else(|| DecodeError::invalid_timestamp(v.timestamp))?, + }) + } + } + + #[derive(Serialize, Deserialize, Debug, Clone)] + pub struct SpeedtestAverage { + pub pub_key: PublicKeyBinary, + pub upload_speed_avg_bps: u64, + pub download_speed_avg_bps: u64, + pub latency_avg_ms: u32, + pub validity: SpeedtestAvgValidity, + pub speedtests: Vec, + pub timestamp: DateTime, + pub reward_multiplier: f32, + } + + impl TryFrom for SpeedtestAverage { + type Error = Error; + + fn try_from(v: SpeedtestAvg) -> Result { + Ok(Self { + pub_key: v.pub_key.clone().into(), + upload_speed_avg_bps: v.upload_speed_avg_bps, + download_speed_avg_bps: v.download_speed_avg_bps, + latency_avg_ms: v.latency_avg_ms, + validity: v.validity(), + speedtests: v + .speedtests + .into_iter() + .map(SpeedtestAverageEntry::try_from) + .collect::>>()?, + timestamp: Utc + .timestamp_opt(v.timestamp as i64, 0) + .single() + .ok_or_else(|| DecodeError::invalid_timestamp(v.timestamp))?, + reward_multiplier: v.reward_multiplier, + }) + } + } + + impl MsgDecode for SpeedtestAverage { + type Msg = SpeedtestAvg; + } +} + #[cfg(test)] mod tests { use super::*;