Skip to content

Commit

Permalink
refactor speedtests handling, ensure rewards only include tests up un…
Browse files Browse the repository at this point in the history
…til epoch end
  • Loading branch information
andymck committed Aug 16, 2023
1 parent fd07b23 commit 65ac933
Show file tree
Hide file tree
Showing 10 changed files with 751 additions and 696 deletions.
22 changes: 22 additions & 0 deletions mobile_verifier/migrations/15_speedtests_one_to_one.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

CREATE TABLE speedtests_migration (
pubkey text NOT NULL,
upload_speed bigint,
download_speed bigint,
latency integer,
serial text,
timestamp timestamptz NOT NULL,
inserted_at timestamptz default now(),
PRIMARY KEY(pubkey, timestamp)
);
CREATE INDEX idx_speedtests_pubkey on speedtests_migration (pubkey);

INSERT INTO speedtests_migration (pubkey, upload_speed, download_speed, latency, serial, timestamp)
SELECT id, (st).upload_speed, (st).download_speed, (st).latency, '', (st).timestamp
FROM (select id, unnest(speedtests) as st from speedtests) as tmp
ON CONFLICT DO NOTHING;

ALTER TABLE speedtests RENAME TO speedtests_old;
ALTER TABLE speedtests_migration RENAME TO speedtests;


