Skip to content

Commit

Permalink
support hex and hotspot usage reports at filestore and ingestor
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Oct 11, 2024
1 parent 5d16871 commit 35bf4d7
Show file tree
Hide file tree
Showing 12 changed files with 753 additions and 93 deletions.
121 changes: 69 additions & 52 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch =
hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [
"disktree",
] }
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
helium-proto = { git = "https://github.com/helium/proto", branch = "andymck/hip-118-visualization-data", features = [
"services",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "andymck/hip-118-visualization-data" }
solana-client = "1.18"
solana-sdk = "1.18"
solana-program = "1.18"
Expand Down
21 changes: 21 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ impl FileInfo {
}
}

pub const HEX_USAGE_COUNTS_INGEST_REPORT: &str = "hex_usage_counts_ingest_report";
pub const RADIO_USAGE_COUNTS_INGEST_REPORT: &str = "radio_usage_counts_ingest_report";
pub const HEX_USAGE_COUNTS_REQ: &str = "hex_usage_counts_req";
pub const RADIO_USAGE_COUNTS_REQ: &str = "radio_usage_counts_req";

pub const INVALIDATED_RADIO_THRESHOLD_REQ: &str = "invalidated_radio_threshold_req";
pub const INVALIDATED_RADIO_THRESHOLD_INGEST_REPORT: &str =
"invalidated_radio_threshold_ingest_report";
Expand Down Expand Up @@ -228,6 +233,10 @@ pub enum FileType {
PromotionRewardIngestReport,
VerifiedPromotionReward,
ServiceProviderPromotionFund,
HexUsageCountsIngestReport,
RadioUsageCountsIngestReport,
HexUsageCountsReq,
RadioUsageCountsReq,
}

impl fmt::Display for FileType {
Expand Down Expand Up @@ -303,6 +312,10 @@ impl fmt::Display for FileType {
Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD,
Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND,
Self::HexUsageCountsIngestReport => HEX_USAGE_COUNTS_INGEST_REPORT,
Self::RadioUsageCountsIngestReport => RADIO_USAGE_COUNTS_INGEST_REPORT,
Self::HexUsageCountsReq => HEX_USAGE_COUNTS_REQ,
Self::RadioUsageCountsReq => RADIO_USAGE_COUNTS_REQ,
};
f.write_str(s)
}
Expand Down Expand Up @@ -381,6 +394,10 @@ impl FileType {
Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD,
Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND,
Self::HexUsageCountsIngestReport => HEX_USAGE_COUNTS_INGEST_REPORT,
Self::RadioUsageCountsIngestReport => RADIO_USAGE_COUNTS_INGEST_REPORT,
Self::HexUsageCountsReq => HEX_USAGE_COUNTS_REQ,
Self::RadioUsageCountsReq => RADIO_USAGE_COUNTS_REQ,
}
}
}
Expand Down Expand Up @@ -458,6 +475,10 @@ impl FromStr for FileType {
PROMOTION_REWARD_INGEST_REPORT => Self::PromotionRewardIngestReport,
VERIFIED_PROMOTION_REWARD => Self::VerifiedPromotionReward,
SERVICE_PROVIDER_PROMOTION_FUND => Self::ServiceProviderPromotionFund,
HEX_USAGE_COUNTS_INGEST_REPORT => Self::HexUsageCountsIngestReport,
RADIO_USAGE_COUNTS_INGEST_REPORT => Self::RadioUsageCountsIngestReport,
HEX_USAGE_COUNTS_REQ => Self::HexUsageCountsReq,
RADIO_USAGE_COUNTS_REQ => Self::RadioUsageCountsReq,
_ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))),
};
Ok(result)
Expand Down
1 change: 1 addition & 0 deletions file_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod speedtest;
pub mod subscriber_verified_mapping_event;
pub mod subscriber_verified_mapping_event_ingest_report;
pub mod traits;
pub mod usage_counts;
pub mod verified_subscriber_verified_mapping_event_ingest_report;
pub mod wifi_heartbeat;

