Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor speedtests handling, ensure rewards only include tests up until epoch end #581

Merged
merged 4 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions mobile_verifier/migrations/15_speedtests_one_to_one.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

jeffgrunewald marked this conversation as resolved.
Show resolved Hide resolved
CREATE TABLE speedtests_migration (
pubkey text NOT NULL,
upload_speed bigint,
download_speed bigint,
latency integer,
"serial" text,
timestamp timestamptz NOT NULL,
inserted_at timestamptz default now(),
PRIMARY KEY(pubkey, timestamp)
);
CREATE INDEX idx_speedtests2_pubkey on speedtests_migration (pubkey);

INSERT INTO speedtests_migration (pubkey, upload_speed, download_speed, latency, serial, timestamp)
SELECT id, (st).upload_speed, (st).download_speed, (st).latency, '', (st).timestamp
FROM (select id, unnest(speedtests) as st from speedtests_old) as tmp
ON CONFLICT DO NOTHING;

ALTER TABLE speedtests RENAME TO speedtests_old;
andymck marked this conversation as resolved.
Show resolved Hide resolved
ALTER TABLE speedtests_migration RENAME TO speedtests;


15 changes: 8 additions & 7 deletions mobile_verifier/src/cli/reward_from_db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
heartbeats::HeartbeatReward,
reward_shares::{get_scheduled_tokens_for_poc_and_dc, PocShares},
speedtests::{Average, SpeedtestAverages},
speedtests_average::SpeedtestAverages,
Settings,
};
use anyhow::Result;
Expand Down Expand Up @@ -39,8 +39,9 @@ impl Cmd {
.await?;

let heartbeats = HeartbeatReward::validated(&pool, &epoch);
let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?;
let reward_shares = PocShares::aggregate(heartbeats, speedtests.clone()).await?;
let speedtest_averages =
SpeedtestAverages::aggregate_epoch_averages(epoch.end, &pool).await?;
let reward_shares = PocShares::aggregate(heartbeats, &speedtest_averages).await?;

let mut total_rewards = 0_u64;
let mut owner_rewards = HashMap::<_, u64>::new();
Expand All @@ -62,11 +63,11 @@ impl Cmd {
}
let rewards: Vec<_> = owner_rewards.into_iter().collect();
let mut multiplier_count = HashMap::<_, usize>::new();
let speedtest_multipliers: Vec<_> = speedtests
.speedtests
let speedtest_multipliers: Vec<_> = speedtest_averages
.averages
.into_iter()
.map(|(pub_key, avg)| {
let reward_multiplier = Average::from(&avg).reward_multiplier();
.map(|(pub_key, average)| {
let reward_multiplier = average.reward_multiplier;
*multiplier_count.entry(reward_multiplier).or_default() += 1;
(pub_key, reward_multiplier)
})
Expand Down
4 changes: 2 additions & 2 deletions mobile_verifier/src/data_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ pub async fn data_sessions_to_dc<'a>(

pub async fn clear_hotspot_data_sessions(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
reward_period: &Range<DateTime<Utc>>,
timestamp: &DateTime<Utc>,
) -> Result<(), sqlx::Error> {
sqlx::query("delete from hotspot_data_transfer_sessions where received_timestamp < $1")
.bind(reward_period.end)
.bind(timestamp)
.execute(&mut *tx)
.await?;
Ok(())
Expand Down
11 changes: 11 additions & 0 deletions mobile_verifier/src/heartbeats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,14 @@ async fn validate_heartbeat(

Ok((cell_type, proto::HeartbeatValidity::Valid))
}

pub async fn clear_heartbeats(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
timestamp: &DateTime<Utc>,
) -> Result<(), sqlx::Error> {
sqlx::query("DELETE FROM heartbeats WHERE truncated_timestamp < $1")
.bind(timestamp)
.execute(&mut *tx)
.await?;
Ok(())
}
1 change: 1 addition & 0 deletions mobile_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod heartbeats;
mod reward_shares;
mod settings;
mod speedtests;
mod speedtests_average;
mod subscriber_location;
mod telemetry;

Expand Down
157 changes: 92 additions & 65 deletions mobile_verifier/src/reward_shares.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::{
data_session::HotspotMap,
heartbeats::HeartbeatReward,
speedtests::{Average, SpeedtestAverages},
speedtests_average::{SpeedtestAverage, SpeedtestAverages},
subscriber_location::SubscriberValidatedLocations,
};

use chrono::{DateTime, Duration, Utc};
use file_store::traits::TimestampEncode;
use futures::{Stream, StreamExt};
Expand Down Expand Up @@ -221,15 +220,15 @@ pub struct PocShares {
impl PocShares {
pub async fn aggregate(
heartbeats: impl Stream<Item = Result<HeartbeatReward, sqlx::Error>>,
speedtests: SpeedtestAverages,
speedtest_averages: &SpeedtestAverages,
) -> Result<Self, sqlx::Error> {
let mut poc_shares = Self::default();
let mut heartbeats = std::pin::pin!(heartbeats);
while let Some(heartbeat) = heartbeats.next().await.transpose()? {
let speedmultiplier = speedtests
let speedmultiplier = speedtest_averages
.get_average(&heartbeat.hotspot_key)
.as_ref()
.map_or(Decimal::ZERO, Average::reward_multiplier);
.map_or(Decimal::ZERO, SpeedtestAverage::reward_multiplier);
*poc_shares
.hotspot_shares
.entry(heartbeat.hotspot_key)
Expand Down Expand Up @@ -322,18 +321,16 @@ pub fn get_scheduled_tokens_for_mappers(duration: Duration) -> Decimal {
mod test {
use super::*;
use crate::{
cell_type::CellType,
data_session,
data_session::HotspotDataSession,
heartbeats::HeartbeatReward,
speedtests::{Speedtest, SpeedtestAverages},
cell_type::CellType, data_session, data_session::HotspotDataSession,
heartbeats::HeartbeatReward, speedtests::Speedtest,
subscriber_location::SubscriberValidatedLocations,
};
use chrono::{Duration, Utc};
use file_store::speedtest::CellSpeedtest;
use futures::stream;
use helium_proto::services::poc_mobile::mobile_reward_share::Reward as MobileReward;
use prost::Message;
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;

fn valid_shares() -> RadioShares {
let mut radio_shares: HashMap<String, Decimal> = Default::default();
Expand Down Expand Up @@ -526,7 +523,7 @@ mod test {
assert_eq!(data_transfer_rewards.reward_scale().round_dp(1), dec!(0.5));
}

fn bytes_per_s(mbps: i64) -> i64 {
fn bytes_per_s(mbps: u64) -> u64 {
mbps * 125000
}

Expand All @@ -536,39 +533,55 @@ mod test {
.reward_weight()
}

fn acceptable_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn acceptable_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
timestamp,
upload_speed: bytes_per_s(10),
download_speed: bytes_per_s(100),
latency: 25,
report: CellSpeedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(10),
download_speed: bytes_per_s(100),
latency: 25,
serial: "".to_string(),
},
}
}

fn degraded_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn degraded_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
timestamp,
upload_speed: bytes_per_s(5),
download_speed: bytes_per_s(60),
latency: 60,
report: CellSpeedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(5),
download_speed: bytes_per_s(60),
latency: 60,
serial: "".to_string(),
},
}
}

fn failed_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn failed_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
timestamp,
upload_speed: bytes_per_s(1),
download_speed: bytes_per_s(20),
latency: 110,
report: CellSpeedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(1),
download_speed: bytes_per_s(20),
latency: 110,
serial: "".to_string(),
},
}
}

fn poor_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn poor_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
timestamp,
upload_speed: bytes_per_s(2),
download_speed: bytes_per_s(40),
latency: 90,
report: CellSpeedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(2),
download_speed: bytes_per_s(40),
latency: 90,
serial: "".to_string(),
},
}
}

