Skip to content

Commit

Permalink
output share values in rewards manifest for iot and mobile
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Jun 7, 2024
1 parent 6420804 commit 3427ddf
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 62 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ helium-crypto = { version = "0.8.4", features = ["sqlx-postgres", "multisig"] }
hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [
"disktree",
] }
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
helium-proto = { git = "https://github.com/helium/proto", branch = "andymck/reward-manifest-additions", features = [
"services",
] }
solana-client = "1.16"
Expand All @@ -80,7 +80,7 @@ reqwest = { version = "0", default-features = false, features = [
"json",
"rustls-tls",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "andymck/reward-manifest-additions" }
humantime = "2"
humantime-serde = "1"
metrics = ">=0.22"
Expand Down
36 changes: 28 additions & 8 deletions iot_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ use chrono::{DateTime, TimeZone, Utc};
use db_store::meta;
use file_store::{file_sink, traits::TimestampEncode};
use futures::future::LocalBoxFuture;
use helium_proto::services::poc_lora as proto;
use helium_proto::services::poc_lora::iot_reward_share::Reward as ProtoReward;
use helium_proto::services::poc_lora::{UnallocatedReward, UnallocatedRewardType};
use helium_proto::RewardManifest;
use helium_proto::{
reward_manifest::RewardData::IotRewardData,
services::poc_lora::{
self as proto, iot_reward_share::Reward as ProtoReward, UnallocatedReward,
UnallocatedRewardType,
},
IotRewardData as ManifestIotRewardData, RewardManifest,
};
use humantime_serde::re::humantime;
use price::PriceTracker;
use reward_scheduler::Scheduler;
Expand All @@ -31,6 +35,12 @@ pub struct Rewarder {
pub price_tracker: PriceTracker,
}

pub struct RewardPocDcDataPoints {
beacon_rewards_per_share: Decimal,
witness_rewards_per_share: Decimal,
dc_transfer_rewards_per_share: Decimal,
}

impl ManagedTask for Rewarder {
fn start_task(
self: Box<Self>,
Expand Down Expand Up @@ -116,7 +126,8 @@ impl Rewarder {
let reward_period = &scheduler.reward_period;

// process rewards for poc and dc
reward_poc_and_dc(&self.pool, &self.rewards_sink, reward_period, iot_price).await?;
let poc_dc_shares =
reward_poc_and_dc(&self.pool, &self.rewards_sink, reward_period, iot_price).await?;
// process rewards for the operational fund
reward_operational(&self.rewards_sink, reward_period).await?;
// process rewards for the oracle
Expand Down Expand Up @@ -145,12 +156,18 @@ impl Rewarder {
transaction.commit().await?;

// now that the db has been purged, safe to write out the manifest
let reward_data = ManifestIotRewardData {
poc_bones_per_beacon_reward_share: poc_dc_shares.beacon_rewards_per_share.to_string(),
poc_bones_per_witness_reward_share: poc_dc_shares.witness_rewards_per_share.to_string(),
dc_bones_per_share: poc_dc_shares.dc_transfer_rewards_per_share.to_string(),
};
self.reward_manifests_sink
.write(
RewardManifest {
start_timestamp: scheduler.reward_period.start.encode_timestamp(),
end_timestamp: scheduler.reward_period.end.encode_timestamp(),
written_files,
reward_data: Some(IotRewardData(reward_data)),
},
[],
)
Expand Down Expand Up @@ -205,13 +222,12 @@ impl Rewarder {
.ok_or(db_store::Error::DecodeError)
}
}

pub async fn reward_poc_and_dc(
pool: &Pool<Postgres>,
rewards_sink: &file_sink::FileSinkClient,
reward_period: &Range<DateTime<Utc>>,
iot_price: Decimal,
) -> anyhow::Result<()> {
) -> anyhow::Result<RewardPocDcDataPoints> {
let reward_shares = reward_share::aggregate_reward_shares(pool, reward_period).await?;
let gateway_shares = GatewayShares::new(reward_shares)?;
let (beacon_rewards_per_share, witness_rewards_per_share, dc_transfer_rewards_per_share) =
Expand Down Expand Up @@ -254,7 +270,11 @@ pub async fn reward_poc_and_dc(
reward_period,
)
.await?;
Ok(())
Ok(RewardPocDcDataPoints {
beacon_rewards_per_share,
witness_rewards_per_share,
dc_transfer_rewards_per_share,
})
}

pub async fn reward_operational(
Expand Down
6 changes: 3 additions & 3 deletions mobile_verifier/src/cli/reward_from_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Cmd {
let speedtest_averages =
SpeedtestAverages::aggregate_epoch_averages(epoch.end, &pool).await?;

let reward_shares = CoveragePoints::aggregate_points(
let coverage_points = CoveragePoints::aggregate_points(
&pool,
heartbeats,
&speedtest_averages,
Expand All @@ -53,8 +53,8 @@ impl Cmd {

let mut total_rewards = 0_u64;
let mut owner_rewards = HashMap::<_, u64>::new();
let radio_rewards = reward_shares
.into_rewards(Decimal::ZERO, &epoch)
let radio_rewards = coverage_points
.into_rewards(&epoch, Decimal::ZERO)
.ok_or(anyhow::anyhow!("no rewardable events"))?;
for (_reward_amount, reward) in radio_rewards {
if let Some(proto::mobile_reward_share::Reward::RadioReward(proto::RadioReward {
Expand Down
96 changes: 67 additions & 29 deletions mobile_verifier/src/reward_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,16 +570,14 @@ impl CoveragePoints {

pub fn into_rewards(
self,
available_poc_rewards: Decimal,
epoch: &'_ Range<DateTime<Utc>>,
poc_rewards_per_share: Decimal,
) -> Option<impl Iterator<Item = (u64, proto::MobileRewardShare)> + '_> {
let total_shares = self.total_shares();
available_poc_rewards
.checked_div(total_shares)
.map(|poc_rewards_per_share| {
tracing::info!(%poc_rewards_per_share);
let start_period = epoch.start.encode_timestamp();
let end_period = epoch.end.encode_timestamp();
tracing::info!("poc_rewards_per_share {:?}", poc_rewards_per_share);
let start_period = epoch.start.encode_timestamp();
let end_period = epoch.end.encode_timestamp();
if poc_rewards_per_share > Decimal::ZERO {
Some(
self.coverage_points
.into_iter()
.flat_map(move |(hotspot_key, hotspot_points)| {
Expand All @@ -592,8 +590,11 @@ impl CoveragePoints {
hotspot_points.radio_points.into_iter(),
)
})
.filter(|(poc_reward, _mobile_reward)| *poc_reward > 0)
})
.filter(|(poc_reward, _mobile_reward)| *poc_reward > 0),
)
} else {
None
}
}
}

Expand Down Expand Up @@ -1394,7 +1395,7 @@ mod test {
let mut allocated_poc_rewards = 0_u64;

let epoch = (now - Duration::hours(1))..now;
for (reward_amount, mobile_reward) in CoveragePoints::aggregate_points(
let coverage_points = CoveragePoints::aggregate_points(
&hex_coverage,
stream::iter(heartbeat_rewards),
&speedtest_avgs,
Expand All @@ -1403,9 +1404,14 @@ mod test {
&epoch,
)
.await
.unwrap()
.into_rewards(total_poc_rewards, &epoch)
.unwrap()
.unwrap();
let total_shares = coverage_points.total_shares();
let poc_rewards_per_share = total_poc_rewards
.checked_div(total_shares)
.unwrap_or_default();
for (reward_amount, mobile_reward) in coverage_points
.into_rewards(&epoch, poc_rewards_per_share)
.unwrap()
{
let radio_reward = match mobile_reward.reward {
Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward,
Expand Down Expand Up @@ -1566,7 +1572,8 @@ mod test {
let duration = Duration::hours(1);
let epoch = (now - duration)..now;
let total_poc_rewards = get_scheduled_tokens_for_poc(epoch.end - epoch.start);
for (_reward_amount, mobile_reward) in CoveragePoints::aggregate_points(

let coverage_points = CoveragePoints::aggregate_points(
&hex_coverage,
stream::iter(heartbeat_rewards),
&speedtest_avgs,
Expand All @@ -1575,9 +1582,15 @@ mod test {
&epoch,
)
.await
.unwrap()
.into_rewards(total_poc_rewards, &epoch)
.unwrap()
.unwrap();
let total_shares = coverage_points.total_shares();
let poc_rewards_per_share = total_poc_rewards
.checked_div(total_shares)
.unwrap_or_default();

for (_reward_amount, mobile_reward) in coverage_points
.into_rewards(&epoch, poc_rewards_per_share)
.unwrap()
{
let radio_reward = match mobile_reward.reward {
Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward,
Expand Down Expand Up @@ -1696,7 +1709,8 @@ mod test {
let duration = Duration::hours(1);
let epoch = (now - duration)..now;
let total_poc_rewards = get_scheduled_tokens_for_poc(epoch.end - epoch.start);
for (_reward_amount, mobile_reward) in CoveragePoints::aggregate_points(

let coverage_points = CoveragePoints::aggregate_points(
&hex_coverage,
stream::iter(heartbeat_rewards),
&speedtest_avgs,
Expand All @@ -1705,9 +1719,15 @@ mod test {
&epoch,
)
.await
.unwrap()
.into_rewards(total_poc_rewards, &epoch)
.unwrap()
.unwrap();
let total_shares = coverage_points.total_shares();
let poc_rewards_per_share = total_poc_rewards
.checked_div(total_shares)
.unwrap_or_default();

for (_reward_amount, mobile_reward) in coverage_points
.into_rewards(&epoch, poc_rewards_per_share)
.unwrap()
{
let radio_reward = match mobile_reward.reward {
Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward,
Expand Down Expand Up @@ -1826,7 +1846,8 @@ mod test {
let duration = Duration::hours(1);
let epoch = (now - duration)..now;
let total_poc_rewards = get_scheduled_tokens_for_poc(epoch.end - epoch.start);
for (_reward_amount, mobile_reward) in CoveragePoints::aggregate_points(

let coverage_points = CoveragePoints::aggregate_points(
&hex_coverage,
stream::iter(heartbeat_rewards),
&speedtest_avgs,
Expand All @@ -1835,9 +1856,15 @@ mod test {
&epoch,
)
.await
.unwrap()
.into_rewards(total_poc_rewards, &epoch)
.unwrap()
.unwrap();
let total_shares = coverage_points.total_shares();
let poc_rewards_per_share = total_poc_rewards
.checked_div(total_shares)
.unwrap_or_default();

for (_reward_amount, mobile_reward) in coverage_points
.into_rewards(&epoch, poc_rewards_per_share)
.unwrap()
{
let radio_reward = match mobile_reward.reward {
Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward,
Expand Down Expand Up @@ -1944,12 +1971,17 @@ mod test {
let now = Utc::now();
// We should never see any radio shares from owner2, since all of them are
// less than or equal to zero.
let coverage_points = CoveragePoints { coverage_points };
let epoch = now - Duration::hours(1)..now;
let total_poc_rewards = get_scheduled_tokens_for_poc(epoch.end - epoch.start);
let coverage_points = CoveragePoints { coverage_points };
let total_shares = coverage_points.total_shares();
let poc_rewards_per_share = total_poc_rewards
.checked_div(total_shares)
.unwrap_or_default();

let expected_hotspot = gw1;
for (_reward_amount, mobile_reward) in coverage_points
.into_rewards(total_poc_rewards, &epoch)
.into_rewards(&epoch, poc_rewards_per_share)
.unwrap()
{
let radio_reward = match mobile_reward.reward {
Expand All @@ -1970,8 +2002,14 @@ mod test {
let now = Utc::now();
let epoch = now - Duration::hours(1)..now;
let total_poc_rewards = get_scheduled_tokens_for_poc(epoch.end - epoch.start);

let total_shares = coverage_points.total_shares();
let poc_rewards_per_share = total_poc_rewards
.checked_div(total_shares)
.unwrap_or_default();

assert!(coverage_points
.into_rewards(total_poc_rewards, &epoch)
.into_rewards(&epoch, poc_rewards_per_share)
.is_none());
}

Expand Down
Loading

0 comments on commit 3427ddf

Please sign in to comment.