Expand Down
10 changes: 10 additions & 0 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,16 @@ impl_file_sink!(
FileType::WifiHeartbeatIngestReport.to_str(),
"wifi_heartbeat_report"
);
impl_file_sink!(
poc_mobile::HexUsageCountsIngestReportV1,
FileType::HexUsageCountsIngestReport.to_str(),
"hex_usage_counts_ingest_report"
);
impl_file_sink!(
poc_mobile::RadioUsageCountsIngestReportV1,
FileType::RadioUsageCountsIngestReport.to_str(),
"hotspot_usage_counts_ingest_report"
);
impl_file_sink!(
proto::BoostedHexUpdateV1,
FileType::BoostedHexUpdate.to_str(),
Expand Down
2 changes: 2 additions & 0 deletions file_store/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature);
impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature);
impl_msg_verify!(poc_mobile::PromotionRewardReqV1, signature);
impl_msg_verify!(poc_mobile::HexUsageCountsReqV1, signature);
impl_msg_verify!(poc_mobile::RadioUsageCountsReqV1, signature);

#[cfg(test)]
mod test {
Expand Down
230 changes: 230 additions & 0 deletions file_store/src/usage_counts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use std::convert::TryFrom;

use crate::{
error::DecodeError,
traits::{MsgDecode, MsgTimestamp, TimestampDecode, TimestampEncode},
Error, Result,
};
use chrono::{DateTime, Utc};
use h3o::CellIndex;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
HexUsageCountsIngestReportV1, HexUsageCountsReqV1, RadioUsageCountsIngestReportV1,
RadioUsageCountsReqV1,
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct HexUsageCountsReq {
pub hex: CellIndex,
pub helium_mobile_subscriber_avg_count: u64,
pub helium_mobile_disco_mapping_avg_count: u64,
pub offload_avg_count: u64,
pub tmo_cell_avg_count: u64,
pub timestamp: DateTime<Utc>,
pub carrier_mapping_key: PublicKeyBinary,
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct RadioUsageCountsReq {
pub hotspot_pubkey: PublicKeyBinary,
pub cbsd_id: String,
pub helium_mobile_subscriber_avg_count: u64,
pub helium_mobile_disco_mapping_avg_count: u64,
pub offload_avg_count: u64,
pub timestamp: DateTime<Utc>,
pub carrier_mapping_key: PublicKeyBinary,
}

impl MsgDecode for HexUsageCountsReq {
type Msg = HexUsageCountsReqV1;
}

impl MsgDecode for RadioUsageCountsReq {
type Msg = RadioUsageCountsReqV1;
}

impl MsgTimestamp<Result<DateTime<Utc>>> for HexUsageCountsReqV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.timestamp.to_timestamp()
}
}

impl MsgTimestamp<u64> for HexUsageCountsReq {
fn timestamp(&self) -> u64 {
self.timestamp.encode_timestamp()
}
}

impl MsgTimestamp<Result<DateTime<Utc>>> for RadioUsageCountsReqV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.timestamp.to_timestamp()
}
}

impl MsgTimestamp<u64> for RadioUsageCountsReq {
fn timestamp(&self) -> u64 {
self.timestamp.encode_timestamp()
}
}

impl TryFrom<HexUsageCountsReqV1> for HexUsageCountsReq {
type Error = Error;
fn try_from(v: HexUsageCountsReqV1) -> Result<Self> {
let timestamp = v.timestamp()?;
let hex = CellIndex::try_from(v.hex).map_err(|_| {
DecodeError::FileStreamTryDecode(format!("invalid CellIndex {}", v.hex))
})?;
Ok(Self {
hex,
helium_mobile_subscriber_avg_count: v.helium_mobile_subscriber_avg_count,
helium_mobile_disco_mapping_avg_count: v.helium_mobile_disco_mapping_avg_count,
offload_avg_count: v.offload_avg_count,
tmo_cell_avg_count: v.tmo_cell_avg_count,
timestamp,
carrier_mapping_key: v.carrier_mapping_key.into(),
})
}
}

impl From<HexUsageCountsReq> for HexUsageCountsReqV1 {
fn from(v: HexUsageCountsReq) -> Self {
let timestamp = v.timestamp();
HexUsageCountsReqV1 {
hex: v.hex.into(),
helium_mobile_subscriber_avg_count: v.helium_mobile_subscriber_avg_count,
helium_mobile_disco_mapping_avg_count: v.helium_mobile_disco_mapping_avg_count,
offload_avg_count: v.offload_avg_count,
tmo_cell_avg_count: v.tmo_cell_avg_count,
timestamp,
carrier_mapping_key: v.carrier_mapping_key.into(),
signature: vec![],
}
}
}

