Skip to content

Commit

Permalink
Fix wifi rewards to calculate location trust score average
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Nov 22, 2023
1 parent e3e4523 commit 7b9bce5
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 45 deletions.
3 changes: 2 additions & 1 deletion mobile_verifier/src/cli/reward_from_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ impl Cmd {
let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?;

let heartbeats =
HeartbeatReward::validated(&pool, &epoch, settings.max_asserted_distance_deviation);
HeartbeatReward::validated(&pool, &epoch, settings.max_asserted_distance_deviation)
.await?;
let speedtest_averages =
SpeedtestAverages::aggregate_epoch_averages(epoch.end, &pool).await?;
let reward_shares = PocShares::aggregate(heartbeats, &speedtest_averages).await?;
Expand Down
55 changes: 46 additions & 9 deletions mobile_verifier/src/heartbeats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use helium_proto::services::poc_mobile as proto;
use retainer::Cache;
use rust_decimal::{prelude::ToPrimitive, Decimal};
use sqlx::{postgres::PgTypeInfo, Decode, Encode, Postgres, Transaction, Type};
use std::{ops::Range, pin::pin, time};
use std::{collections::HashMap, ops::Range, pin::pin, time};
use uuid::Uuid;

/// Minimum number of heartbeats required to give a reward to the hotspot.
Expand Down Expand Up @@ -256,17 +256,54 @@ impl HeartbeatReward {
self.reward_weight
}

pub fn validated<'a>(
pub async fn validated<'a>(
exec: impl sqlx::PgExecutor<'a> + Copy + 'a,
epoch: &'a Range<DateTime<Utc>>,
max_distance_to_asserted: u32,
) -> impl Stream<Item = Result<HeartbeatReward, sqlx::Error>> + 'a {
sqlx::query_as::<_, HeartbeatRow>(include_str!("valid_heartbeats.sql"))
.bind(epoch.start)
.bind(epoch.end)
.bind(MINIMUM_HEARTBEAT_COUNT)
.fetch(exec)
.map_ok(move |row| Self::from_heartbeat_row(row, max_distance_to_asserted))
) -> anyhow::Result<impl Stream<Item = HeartbeatReward> + 'a> {
let heartbeat_rows =
sqlx::query_as::<_, HeartbeatRow>(include_str!("valid_heartbeats.sql"))
.bind(epoch.start)
.bind(epoch.end)
.bind(MINIMUM_HEARTBEAT_COUNT)
.fetch(exec)
.try_fold(
HashMap::<(PublicKeyBinary, Option<String>), Vec<HeartbeatRow>>::new(),
|mut map, row| async move {
map.entry((row.hotspot_key.clone(), row.cbsd_id.clone()))
.or_default()
.push(row);

Ok(map)
},
)
.await?;

Ok(
futures::stream::iter(heartbeat_rows).map(move |((hotspot_key, cbsd_id), rows)| {
let first = rows.first().unwrap();
let average_location_trust_score = rows
.iter()
.map(|row| {
row.cell_type.location_weight(
row.location_validation_timestamp,
row.distance_to_asserted,
max_distance_to_asserted,
)
})
.sum::<Decimal>()
/ Decimal::new(rows.len() as i64, 0);

HeartbeatReward {
hotspot_key,
cell_type: first.cell_type,
cbsd_id,
reward_weight: first.cell_type.reward_weight() * average_location_trust_score,
coverage_object: first.coverage_object,
latest_timestamp: first.latest_timestamp,
}
}),
)
}

