Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update locate command to support validated heartbeats and speedtest a… #660

Merged
merged 3 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions file_store/src/cli/bucket.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::{
heartbeat::CbrsHeartbeat, iot_beacon_report::IotBeaconIngestReport, iot_valid_poc::IotPoc,
iot_witness_report::IotWitnessIngestReport, speedtest::CellSpeedtest, traits::MsgDecode, Error,
FileInfoStream, FileStore, FileType, Result, Settings,
heartbeat::{cli::ValidatedHeartbeat, CbrsHeartbeat},
iot_beacon_report::IotBeaconIngestReport,
iot_valid_poc::IotPoc,
iot_witness_report::IotWitnessIngestReport,
speedtest::{cli::SpeedtestAverage, CellSpeedtest},
traits::MsgDecode,
Error, FileInfoStream, FileStore, FileType, Result, Settings,
};
use chrono::{NaiveDateTime, TimeZone, Utc};
use futures::{stream::TryStreamExt, StreamExt, TryFutureExt};
Expand Down Expand Up @@ -207,6 +211,12 @@ fn locate(prefix: &str, gateway: &PublicKey, buf: &[u8]) -> Result<Option<serde_
FileType::CbrsHeartbeat => {
CbrsHeartbeat::decode(buf).and_then(|event| event.to_value_if(pub_key))
}
FileType::ValidatedHeartbeat => {
ValidatedHeartbeat::decode(buf).and_then(|event| event.to_value_if(pub_key))
}
FileType::SpeedtestAvg => {
SpeedtestAverage::decode(buf).and_then(|event| event.to_value_if(pub_key))
}
FileType::CellSpeedtest => {
CellSpeedtest::decode(buf).and_then(|event| event.to_value_if(pub_key))
}
Expand Down Expand Up @@ -283,3 +293,15 @@ impl Gateway for IotPoc {
self.beacon_report.report.pub_key.as_ref() == pub_key
}
}

impl Gateway for ValidatedHeartbeat {
fn has_pubkey(&self, pub_key: &[u8]) -> bool {
self.pub_key.as_ref() == pub_key
}
}

impl Gateway for SpeedtestAverage {
fn has_pubkey(&self, pub_key: &[u8]) -> bool {
self.pub_key.as_ref() == pub_key
}
}
58 changes: 56 additions & 2 deletions file_store/src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::{
error::DecodeError,
traits::{MsgDecode, MsgTimestamp, TimestampDecode},
Error, Result,
};
use chrono::{DateTime, Utc};
use chrono::{DateTime, TimeZone, Utc};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{CellHeartbeatIngestReportV1, CellHeartbeatReqV1};
use helium_proto::services::poc_mobile::{
CellHeartbeatIngestReportV1, CellHeartbeatReqV1, CellType, Heartbeat, HeartbeatValidity,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

Expand Down Expand Up @@ -85,6 +88,57 @@ impl MsgTimestamp<Result<DateTime<Utc>>> for CellHeartbeatIngestReportV1 {
}
}

pub mod cli {
use super::*;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ValidatedHeartbeat {
pub cbsd_id: String,
pub pub_key: PublicKeyBinary,
pub reward_multiplier: f32,
pub timestamp: DateTime<Utc>,
pub cell_type: CellType,
pub validity: HeartbeatValidity,
pub lat: f64,
pub lon: f64,
pub coverage_object: Vec<u8>,
pub location_validation_timestamp: DateTime<Utc>,
pub distance_to_asserted: u64,
}

impl TryFrom<Heartbeat> for ValidatedHeartbeat {
type Error = Error;

fn try_from(v: Heartbeat) -> Result<Self> {
Ok(Self {
cbsd_id: v.cbsd_id.clone(),
pub_key: v.pub_key.clone().into(),
reward_multiplier: v.reward_multiplier,
timestamp: Utc
.timestamp_opt(v.timestamp as i64, 0)
.single()
.ok_or_else(|| DecodeError::invalid_timestamp(v.timestamp))?,
cell_type: v.cell_type(),
validity: v.validity(),
lat: v.lat,
lon: v.lon,
coverage_object: v.coverage_object,
location_validation_timestamp: Utc
.timestamp_opt(v.location_validation_timestamp as i64, 0)
.single()
.ok_or_else(|| {
DecodeError::invalid_timestamp(v.location_validation_timestamp)
})?,
distance_to_asserted: v.distance_to_asserted,
})
}
}

impl MsgDecode for ValidatedHeartbeat {
type Msg = Heartbeat;
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
75 changes: 73 additions & 2 deletions file_store/src/speedtest.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::{
error::DecodeError,
traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode},
Error, Result,
};
use chrono::{DateTime, Utc};
use chrono::{DateTime, TimeZone, Utc};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{SpeedtestIngestReportV1, SpeedtestReqV1};
use helium_proto::services::poc_mobile::{
Speedtest, SpeedtestAvg, SpeedtestAvgValidity, SpeedtestIngestReportV1, SpeedtestReqV1,
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Deserialize, Serialize, Debug)]
Expand Down Expand Up @@ -109,6 +112,74 @@ impl From<CellSpeedtestIngestReport> for SpeedtestIngestReportV1 {
}
}

pub mod cli {
use super::*;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SpeedtestAverageEntry {
pub upload_speed_bps: u64,
pub download_speed_bps: u64,
pub latency_ms: u32,
pub timestamp: DateTime<Utc>,
}

impl TryFrom<Speedtest> for SpeedtestAverageEntry {
type Error = Error;

fn try_from(v: Speedtest) -> Result<Self> {
Ok(Self {
upload_speed_bps: v.upload_speed_bps,
download_speed_bps: v.download_speed_bps,
latency_ms: v.latency_ms,
timestamp: Utc
.timestamp_opt(v.timestamp as i64, 0)
.single()
.ok_or_else(|| DecodeError::invalid_timestamp(v.timestamp))?,
})
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SpeedtestAverage {
pub pub_key: PublicKeyBinary,
pub upload_speed_avg_bps: u64,
pub download_speed_avg_bps: u64,
pub latency_avg_ms: u32,
pub validity: SpeedtestAvgValidity,
pub speedtests: Vec<SpeedtestAverageEntry>,
pub timestamp: DateTime<Utc>,
pub reward_multiplier: f32,
}

impl TryFrom<SpeedtestAvg> for SpeedtestAverage {
type Error = Error;

fn try_from(v: SpeedtestAvg) -> Result<Self> {
Ok(Self {
pub_key: v.pub_key.clone().into(),
upload_speed_avg_bps: v.upload_speed_avg_bps,
download_speed_avg_bps: v.download_speed_avg_bps,
latency_avg_ms: v.latency_avg_ms,
validity: v.validity(),
speedtests: v
.speedtests
.into_iter()
.map(SpeedtestAverageEntry::try_from)
.collect::<Result<Vec<_>>>()?,
timestamp: Utc
.timestamp_opt(v.timestamp as i64, 0)
.single()
.ok_or_else(|| DecodeError::invalid_timestamp(v.timestamp))?,
reward_multiplier: v.reward_multiplier,
})
}
}

impl MsgDecode for SpeedtestAverage {
type Msg = SpeedtestAvg;
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down