impl TryFrom<RadioUsageCountsReqV1> for RadioUsageCountsReq {
type Error = Error;
fn try_from(v: RadioUsageCountsReqV1) -> Result<Self> {
let timestamp = v.timestamp()?;
Ok(Self {
hotspot_pubkey: v.hotspot_pubkey.into(),
cbsd_id: v.cbsd_id,
helium_mobile_subscriber_avg_count: v.helium_mobile_subscriber_avg_count,
helium_mobile_disco_mapping_avg_count: v.helium_mobile_disco_mapping_avg_count,
offload_avg_count: v.offload_avg_count,
timestamp,
carrier_mapping_key: v.carrier_mapping_key.into(),
})
}
}

impl From<RadioUsageCountsReq> for RadioUsageCountsReqV1 {
fn from(v: RadioUsageCountsReq) -> Self {
let timestamp = v.timestamp();
RadioUsageCountsReqV1 {
hotspot_pubkey: v.hotspot_pubkey.into(),
cbsd_id: v.cbsd_id,
helium_mobile_subscriber_avg_count: v.helium_mobile_subscriber_avg_count,
helium_mobile_disco_mapping_avg_count: v.helium_mobile_disco_mapping_avg_count,
offload_avg_count: v.offload_avg_count,
timestamp,
carrier_mapping_key: v.carrier_mapping_key.into(),
signature: vec![],
}
}
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct HexUsageCountsIngestReport {
pub report: HexUsageCountsReq,
pub received_timestamp: DateTime<Utc>,
}

#[derive(Clone, Deserialize, Serialize, Debug, PartialEq)]
pub struct RadioUsageCountsIngestReport {
pub report: RadioUsageCountsReq,
pub received_timestamp: DateTime<Utc>,
}

impl MsgDecode for HexUsageCountsIngestReport {
type Msg = HexUsageCountsIngestReportV1;
}

impl MsgDecode for RadioUsageCountsIngestReport {
type Msg = RadioUsageCountsIngestReportV1;
}

impl MsgTimestamp<Result<DateTime<Utc>>> for HexUsageCountsIngestReportV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.received_timestamp.to_timestamp()
}
}

impl MsgTimestamp<u64> for HexUsageCountsIngestReport {
fn timestamp(&self) -> u64 {
self.received_timestamp.encode_timestamp()
}
}

impl MsgTimestamp<Result<DateTime<Utc>>> for RadioUsageCountsIngestReportV1 {
fn timestamp(&self) -> Result<DateTime<Utc>> {
self.received_timestamp.to_timestamp()
}
}

impl MsgTimestamp<u64> for RadioUsageCountsIngestReport {
fn timestamp(&self) -> u64 {
self.received_timestamp.encode_timestamp()
}
}

impl TryFrom<HexUsageCountsIngestReportV1> for HexUsageCountsIngestReport {
type Error = Error;
fn try_from(v: HexUsageCountsIngestReportV1) -> Result<Self> {
Ok(Self {
report: v
.clone()
.report
.ok_or_else(|| Error::not_found("ingest HexUsageCountsIngestReport report"))?
.try_into()?,
received_timestamp: v.timestamp()?,
})
}
}

impl From<HexUsageCountsIngestReport> for HexUsageCountsIngestReportV1 {
fn from(v: HexUsageCountsIngestReport) -> Self {
let received_timestamp = v.received_timestamp;
let report: HexUsageCountsReqV1 = v.report.into();
Self {
report: Some(report),
received_timestamp: received_timestamp.encode_timestamp(),
}
}
}

impl TryFrom<RadioUsageCountsIngestReportV1> for RadioUsageCountsIngestReport {
type Error = Error;
fn try_from(v: RadioUsageCountsIngestReportV1) -> Result<Self> {
Ok(Self {
report: v
.clone()
.report
.ok_or_else(|| Error::not_found("ingest RadioUsageCountsIngestReport report"))?
.try_into()?,
received_timestamp: v.timestamp()?,
})
}
}

impl From<RadioUsageCountsIngestReport> for RadioUsageCountsIngestReportV1 {
fn from(v: RadioUsageCountsIngestReport) -> Self {
let received_timestamp = v.received_timestamp;
let report: RadioUsageCountsReqV1 = v.report.into();
Self {
report: Some(report),
received_timestamp: received_timestamp.encode_timestamp(),
}
}
}
1 change: 1 addition & 0 deletions ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ file-store = { path = "../file_store" }
poc-metrics = { path = "../metrics" }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
mobile-config = { path = "../mobile_config" }
task-manager = { path = "../task_manager" }
rand = { workspace = true }
custom-tracing = { path = "../custom_tracing", features = ["grpc"] }
Expand Down
Loading

0 comments on commit 35bf4d7

Please sign in to comment.