pub fn from_heartbeat_row(value: HeartbeatRow, max_distance_to_asserted: u32) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion mobile_verifier/src/heartbeats/valid_heartbeats.sql
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ GROUP BY
cbrs_coverage_objs.latest_timestamp
HAVING
count(*) >= $3
UNION
UNION ALL
SELECT
wifi_grouped.hotspot_key,
NULL AS cbsd_id,
Expand Down
44 changes: 20 additions & 24 deletions mobile_verifier/src/reward_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,12 @@ pub struct PocShares {

impl PocShares {
pub async fn aggregate(
heartbeat_rewards: impl Stream<Item = Result<HeartbeatReward, sqlx::Error>>,
heartbeat_rewards: impl Stream<Item = HeartbeatReward>,
speedtest_averages: &SpeedtestAverages,
) -> anyhow::Result<Self> {
let mut poc_shares = Self::default();
let mut heartbeat_rewards = std::pin::pin!(heartbeat_rewards);
while let Some(heartbeat_reward) = heartbeat_rewards.next().await.transpose()? {
while let Some(heartbeat_reward) = heartbeat_rewards.next().await {
let speedmultiplier = speedtest_averages
.get_average(&heartbeat_reward.hotspot_key)
.as_ref()
Expand Down Expand Up @@ -705,10 +705,9 @@ mod test {
averages.insert(g5.clone(), g5_average);
let speedtest_avgs = SpeedtestAverages { averages };

let rewards =
PocShares::aggregate(stream::iter(heartbeat_rewards).map(Ok), &speedtest_avgs)
.await
.unwrap();
let rewards = PocShares::aggregate(stream::iter(heartbeat_rewards), &speedtest_avgs)
.await
.unwrap();

let gw1_shares = rewards
.hotspot_shares
Expand Down Expand Up @@ -1077,12 +1076,11 @@ mod test {
let mut owner_rewards = HashMap::<PublicKeyBinary, u64>::new();
let duration = Duration::hours(1);
let epoch = (now - duration)..now;
for mobile_reward in
PocShares::aggregate(stream::iter(heartbeat_rewards).map(Ok), &speedtest_avgs)
.await
.unwrap()
.into_rewards(Decimal::ZERO, &epoch)
.unwrap()
for mobile_reward in PocShares::aggregate(stream::iter(heartbeat_rewards), &speedtest_avgs)
.await
.unwrap()
.into_rewards(Decimal::ZERO, &epoch)
.unwrap()
{
let radio_reward = match mobile_reward.reward {
Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward,
Expand Down Expand Up @@ -1244,12 +1242,11 @@ mod test {
let mut owner_rewards = HashMap::<PublicKeyBinary, u64>::new();
let duration = Duration::hours(1);
let epoch = (now - duration)..now;
for mobile_reward in
PocShares::aggregate(stream::iter(heartbeat_rewards).map(Ok), &speedtest_avgs)
.await
.unwrap()
.into_rewards(Decimal::ZERO, &epoch)
.unwrap()
for mobile_reward in PocShares::aggregate(stream::iter(heartbeat_rewards), &speedtest_avgs)
.await
.unwrap()
.into_rewards(Decimal::ZERO, &epoch)
.unwrap()
{
let radio_reward = match mobile_reward.reward {
Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward,
Expand Down Expand Up @@ -1364,12 +1361,11 @@ mod test {
let mut owner_rewards = HashMap::<PublicKeyBinary, u64>::new();
let duration = Duration::hours(1);
let epoch = (now - duration)..now;
for mobile_reward in
PocShares::aggregate(stream::iter(heartbeat_rewards).map(Ok), &speedtest_avgs)
.await
.unwrap()
.into_rewards(Decimal::ZERO, &epoch)
.unwrap()
for mobile_reward in PocShares::aggregate(stream::iter(heartbeat_rewards), &speedtest_avgs)
.await
.unwrap()
.into_rewards(Decimal::ZERO, &epoch)
.unwrap()
{
let radio_reward = match mobile_reward.reward {
Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward,
Expand Down
3 changes: 2 additions & 1 deletion mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ impl Rewarder {
);

let heartbeats =
HeartbeatReward::validated(&self.pool, reward_period, self.max_distance_to_asserted);
HeartbeatReward::validated(&self.pool, reward_period, self.max_distance_to_asserted)
.await?;
let speedtest_averages =
SpeedtestAverages::aggregate_epoch_averages(reward_period.end, &self.pool).await?;
let poc_rewards = PocShares::aggregate(heartbeats, &speedtest_averages).await?;
Expand Down
81 changes: 72 additions & 9 deletions mobile_verifier/tests/heartbeats.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use futures_util::StreamExt;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::HeartbeatValidity;
use mobile_verifier::cell_type::CellType;
Expand Down Expand Up @@ -140,8 +140,9 @@ VALUES
&(start_period..end_period),
max_asserted_distance_deviation,
)
.try_collect()
.await?;
.await?
.collect()
.await;

assert_eq!(
heartbeat_reward,
Expand Down Expand Up @@ -202,8 +203,9 @@ VALUES
&(start_period..end_period),
max_asserted_distance_deviation,
)
.try_collect()
.await?;
.await?
.collect()
.await;

assert_eq!(
heartbeat_reward,
Expand Down Expand Up @@ -259,8 +261,9 @@ VALUES
&(start_period..end_period),
max_asserted_distance_deviation,
)
.try_collect()
.await?;
.await?
.collect()
.await;

assert!(heartbeat_reward.is_empty());

Expand Down Expand Up @@ -307,8 +310,9 @@ VALUES
&(start_period..end_period),
max_asserted_distance_deviation,
)
.try_collect()
.await?;
.await?
.collect()
.await;

assert_eq!(
heartbeat_reward,
Expand All @@ -324,3 +328,62 @@ VALUES

Ok(())
}

#[sqlx::test]
#[ignore]
async fn ensure_wifi_hotspots_use_average_location_trust_score(pool: PgPool) -> anyhow::Result<()> {
let early_coverage_object = Uuid::new_v4();
let latest_coverage_object = Uuid::new_v4();
let hotspot: PublicKeyBinary =
"112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?;
sqlx::query(
r#"
INSERT INTO wifi_heartbeats (hotspot_key, cell_type, latest_timestamp, truncated_timestamp, location_validation_timestamp, distance_to_asserted, coverage_object)
VALUES
($1, 'novagenericwifiindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00', NOW(), 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00', NOW(), 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 02:00:00+00', '2023-08-25 02:00:00+00', NOW(), 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 03:00:00+00', '2023-08-25 03:00:00+00', NOW(), 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 04:00:00+00', '2023-08-25 04:00:00+00', NOW(), 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 05:00:00+00', '2023-08-25 05:00:00+00', NOW(), 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 06:00:00+00', '2023-08-25 06:00:00+00', NOW(), 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 07:00:00+00', '2023-08-25 07:00:00+00', NOW(), 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 08:00:00+00', '2023-08-25 08:00:00+00', null, 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 09:00:00+00', '2023-08-25 09:00:00+00', null, 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 10:00:00+00', '2023-08-25 10:00:00+00', null, 300, $2),
($1, 'novagenericwifiindoor', '2023-08-25 11:00:00+00', '2023-08-25 11:00:00+00', null, 300, $3)
"#,
)
.bind(&hotspot)
.bind(early_coverage_object)
.bind(latest_coverage_object)
.execute(&pool)
.await?;

let start_period: DateTime<Utc> = "2023-08-25 00:00:00.000000000 UTC".parse()?;
let end_period: DateTime<Utc> = "2023-08-26 00:00:00.000000000 UTC".parse()?;
let latest_timestamp: DateTime<Utc> = "2023-08-25 11:00:00.000000000 UTC".parse()?;
let max_asserted_distance_deviation: u32 = 300;
let heartbeat_reward: Vec<_> = HeartbeatReward::validated(
&pool,
&(start_period..end_period),
max_asserted_distance_deviation,
)
.await?
.collect()
.await;

assert_eq!(
heartbeat_reward,
vec![HeartbeatReward {
hotspot_key: hotspot,
cell_type: CellType::NovaGenericWifiIndoor,
cbsd_id: None,
reward_weight: dec!(0.3),
latest_timestamp,
coverage_object: Some(latest_coverage_object),
}]
);

Ok(())
}

0 comments on commit 7b9bce5

Please sign in to comment.