15 changes: 8 additions & 7 deletions mobile_verifier/src/cli/reward_from_db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
heartbeats::HeartbeatReward,
reward_shares::{get_scheduled_tokens_for_poc_and_dc, PocShares},
speedtests::{Average, SpeedtestAverages},
speedtests_average::SpeedtestAverages,
Settings,
};
use anyhow::Result;
Expand Down Expand Up @@ -39,8 +39,9 @@ impl Cmd {
.await?;

let heartbeats = HeartbeatReward::validated(&pool, &epoch);
let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?;
let reward_shares = PocShares::aggregate(heartbeats, speedtests.clone()).await?;
let speedtest_averages =
SpeedtestAverages::aggregate_epoch_averages(epoch.end, &pool).await?;
let reward_shares = PocShares::aggregate(heartbeats, &speedtest_averages).await?;

let mut total_rewards = 0_u64;
let mut owner_rewards = HashMap::<_, u64>::new();
Expand All @@ -62,11 +63,11 @@ impl Cmd {
}
let rewards: Vec<_> = owner_rewards.into_iter().collect();
let mut multiplier_count = HashMap::<_, usize>::new();
let speedtest_multipliers: Vec<_> = speedtests
.speedtests
let speedtest_multipliers: Vec<_> = speedtest_averages
.averages
.into_iter()
.map(|(pub_key, avg)| {
let reward_multiplier = Average::from(&avg).reward_multiplier();
.map(|(pub_key, average)| {
let reward_multiplier = average.reward_multiplier;
*multiplier_count.entry(reward_multiplier).or_default() += 1;
(pub_key, reward_multiplier)
})
Expand Down
4 changes: 2 additions & 2 deletions mobile_verifier/src/data_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ pub async fn data_sessions_to_dc<'a>(

pub async fn clear_hotspot_data_sessions(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
reward_period: &Range<DateTime<Utc>>,
timestamp: &DateTime<Utc>,
) -> Result<(), sqlx::Error> {
sqlx::query("delete from hotspot_data_transfer_sessions where received_timestamp < $1")
.bind(reward_period.end)
.bind(timestamp)
.execute(&mut *tx)
.await?;
Ok(())
Expand Down
11 changes: 11 additions & 0 deletions mobile_verifier/src/heartbeats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,14 @@ async fn validate_heartbeat(

Ok((cell_type, proto::HeartbeatValidity::Valid))
}

pub async fn clear_heartbeats(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
timestamp: &DateTime<Utc>,
) -> Result<(), sqlx::Error> {
sqlx::query("DELETE FROM heartbeats WHERE truncated_timestamp < $1")
.bind(timestamp)
.execute(&mut *tx)
.await?;
Ok(())
}
1 change: 1 addition & 0 deletions mobile_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod heartbeats;
mod reward_shares;
mod settings;
mod speedtests;
mod speedtests_average;
mod subscriber_location;
mod telemetry;

Expand Down
155 changes: 93 additions & 62 deletions mobile_verifier/src/reward_shares.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::speedtests_average::SpeedtestAverages;
use crate::{
data_session::HotspotMap,
heartbeats::HeartbeatReward,
speedtests::{Average, SpeedtestAverages},
data_session::HotspotMap, heartbeats::HeartbeatReward, speedtests_average::SpeedtestAverage,
subscriber_location::SubscriberValidatedLocations,
};

Expand Down Expand Up @@ -221,15 +220,15 @@ pub struct PocShares {
impl PocShares {
pub async fn aggregate(
heartbeats: impl Stream<Item = Result<HeartbeatReward, sqlx::Error>>,
speedtests: SpeedtestAverages,
speedtest_averages: &SpeedtestAverages,
) -> Result<Self, sqlx::Error> {
let mut poc_shares = Self::default();
let mut heartbeats = std::pin::pin!(heartbeats);
while let Some(heartbeat) = heartbeats.next().await.transpose()? {
let speedmultiplier = speedtests
let speedmultiplier = speedtest_averages
.get_average(&heartbeat.hotspot_key)
.as_ref()
.map_or(Decimal::ZERO, Average::reward_multiplier);
.map_or(Decimal::ZERO, SpeedtestAverage::reward_multiplier);
*poc_shares
.hotspot_shares
.entry(heartbeat.hotspot_key)
Expand Down Expand Up @@ -326,14 +325,16 @@ mod test {
data_session,
data_session::HotspotDataSession,
heartbeats::HeartbeatReward,
speedtests::{Speedtest, SpeedtestAverages},
speedtests::Speedtest,
speedtests_average::{SpeedtestAverage, SpeedtestAverages},
subscriber_location::SubscriberValidatedLocations,
};
use chrono::{Duration, Utc};
use file_store::speedtest::CellSpeedtest;
use futures::stream;
use helium_proto::services::poc_mobile::mobile_reward_share::Reward as MobileReward;
use prost::Message;
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;

fn valid_shares() -> RadioShares {
let mut radio_shares: HashMap<String, Decimal> = Default::default();
Expand Down Expand Up @@ -526,7 +527,7 @@ mod test {
assert_eq!(data_transfer_rewards.reward_scale().round_dp(1), dec!(0.5));
}

fn bytes_per_s(mbps: i64) -> i64 {
fn bytes_per_s(mbps: u64) -> u64 {
mbps * 125000
}

Expand All @@ -536,39 +537,55 @@ mod test {
.reward_weight()
}

fn acceptable_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn acceptable_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
timestamp,
upload_speed: bytes_per_s(10),
download_speed: bytes_per_s(100),
latency: 25,
report: CellSpeedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(10),
download_speed: bytes_per_s(100),
latency: 25,
serial: "".to_string(),
},
}
}

fn degraded_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn degraded_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
timestamp,
upload_speed: bytes_per_s(5),
download_speed: bytes_per_s(60),
latency: 60,
report: CellSpeedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(5),
download_speed: bytes_per_s(60),
latency: 60,
serial: "".to_string(),
},
}
}

fn failed_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn failed_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
timestamp,
upload_speed: bytes_per_s(1),
download_speed: bytes_per_s(20),
latency: 110,
report: CellSpeedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(1),
download_speed: bytes_per_s(20),
latency: 110,
serial: "".to_string(),
},
}
}

fn poor_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn poor_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
timestamp,
upload_speed: bytes_per_s(2),
download_speed: bytes_per_s(40),
latency: 90,
report: CellSpeedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(2),
download_speed: bytes_per_s(40),
latency: 90,
serial: "".to_string(),
},
}
}