Expand Down Expand Up @@ -611,19 +624,21 @@ mod test {

let last_timestamp = timestamp - Duration::hours(12);
let g1_speedtests = vec![
acceptable_speedtest(last_timestamp),
acceptable_speedtest(timestamp),
acceptable_speedtest(g1.clone(), last_timestamp),
acceptable_speedtest(g1.clone(), timestamp),
];
let g2_speedtests = vec![
acceptable_speedtest(last_timestamp),
acceptable_speedtest(timestamp),
acceptable_speedtest(g2.clone(), last_timestamp),
acceptable_speedtest(g2.clone(), timestamp),
];
let mut speedtests = HashMap::new();
speedtests.insert(g1.clone(), VecDeque::from(g1_speedtests));
speedtests.insert(g2.clone(), VecDeque::from(g2_speedtests));
let speedtest_avgs = SpeedtestAverages { speedtests };

let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs)
let g1_average = SpeedtestAverage::from(&g1_speedtests);
let g2_average = SpeedtestAverage::from(&g2_speedtests);
let mut averages = HashMap::new();
averages.insert(g1.clone(), g1_average);
averages.insert(g2.clone(), g2_average);
let speedtest_avgs = SpeedtestAverages { averages };

let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs)
.await
.unwrap();

Expand Down Expand Up @@ -779,44 +794,56 @@ mod test {
// setup speedtests
let last_speedtest = timestamp - Duration::hours(12);
let gw1_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw1.clone(), last_speedtest),
acceptable_speedtest(gw1.clone(), timestamp),
];
let gw2_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw2.clone(), last_speedtest),
acceptable_speedtest(gw2.clone(), timestamp),
];
let gw3_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw3.clone(), last_speedtest),
acceptable_speedtest(gw3.clone(), timestamp),
];
let gw4_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw4.clone(), last_speedtest),
acceptable_speedtest(gw4.clone(), timestamp),
];
let gw5_speedtests = vec![
degraded_speedtest(last_speedtest),
degraded_speedtest(timestamp),
degraded_speedtest(gw5.clone(), last_speedtest),
degraded_speedtest(gw5.clone(), timestamp),
];
let gw6_speedtests = vec![
failed_speedtest(last_speedtest),
failed_speedtest(timestamp),
failed_speedtest(gw6.clone(), last_speedtest),
failed_speedtest(gw6.clone(), timestamp),
];
let gw7_speedtests = vec![poor_speedtest(last_speedtest), poor_speedtest(timestamp)];
let mut speedtests = HashMap::new();
speedtests.insert(gw1, VecDeque::from(gw1_speedtests));
speedtests.insert(gw2, VecDeque::from(gw2_speedtests));
speedtests.insert(gw3, VecDeque::from(gw3_speedtests));
speedtests.insert(gw4, VecDeque::from(gw4_speedtests));
speedtests.insert(gw5, VecDeque::from(gw5_speedtests));
speedtests.insert(gw6, VecDeque::from(gw6_speedtests));
speedtests.insert(gw7, VecDeque::from(gw7_speedtests));
let speedtest_avgs = SpeedtestAverages { speedtests };
let gw7_speedtests = vec![
poor_speedtest(gw7.clone(), last_speedtest),
poor_speedtest(gw7.clone(), timestamp),
];

let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
let gw3_average = SpeedtestAverage::from(&gw3_speedtests);
let gw4_average = SpeedtestAverage::from(&gw4_speedtests);
let gw5_average = SpeedtestAverage::from(&gw5_speedtests);
let gw6_average = SpeedtestAverage::from(&gw6_speedtests);
let gw7_average = SpeedtestAverage::from(&gw7_speedtests);
let mut averages = HashMap::new();
averages.insert(gw1.clone(), gw1_average);
averages.insert(gw2.clone(), gw2_average);
averages.insert(gw3.clone(), gw3_average);
averages.insert(gw4.clone(), gw4_average);
averages.insert(gw5.clone(), gw5_average);
averages.insert(gw6.clone(), gw6_average);
averages.insert(gw7.clone(), gw7_average);

let speedtest_avgs = SpeedtestAverages { averages };

// calculate the rewards for the sample group
let mut owner_rewards = HashMap::<PublicKeyBinary, u64>::new();
let epoch = (now - Duration::hours(1))..now;
for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs)
for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs)
.await
.unwrap()
.into_rewards(Decimal::ZERO, &epoch)
Expand Down
Loading