Expand Down Expand Up @@ -611,19 +628,21 @@ mod test {

let last_timestamp = timestamp - Duration::hours(12);
let g1_speedtests = vec![
acceptable_speedtest(last_timestamp),
acceptable_speedtest(timestamp),
acceptable_speedtest(g1.clone(), last_timestamp),
acceptable_speedtest(g1.clone(), timestamp),
];
let g2_speedtests = vec![
acceptable_speedtest(last_timestamp),
acceptable_speedtest(timestamp),
acceptable_speedtest(g2.clone(), last_timestamp),
acceptable_speedtest(g2.clone(), timestamp),
];
let mut speedtests = HashMap::new();
speedtests.insert(g1.clone(), VecDeque::from(g1_speedtests));
speedtests.insert(g2.clone(), VecDeque::from(g2_speedtests));
let speedtest_avgs = SpeedtestAverages { speedtests };

let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs)
let g1_average = SpeedtestAverage::from(&g1_speedtests);
let g2_average = SpeedtestAverage::from(&g2_speedtests);
let mut averages = HashMap::new();
averages.insert(g1.clone(), g1_average);
averages.insert(g2.clone(), g2_average);
let speedtest_avgs = SpeedtestAverages { averages };

let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs)
.await
.unwrap();

Expand Down Expand Up @@ -779,44 +798,56 @@ mod test {
// setup speedtests
let last_speedtest = timestamp - Duration::hours(12);
let gw1_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw1.clone(), last_speedtest),
acceptable_speedtest(gw1.clone(), timestamp),
];
let gw2_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw2.clone(), last_speedtest),
acceptable_speedtest(gw2.clone(), timestamp),
];
let gw3_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw3.clone(), last_speedtest),
acceptable_speedtest(gw3.clone(), timestamp),
];
let gw4_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw4.clone(), last_speedtest),
acceptable_speedtest(gw4.clone(), timestamp),
];
let gw5_speedtests = vec![
degraded_speedtest(last_speedtest),
degraded_speedtest(timestamp),
degraded_speedtest(gw5.clone(), last_speedtest),
degraded_speedtest(gw5.clone(), timestamp),
];
let gw6_speedtests = vec![
failed_speedtest(last_speedtest),
failed_speedtest(timestamp),
failed_speedtest(gw6.clone(), last_speedtest),
failed_speedtest(gw6.clone(), timestamp),
];
let gw7_speedtests = vec![
poor_speedtest(gw7.clone(), last_speedtest),
poor_speedtest(gw7.clone(), timestamp),
];
let gw7_speedtests = vec![poor_speedtest(last_speedtest), poor_speedtest(timestamp)];
let mut speedtests = HashMap::new();
speedtests.insert(gw1, VecDeque::from(gw1_speedtests));
speedtests.insert(gw2, VecDeque::from(gw2_speedtests));
speedtests.insert(gw3, VecDeque::from(gw3_speedtests));
speedtests.insert(gw4, VecDeque::from(gw4_speedtests));
speedtests.insert(gw5, VecDeque::from(gw5_speedtests));
speedtests.insert(gw6, VecDeque::from(gw6_speedtests));
speedtests.insert(gw7, VecDeque::from(gw7_speedtests));
let speedtest_avgs = SpeedtestAverages { speedtests };

let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
let gw3_average = SpeedtestAverage::from(&gw3_speedtests);
let gw4_average = SpeedtestAverage::from(&gw4_speedtests);
let gw5_average = SpeedtestAverage::from(&gw5_speedtests);
let gw6_average = SpeedtestAverage::from(&gw6_speedtests);
let gw7_average = SpeedtestAverage::from(&gw7_speedtests);
let mut averages = HashMap::new();
averages.insert(gw1.clone(), gw1_average);
averages.insert(gw2.clone(), gw2_average);
averages.insert(gw3.clone(), gw3_average);
averages.insert(gw4.clone(), gw4_average);
averages.insert(gw5.clone(), gw5_average);
averages.insert(gw6.clone(), gw6_average);
averages.insert(gw7.clone(), gw7_average);

let speedtest_avgs = SpeedtestAverages { averages };

// calculate the rewards for the sample group
let mut owner_rewards = HashMap::<PublicKeyBinary, u64>::new();
let epoch = (now - Duration::hours(1))..now;
for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs)
for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs)
.await
.unwrap()
.into_rewards(Decimal::ZERO, &epoch)
Expand Down
Loading

0 comments on commit 65ac933

Please sign in to comment.