From ac73044b481429953865d94542d9202ba5bc24aa Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Mon, 30 Sep 2024 14:50:34 -0700 Subject: [PATCH] Promotion Fund workspace (#866) * Add promotion funds workspace This workspace is all about dealing with Service Provider Promotion Fund allocation. HIP-114 https://github.com/helium/HIP/blob/main/0114-incentive-escrow-fund-for-subscriber-referrals.md Service Provider Promotions are stored in CarrierV0 on Solana. To keep the mobile-verifier from talking to a chain, this service will periodically check Solana and compare Service Providers allocations to what is stored in S3. If the values have changed, a new file will be output to a bucket for the mobile-verifier rewarder to read from. NOTE: Allocation Values are stored in Bps (Basis Points) https://www.investopedia.com/terms/b/basispoint.asp ** Commands *** ./promotion_fund write-solana Fetch Allocation values from Solana and write them to S3. This command _always_ writes an S3 file. *** ./promotion_fund print-s3 Using the lookback time in the provided settings file, show the Allocation values this service would start up with. *** ./promotion_fund server Start a server that reads from S3, then checks with Solana periodically for updated Allocatino values. Writing new files when needed. * Supporting material for promotion_fund workspace - ingest promotion rewards, nothing will be done with them until the processor is added into mobile-verifier. - dump reward files - add sp_allocations dummy field to rewarder output - reward indexer mobile promotion type added * use existing reward_types for reward_indexer Otherwise inserting a new reward would match on the address and continually change the reward_type column for no reason. * add lookback start after for s3 on start We may not always want to read from the beginning of time in s3 to get the latest values * update proto --- Cargo.lock | 160 ++++++++++++++------ Cargo.toml | 5 +- file_store/src/cli/dump_mobile_rewards.rs | 18 +++ file_store/src/file_info.rs | 15 ++ file_store/src/lib.rs | 1 + file_store/src/promotion_reward.rs | 91 ++++++++++++ file_store/src/traits/file_sink_write.rs | 15 ++ file_store/src/traits/msg_verify.rs | 1 + ingest/src/server_mobile.rs | 42 +++++- ingest/tests/common/mod.rs | 2 + mobile_verifier/src/rewarder.rs | 2 + promotion_fund/Cargo.toml | 30 ++++ promotion_fund/README.md | 18 +++ promotion_fund/pkg/settings-template.toml | 57 +++++++ promotion_fund/src/daemon.rs | 172 ++++++++++++++++++++++ promotion_fund/src/lib.rs | 140 ++++++++++++++++++ promotion_fund/src/main.rs | 119 +++++++++++++++ promotion_fund/src/settings.rs | 70 +++++++++ reward_index/src/indexer.rs | 31 +++- solana/src/carrier.rs | 52 +++++++ solana/src/lib.rs | 1 + 21 files changed, 993 insertions(+), 49 deletions(-) create mode 100644 file_store/src/promotion_reward.rs create mode 100644 promotion_fund/Cargo.toml create mode 100644 promotion_fund/README.md create mode 100644 promotion_fund/pkg/settings-template.toml create mode 100644 promotion_fund/src/daemon.rs create mode 100644 promotion_fund/src/lib.rs create mode 100644 promotion_fund/src/main.rs create mode 100644 promotion_fund/src/settings.rs create mode 100644 solana/src/carrier.rs diff --git a/Cargo.lock b/Cargo.lock index a06e8f3ae..ddbef271f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,17 +1617,17 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#376765fe006051d6dcccf709def58e7ed291b845" +source = "git+https://github.com/helium/proto?branch=map/subscriber-referral#effd56a31ab83a1079c7819cd1f783f4141c9c9b" dependencies = [ "base64 0.21.7", "byteorder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "prost", "rand 0.8.5", "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", ] @@ -1776,7 +1776,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "http-serde", "humantime-serde", @@ -2117,7 +2117,7 @@ dependencies = [ [[package]] name = "circuit-breaker" version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -2618,7 +2618,7 @@ dependencies = [ "axum 0.7.4", "bs58 0.4.0", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "notify", "serde", @@ -2758,8 +2758,8 @@ dependencies = [ [[package]] name = "data-credits" -version = "0.2.1" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.2.2" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3137,7 +3137,7 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "fanout" version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3200,7 +3200,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "hex-literal", "http 0.2.11", "lazy_static", @@ -3215,8 +3215,8 @@ dependencies = [ "serde_json", "sha2 0.10.8", "sqlx", - "strum", - "strum_macros", + "strum 0.24.1", + "strum_macros 0.24.3", "task-manager", "tempfile", "thiserror", @@ -3704,10 +3704,16 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "helium-anchor-gen" version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3754,7 +3760,7 @@ dependencies = [ [[package]] name = "helium-entity-manager" version = "0.3.1" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3763,7 +3769,7 @@ dependencies = [ [[package]] name = "helium-lib" version = "0.0.0" -source = "git+https://github.com/helium/helium-wallet-rs.git?branch=master#a4db666b45a531d690e561c225ca23c503a08bd1" +source = "git+https://github.com/helium/helium-wallet-rs.git?branch=master#b54819ac4c4bd73be37d25d7d6d48842bbc95ea9" dependencies = [ "anchor-client", "anchor-spl", @@ -3776,8 +3782,9 @@ dependencies = [ "h3o", "helium-anchor-gen", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=master)", "hex", + "hex-literal", "itertools", "jsonrpc_client", "lazy_static", @@ -3794,10 +3801,27 @@ dependencies = [ "spl-account-compression", "spl-associated-token-account 3.0.2", "thiserror", + "tonic", "tracing", "url", ] +[[package]] +name = "helium-proto" +version = "0.1.0" +source = "git+https://github.com/helium/proto?branch=map/subscriber-referral#effd56a31ab83a1079c7819cd1f783f4141c9c9b" +dependencies = [ + "bytes", + "prost", + "prost-build", + "serde", + "serde_json", + "strum 0.26.3", + "strum_macros 0.26.4", + "tonic", + "tonic-build", +] + [[package]] name = "helium-proto" version = "0.1.0" @@ -3814,8 +3838,8 @@ dependencies = [ [[package]] name = "helium-sub-daos" -version = "0.1.5" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.1.8" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -3853,7 +3877,7 @@ dependencies = [ "async-trait", "chrono", "derive_builder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "hextree", "rust_decimal", "rust_decimal_macros", @@ -3868,8 +3892,8 @@ checksum = "7ebdb29d2ea9ed0083cd8cece49bbd968021bd99b0849edb4a9a7ee0fdf6a4e0" [[package]] name = "hexboosting" -version = "0.0.5" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.1.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -4269,7 +4293,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "humantime-serde", "metrics", @@ -4338,7 +4362,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "hextree", "http 0.2.11", "http-serde", @@ -4380,7 +4404,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "http-serde", "humantime-serde", @@ -4422,7 +4446,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http-serde", "humantime-serde", "iot-config", @@ -4679,8 +4703,8 @@ dependencies = [ [[package]] name = "lazy-distributor" -version = "0.1.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.2.0" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -4689,7 +4713,7 @@ dependencies = [ [[package]] name = "lazy-transactions" version = "0.2.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -5010,7 +5034,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "hextree", "http 0.2.11", "http-serde", @@ -5050,7 +5074,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "mobile-config", "prost", "rand 0.8.5", @@ -5064,8 +5088,8 @@ dependencies = [ [[package]] name = "mobile-entity-manager" -version = "0.1.2" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.1.3" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -5086,7 +5110,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "http-serde", "humantime-serde", @@ -5130,7 +5154,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "hex-assignments", "hextree", "http-serde", @@ -5813,7 +5837,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "http 0.2.11", "hyper 0.14.28", "jsonrpsee", @@ -5896,7 +5920,7 @@ dependencies = [ "futures-util", "helium-anchor-gen", "helium-lib", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -5920,7 +5944,7 @@ dependencies = [ [[package]] name = "price-oracle" version = "0.2.1" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -6000,6 +6024,32 @@ dependencies = [ "yansi", ] +[[package]] +name = "promotion_fund" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "clap 4.4.8", + "config", + "custom-tracing", + "file-store", + "futures", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", + "humantime-serde", + "metrics", + "metrics-exporter-prometheus", + "poc-metrics", + "serde", + "solana", + "task-manager", + "tokio", + "tracing", + "tracing-subscriber", + "triggered", +] + [[package]] name = "prost" version = "0.12.4" @@ -6503,7 +6553,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=map/subscriber-referral)", "humantime-serde", "lazy_static", "metrics", @@ -6537,7 +6587,7 @@ dependencies = [ [[package]] name = "rewards-oracle" version = "0.2.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -8603,7 +8653,16 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" dependencies = [ - "strum_macros", + "strum_macros 0.24.3", +] + +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros 0.26.4", ] [[package]] @@ -8619,6 +8678,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.58", +] + [[package]] name = "subtle" version = "2.4.1" @@ -9150,7 +9222,7 @@ dependencies = [ [[package]] name = "treasury-management" version = "0.2.0" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -9391,8 +9463,8 @@ dependencies = [ [[package]] name = "voter-stake-registry" -version = "0.3.1" -source = "git+https://github.com/helium/helium-anchor-gen.git#fe60ed1d49e9255bd779a99bdd7928f278c07256" +version = "0.3.3" +source = "git+https://github.com/helium/helium-anchor-gen.git#54392f522436bc8a8c8b8c0a0b4ec407a28f66ed" dependencies = [ "anchor-gen", "anchor-lang 0.30.1", @@ -9851,7 +9923,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", "twox-hash", "xorf", diff --git a/Cargo.toml b/Cargo.toml index a45355a99..2b636c8f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "mobile_verifier", "poc_entropy", "price", + "promotion_fund", "reward_index", "reward_scheduler", "solana", @@ -70,10 +71,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [ "disktree", ] } -helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ +helium-proto = { git = "https://github.com/helium/proto", branch = "map/subscriber-referral", features = [ "services", ] } -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "map/subscriber-referral" } solana-client = "1.18" solana-sdk = "1.18" solana-program = "1.18" diff --git a/file_store/src/cli/dump_mobile_rewards.rs b/file_store/src/cli/dump_mobile_rewards.rs index aadad8792..365fe12dc 100644 --- a/file_store/src/cli/dump_mobile_rewards.rs +++ b/file_store/src/cli/dump_mobile_rewards.rs @@ -3,6 +3,7 @@ use crate::{file_source, Result, Settings}; use futures::stream::StreamExt; use helium_crypto::PublicKey; use helium_proto::services::poc_mobile::mobile_reward_share::Reward::*; +use helium_proto::services::poc_mobile::promotion_reward::Entity; use helium_proto::services::poc_mobile::MobileRewardShare; use prost::Message; use serde_json::json; @@ -23,6 +24,7 @@ impl Cmd { let mut subscriber_reward = vec![]; let mut service_provider_reward = vec![]; let mut unallocated_reward = vec![]; + let mut promotion_reward = vec![]; while let Some(result) = file_stream.next().await { let msg = result?; @@ -60,6 +62,21 @@ impl Cmd { "unallocated_reward_type": reward.reward_type, "amount": reward.amount, })), + PromotionReward(reward) => { + let entity = reward.entity.unwrap(); + match entity { + Entity::SubscriberId(id) => promotion_reward.push(json!({ + "subscriber_id": uuid::Uuid::from_slice(&id).unwrap(), + "service_provider_amount": reward.service_provider_amount, + "matched_amount": reward.matched_amount, + })), + Entity::GatewayKey(key) => promotion_reward.push(json!({ + "gateway_key": PublicKey::try_from(key)?.to_string(), + "service_provider_amount": reward.service_provider_amount, + "matched_amount": reward.matched_amount, + })), + } + } }, None => todo!(), } @@ -71,6 +88,7 @@ impl Cmd { "gateway_reward": gateway_reward, "subscriber_reward": subscriber_reward, "service_provider_reward": service_provider_reward, + "promotion_reward": promotion_reward, "unallocated_reward": unallocated_reward, }))?; diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 86c2faa4a..ea330c0a3 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -164,6 +164,9 @@ pub const SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str = "subscriber_verified_mapping_ingest_report"; pub const VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str = "verified_subscriber_verified_mapping_ingest_report"; +pub const PROMOTION_REWARD_INGEST_REPORT: &str = "promotion_reward_ingest_report"; +pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward"; +pub const SERVICE_PROVIDER_PROMOTION_FUND: &str = "service_provider_promotion_fund"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -220,6 +223,9 @@ pub enum FileType { VerifiedSPBoostedRewardsBannedRadioIngestReport, SubscriberVerifiedMappingEventIngestReport, VerifiedSubscriberVerifiedMappingEventIngestReport, + PromotionRewardIngestReport, + VerifiedPromotionReward, + ServiceProviderPromotionFund, } impl fmt::Display for FileType { @@ -291,6 +297,9 @@ impl fmt::Display for FileType { Self::VerifiedSubscriberVerifiedMappingEventIngestReport => { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT } + Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT, + Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD, + Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND, }; f.write_str(s) } @@ -365,6 +374,9 @@ impl FileType { Self::VerifiedSubscriberVerifiedMappingEventIngestReport => { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT } + Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT, + Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD, + Self::ServiceProviderPromotionFund => SERVICE_PROVIDER_PROMOTION_FUND, } } } @@ -439,6 +451,9 @@ impl FromStr for FileType { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT => { Self::VerifiedSubscriberVerifiedMappingEventIngestReport } + PROMOTION_REWARD_INGEST_REPORT => Self::PromotionRewardIngestReport, + VERIFIED_PROMOTION_REWARD => Self::VerifiedPromotionReward, + SERVICE_PROVIDER_PROMOTION_FUND => Self::ServiceProviderPromotionFund, _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index 477c0dea9..9c5a132cd 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -20,6 +20,7 @@ pub mod mobile_radio_threshold; pub mod mobile_session; pub mod mobile_subscriber; pub mod mobile_transfer; +pub mod promotion_reward; pub mod reward_manifest; mod settings; pub mod speedtest; diff --git a/file_store/src/promotion_reward.rs b/file_store/src/promotion_reward.rs new file mode 100644 index 000000000..c89132355 --- /dev/null +++ b/file_store/src/promotion_reward.rs @@ -0,0 +1,91 @@ +use crate::{ + traits::{MsgDecode, TimestampDecode, TimestampEncode}, + Error, Result, +}; +use chrono::{DateTime, Utc}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::{ + self as proto, PromotionRewardIngestReportV1, PromotionRewardReqV1, +}; + +#[derive(Debug, Clone, PartialEq, Hash)] +pub enum Entity { + SubscriberId(Vec), + GatewayKey(PublicKeyBinary), +} + +impl From for Entity { + fn from(entity: proto::promotion_reward_req_v1::Entity) -> Self { + match entity { + proto::promotion_reward_req_v1::Entity::SubscriberId(v) => Entity::SubscriberId(v), + proto::promotion_reward_req_v1::Entity::GatewayKey(k) => Entity::GatewayKey(k.into()), + } + } +} + +impl From for proto::promotion_reward_req_v1::Entity { + fn from(entity: Entity) -> Self { + match entity { + Entity::SubscriberId(v) => proto::promotion_reward_req_v1::Entity::SubscriberId(v), + Entity::GatewayKey(k) => proto::promotion_reward_req_v1::Entity::GatewayKey(k.into()), + } + } +} + +impl From for proto::promotion_reward::Entity { + fn from(entity: Entity) -> Self { + match entity { + Entity::SubscriberId(v) => proto::promotion_reward::Entity::SubscriberId(v), + Entity::GatewayKey(k) => proto::promotion_reward::Entity::GatewayKey(k.into()), + } + } +} + +#[derive(Clone)] +pub struct PromotionReward { + pub entity: Entity, + pub shares: u64, + pub timestamp: DateTime, + pub received_timestamp: DateTime, + pub carrier_pub_key: PublicKeyBinary, + pub signature: Vec, +} + +impl MsgDecode for PromotionReward { + type Msg = PromotionRewardIngestReportV1; +} + +impl TryFrom for PromotionReward { + type Error = Error; + + fn try_from(v: PromotionRewardIngestReportV1) -> Result { + let received_timestamp = v.received_timestamp.to_timestamp_millis()?; + let Some(v) = v.report else { + return Err(Error::NotFound("report".to_string())); + }; + Ok(Self { + entity: if let Some(entity) = v.entity { + entity.into() + } else { + return Err(Error::NotFound("entity".to_string())); + }, + shares: v.shares, + timestamp: v.timestamp.to_timestamp()?, + received_timestamp, + carrier_pub_key: v.carrier_pub_key.into(), + signature: v.signature, + }) + } +} + +impl From for PromotionRewardReqV1 { + fn from(v: PromotionReward) -> Self { + Self { + entity: Some(v.entity.into()), + shares: v.shares, + timestamp: v.timestamp.encode_timestamp(), + carrier_pub_key: v.carrier_pub_key.into(), + signature: v.signature, + } + } +} diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index baf598fc6..651ebe080 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -268,3 +268,18 @@ impl_file_sink!( FileType::RewardManifest.to_str(), "reward_manifest" ); +impl_file_sink!( + proto::ServiceProviderPromotionFundV1, + FileType::ServiceProviderPromotionFund.to_str(), + "service_provider_promotion_fund" +); +impl_file_sink!( + poc_mobile::PromotionRewardIngestReportV1, + FileType::PromotionRewardIngestReport.to_str(), + "promotion_reward_ingest_report" +); +impl_file_sink!( + poc_mobile::VerifiedPromotionRewardV1, + FileType::VerifiedPromotionReward.to_str(), + "verified_promotion_reward" +); diff --git a/file_store/src/traits/msg_verify.rs b/file_store/src/traits/msg_verify.rs index 47bb6cb40..017a1e315 100644 --- a/file_store/src/traits/msg_verify.rs +++ b/file_store/src/traits/msg_verify.rs @@ -95,6 +95,7 @@ impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature); impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature); +impl_msg_verify!(poc_mobile::PromotionRewardReqV1, signature); #[cfg(test)] mod test { diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index c1e91ded1..71bf2bf07 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -14,7 +14,8 @@ use helium_proto::services::poc_mobile::{ CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, DataTransferSessionRespV1, InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1, - InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1, + InvalidatedRadioThresholdReportRespV1, PromotionRewardIngestReportV1, PromotionRewardReqV1, + PromotionRewardRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1, RadioThresholdReportRespV1, ServiceProviderBoostedRewardsBannedRadioIngestReportV1, ServiceProviderBoostedRewardsBannedRadioReqV1, ServiceProviderBoostedRewardsBannedRadioRespV1, SpeedtestIngestReportV1, SpeedtestReqV1, SpeedtestRespV1, SubscriberLocationIngestReportV1, @@ -46,6 +47,7 @@ pub struct GrpcServer { sp_boosted_rewards_ban_sink: FileSinkClient, subscriber_mapping_event_sink: FileSinkClient, + promotion_reward_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -85,6 +87,7 @@ impl GrpcServer { ServiceProviderBoostedRewardsBannedRadioIngestReportV1, >, subscriber_mapping_event_sink: FileSinkClient, + promotion_reward_sink: FileSinkClient, required_network: Network, address: SocketAddr, api_token: MetadataValue, @@ -100,6 +103,7 @@ impl GrpcServer { coverage_object_report_sink, sp_boosted_rewards_ban_sink, subscriber_mapping_event_sink, + promotion_reward_sink, required_network, address, api_token, @@ -437,6 +441,30 @@ impl poc_mobile::PocMobile for GrpcServer { let id = timestamp.to_string(); Ok(Response::new(SubscriberVerifiedMappingEventResV1 { id })) } + + async fn submit_promotion_reward( + &self, + request: Request, + ) -> GrpcResult { + let received_timestamp: u64 = Utc::now().timestamp_millis() as u64; + let event = request.into_inner(); + + custom_tracing::record_b58("pub_key", &event.carrier_pub_key); + + let report = self + .verify_public_key(event.carrier_pub_key.as_ref()) + .and_then(|public_key| self.verify_network(public_key)) + .and_then(|public_key| self.verify_signature(public_key, event)) + .map(|(_, event)| PromotionRewardIngestReportV1 { + received_timestamp, + report: Some(event), + })?; + + let _ = self.promotion_reward_sink.write(report, []).await; + + let id = received_timestamp.to_string(); + Ok(Response::new(PromotionRewardRespV1 { id })) + } } pub async fn grpc_server(settings: &Settings) -> Result<()> { @@ -546,6 +574,16 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ) .await?; + let (subscriber_referral_eligibility_sink, subscriber_referral_eligibility_server) = + PromotionRewardIngestReportV1::file_sink( + store_base_path, + file_upload.clone(), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), + env!("CARGO_PKG_NAME"), + ) + .await?; + let Some(api_token) = settings .token .as_ref() @@ -565,6 +603,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { coverage_object_report_sink, sp_boosted_rewards_ban_sink, subscriber_mapping_event_sink, + subscriber_referral_eligibility_sink, settings.network, settings.listen_addr, api_token, @@ -588,6 +627,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .add_task(coverage_object_report_sink_server) .add_task(sp_boosted_rewards_ban_sink_server) .add_task(subscriber_mapping_event_server) + .add_task(subscriber_referral_eligibility_server) .add_task(grpc_server) .build() .start() diff --git a/ingest/tests/common/mod.rs b/ingest/tests/common/mod.rs index 52512eba1..5584416a0 100644 --- a/ingest/tests/common/mod.rs +++ b/ingest/tests/common/mod.rs @@ -44,6 +44,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { let (coverage_obj_tx, _rx) = tokio::sync::mpsc::channel(10); let (sp_boosted_tx, _rx) = tokio::sync::mpsc::channel(10); let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10); + let (promotion_rewards_tx, _rx) = tokio::sync::mpsc::channel(10); tokio::spawn(async move { let grpc_server = GrpcServer::new( @@ -57,6 +58,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> { FileSinkClient::new(coverage_obj_tx, "noop"), FileSinkClient::new(sp_boosted_tx, "noop"), FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"), + FileSinkClient::new(promotion_rewards_tx, "noop"), Network::MainNet, socket_addr, api_token, diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 76cb12313..0c7ec6cb4 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -311,6 +311,8 @@ where boosted_poc_bones_per_reward_share: Some(helium_proto::Decimal { value: poc_dc_shares.boost.to_string(), }), + // TODO: Filled in with the next PR + sp_allocations: vec![], }; self.reward_manifests .write( diff --git a/promotion_fund/Cargo.toml b/promotion_fund/Cargo.toml new file mode 100644 index 000000000..eedd3135f --- /dev/null +++ b/promotion_fund/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "promotion_fund" +version = "0.1.0" +description = "Service Provider promotion fund tracking for the Helium Network" +authors.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } +chrono = { workspace = true } +clap = { workspace = true } +config = { workspace = true } +futures = { workspace = true } +helium-proto = { workspace = true } +humantime-serde = { workspace = true } +metrics = { workspace = true } +metrics-exporter-prometheus = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +triggered = { workspace = true } + +custom-tracing = { path = "../custom_tracing" } +file-store = { path = "../file_store" } +poc-metrics = { path = "../metrics" } +solana = { path = "../solana" } +task-manager = { path = "../task_manager" } diff --git a/promotion_fund/README.md b/promotion_fund/README.md new file mode 100644 index 000000000..f024562ac --- /dev/null +++ b/promotion_fund/README.md @@ -0,0 +1,18 @@ +* Promotion Fund Server + +## S3 Inputs + +| File Type | Pattern | | +| :---- | :---- | :---- | +| ServiceProviderPromotionFundV1 | service_provider_promotion_fund.\* | [Proto](https://github.com/helium/proto/blob/map/subscriber-referral/src/service_provider.proto#L9) | + +## S3 Outpus + +| File Type | Pattern | | +| :---- | :---- | :---- | +| ServiceProviderPromotionFundV1 | service_provider_promotion_fund.\* | [Proto](https://github.com/helium/proto/blob/map/subscriber-referral/src/service_provider.proto#L9) | + + +## Server + +The server loads the latest Service Provider Promotion Funds from S3, and every `Settings.solana_check_interval` Promotion Allocation for each Service Provider in the [proto enum](https://github.com/helium/proto/blob/376765fe006051d6dcccf709def58e7ed291b845/src/service_provider.proto#L5). If the Basis Points returned are different from what is stored in S3, a new report is be report. diff --git a/promotion_fund/pkg/settings-template.toml b/promotion_fund/pkg/settings-template.toml new file mode 100644 index 000000000..1f7a7c3d8 --- /dev/null +++ b/promotion_fund/pkg/settings-template.toml @@ -0,0 +1,57 @@ +# RUST_LOG compatible settings string +# +log = "promotion_fund=info" + +# Destination of file to be watched for dynamically updating log level. +# Write a RUST_LOG compatible string to see new logs. +# +# custom_tracing = "./tracing.cfg" + +# Temporary storage for Service Provider Promotion Funds before uploading to S3 +# +file_sink_cache = "/tmp/oracles/promotion-fund" + +# How often to check with Solana for updates to Service Provider Funds +# +solana_check_interval = "6 hours" + +# On startup, how far back do we read from S3 to get the latest Service Provider +# Fund allocation values. +# +# lookback_start_after = 0 + +[solana] +# Solana RPC. This may contain a secret +# +rpc_url = "https://api.devnet.solana.com" + +# Public key for the DNT Mint (Mobile mint) +# +dnt_mint = "mb1eu7TzEc71KxDpsmsKoucSSuuoGLv1drys1oP2jh6" + +[file_store_output] +# Output bucket name for Service Provider Promotion Funds +# +bucket = "service-provider-promotions" + +# Region for bucket. Defaults to below +# +# region = "us-west-2" + +# Optional URL for AWS api endpoint. Inferred from aws config settings or aws +# IAM context by default +# +# endpoint = "https://aws-s3-bucket.aws.com" + +# Access Key when using S3 locally +# +# access_key_id = "" + +# Secret Key when using S3 locally +# +# secret_access_key = "" + +[metrics] +# Prometheus endpoint +# +# endpoint = "127.0.0.1:19001" diff --git a/promotion_fund/src/daemon.rs b/promotion_fund/src/daemon.rs new file mode 100644 index 000000000..fb49bd176 --- /dev/null +++ b/promotion_fund/src/daemon.rs @@ -0,0 +1,172 @@ +use std::{collections::HashMap, time::Duration}; + +use anyhow::{Context, Result}; +use chrono::Utc; +use file_store::{ + file_info_poller::{FileInfoPollerParser, ProstFileInfoPollerParser}, + file_sink::FileSinkClient, + FileStore, FileType, +}; +use futures::TryFutureExt; +use helium_proto::{IntoEnumIterator, ServiceProvider, ServiceProviderPromotionFundV1}; +use solana::carrier::SolanaRpc; +use task_manager::ManagedTask; +use tokio::time::{self, Interval}; + +use crate::{compare_s3_and_solana_values, settings::Settings, Action, S3Value, SolanaValue}; + +const PROMOTION_FUND_LAST_SOLANA_FETCH_TIME: &str = "promotion_fund_last_solana_fetch_time"; + +pub struct Daemon { + s3_current: S3Value, + solana_client: SolanaRpc, + file_sink: FileSinkClient, + solana_check_interval: Interval, +} + +impl ManagedTask for Daemon { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + +impl Daemon { + pub fn new( + s3_current: S3Value, + solana_client: SolanaRpc, + file_sink: FileSinkClient, + solana_check_interval: Option, + ) -> Self { + Self { + s3_current, + solana_client, + file_sink, + solana_check_interval: solana_check_interval.unwrap_or(time::interval(Duration::MAX)), + } + } + + pub async fn from_settings( + settings: &Settings, + file_sink: FileSinkClient, + ) -> anyhow::Result { + let s3_current = fetch_s3_bps(settings).await?; + let solana_client = SolanaRpc::new(&settings.solana).context("making solana client")?; + let check_timer = tokio::time::interval(settings.solana_check_interval); + + Ok(Self::new( + s3_current, + solana_client, + file_sink, + Some(check_timer), + )) + } + + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + loop { + tokio::select! { + _ = shutdown.clone() => break, + _ = self.solana_check_interval.tick() => self.handle_tick().await? + } + } + + Ok(()) + } + + pub async fn handle_tick(&mut self) -> Result<()> { + let solana_current = match fetch_solana_bps(&self.solana_client).await { + Ok(solana_current) => { + metrics::gauge!(PROMOTION_FUND_LAST_SOLANA_FETCH_TIME) + .set(Utc::now().timestamp() as f64); + solana_current + } + Err(err) => { + tracing::error!(?err, "failed to get bps from solana"); + return Ok(()); + } + }; + + let action = compare_s3_and_solana_values(&self.s3_current, &solana_current); + match action { + Action::Noop => tracing::info!("nothing to do"), + Action::Write => { + tracing::info!(items = solana_current.len(), "writing new file"); + self.store_solana_values(&solana_current); + write_protos(&self.file_sink, solana_current).await?; + } + } + + Ok(()) + } + + fn store_solana_values(&mut self, promo_funds: &[ServiceProviderPromotionFundV1]) { + self.s3_current.clear(); + + for promo_fund_v1 in promo_funds { + self.s3_current + .insert(promo_fund_v1.service_provider, promo_fund_v1.bps); + } + } +} + +pub async fn fetch_s3_bps(settings: &Settings) -> anyhow::Result { + let file_store = FileStore::from_settings(&settings.file_store_output).await?; + let mut results = HashMap::new(); + + let all = file_store + .list_all( + FileType::ServiceProviderPromotionFund.to_str(), + settings.lookback_start_after, + None, + ) + .await?; + + if let Some(last) = all.last() { + let byte_stream = file_store.get_raw(&last.key).await?; + let data: Vec = + ProstFileInfoPollerParser.parse(byte_stream).await?; + for sp_promo_fund in data { + results.insert(sp_promo_fund.service_provider, sp_promo_fund.bps); + } + } + + Ok(results) +} + +pub async fn fetch_solana_bps(client: &SolanaRpc) -> anyhow::Result { + let mut results = Vec::new(); + for service_provider in ServiceProvider::iter() { + let bps = client + .fetch_incentive_escrow_fund_bps(&service_provider.to_string()) + .await + .with_context(|| format!("fetching solana bps for {service_provider:?}"))?; + + let proto = ServiceProviderPromotionFundV1 { + timestamp: Utc::now().timestamp_millis() as u64, + service_provider: service_provider.into(), + bps: bps as u32, + }; + results.push(proto); + } + + Ok(results) +} + +pub async fn write_protos( + file_sink: &FileSinkClient, + promo_funds: Vec, +) -> anyhow::Result<()> { + for proto in promo_funds { + file_sink.write(proto, []).await?.await??; + } + file_sink.commit().await?.await??; + Ok(()) +} diff --git a/promotion_fund/src/lib.rs b/promotion_fund/src/lib.rs new file mode 100644 index 000000000..900f97fb9 --- /dev/null +++ b/promotion_fund/src/lib.rs @@ -0,0 +1,140 @@ +use std::collections::HashMap; + +use file_store::{ + file_sink::FileSinkClient, + file_upload::FileUpload, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, + FileSink, +}; +use helium_proto::ServiceProviderPromotionFundV1; +use settings::Settings; + +pub mod daemon; +pub mod settings; + +type ServiceProviderInt = i32; +type BasisPoints = u32; + +type S3Value = HashMap; +type SolanaValue = Vec; + +#[derive(Debug, PartialEq)] +pub enum Action { + Write, + Noop, +} + +fn compare_s3_and_solana_values(s3_current: &S3Value, solana_current: &SolanaValue) -> Action { + for sp_fund in solana_current { + // A Service Provider missing from the S3 file only + // matters if their Solana BPS is >0. + let s3_bps = s3_current.get(&sp_fund.service_provider).unwrap_or(&0); + if s3_bps != &sp_fund.bps { + return Action::Write; + } + } + + Action::Noop +} + +pub async fn make_promotion_fund_file_sink( + settings: &Settings, + upload: FileUpload, +) -> anyhow::Result<( + FileSinkClient, + FileSink, +)> { + let (sink, sink_server) = ServiceProviderPromotionFundV1::file_sink( + &settings.file_sink_cache, + upload, + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, + env!("CARGO_PKG_NAME"), + ) + .await?; + Ok((sink, sink_server)) +} + +#[cfg(test)] +mod tests { + use helium_proto::ServiceProvider; + + use super::*; + + #[test] + fn noop_when_nothing_in_s3_or_solana() { + let action = compare_s3_and_solana_values(&HashMap::new(), &vec![]); + assert_eq!(Action::Noop, action); + } + + #[test] + fn noop_when_new_solana_value_is_zero() { + let s3_promos = HashMap::from_iter([]); + let solana_promos = vec![ServiceProviderPromotionFundV1 { + timestamp: 0, + service_provider: ServiceProvider::HeliumMobile as i32, + bps: 0, + }]; + + let action = compare_s3_and_solana_values(&s3_promos, &solana_promos); + assert_eq!(Action::Noop, action); + } + + #[test] + fn noop_when_values_are_the_same() { + let s3_promos = HashMap::from_iter([(ServiceProvider::HeliumMobile as i32, 1)]); + let solana_promos = vec![ServiceProviderPromotionFundV1 { + timestamp: 0, + service_provider: ServiceProvider::HeliumMobile as i32, + bps: 1, + }]; + + let action = compare_s3_and_solana_values(&s3_promos, &solana_promos); + assert_eq!(Action::Noop, action); + } + + #[test] + fn write_when_new_values_from_solana() { + let sp_promos = vec![ServiceProviderPromotionFundV1 { + timestamp: 0, + service_provider: ServiceProvider::HeliumMobile as i32, + bps: 1, + }]; + + let action = compare_s3_and_solana_values(&HashMap::new(), &sp_promos); + assert_eq!(Action::Write, action); + } + + #[test] + fn write_when_solana_differs_from_s3() { + let s3_promos = HashMap::from_iter([(ServiceProvider::HeliumMobile as i32, 1)]); + let solana_promos = vec![ServiceProviderPromotionFundV1 { + timestamp: 0, + service_provider: ServiceProvider::HeliumMobile as i32, + bps: 2, + }]; + + let action = compare_s3_and_solana_values(&s3_promos, &solana_promos); + assert_eq!(Action::Write, action); + } + + #[test] + fn all_items_written_when_one_is_different() { + let s3_promos = HashMap::from_iter([(ServiceProvider::HeliumMobile as i32, 1), (1, 1)]); + let solana_promos = vec![ + ServiceProviderPromotionFundV1 { + timestamp: 0, + service_provider: ServiceProvider::HeliumMobile as i32, + bps: 2, + }, + ServiceProviderPromotionFundV1 { + timestamp: 0, + service_provider: 1, + bps: 1, + }, + ]; + + let action = compare_s3_and_solana_values(&s3_promos, &solana_promos); + assert_eq!(Action::Write, action); + } +} diff --git a/promotion_fund/src/main.rs b/promotion_fund/src/main.rs new file mode 100644 index 000000000..693d3bdcd --- /dev/null +++ b/promotion_fund/src/main.rs @@ -0,0 +1,119 @@ +use std::{path::PathBuf, time::Duration}; + +use anyhow::{Context, Result}; +use clap::{Parser, Subcommand}; +use file_store::file_upload::FileUpload; +use helium_proto::ServiceProvider; +use humantime_serde::re::humantime::format_duration; +use promotion_fund::{ + daemon::{fetch_s3_bps, fetch_solana_bps, write_protos, Daemon}, + make_promotion_fund_file_sink, + settings::Settings, +}; +use solana::carrier::SolanaRpc; +use task_manager::TaskManager; + +#[derive(Debug, Parser)] +struct Cli { + #[clap(short, long)] + config: Option, + #[clap(subcommand)] + cmd: Cmd, +} + +#[derive(Debug, Subcommand)] +enum Cmd { + /// Fetch current values from Solana and output a file to S3 + /// + /// A file will be output regardless of how recently another file was + /// written to S3. + WriteSolana, + /// Print the current values from S3 + PrintS3, + /// Check Solana for new values every `solana_check_interval` + /// + /// When the values from Solana do not match the latest values in S3, a new + /// S3 file will be output. + Server, +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + let settings = Settings::new(cli.config).context("reading settings")?; + custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?; + poc_metrics::start_metrics(&settings.metrics)?; + + match cli.cmd { + Cmd::WriteSolana => write_solana(&settings).await?, + Cmd::PrintS3 => print_s3(&settings).await?, + Cmd::Server => run_server(&settings).await?, + }; + + Ok(()) +} + +async fn run_server(settings: &Settings) -> Result<()> { + let (upload, upload_server) = FileUpload::from_settings_tm(&settings.file_store_output).await?; + let (promotion_funds_sink, promotion_fund_server) = + make_promotion_fund_file_sink(settings, upload).await?; + + let state = Daemon::from_settings(settings, promotion_funds_sink).await?; + + tracing::info!( + check_interval = %format_duration(settings.solana_check_interval), + metrics = %settings.metrics.endpoint, + "starting promotion_fund server" + ); + + TaskManager::builder() + .add_task(upload_server) + .add_task(promotion_fund_server) + .add_task(state) + .build() + .start() + .await +} + +async fn print_s3(settings: &Settings) -> Result<()> { + let s3_current = fetch_s3_bps(settings).await?; + if s3_current.is_empty() { + tracing::warn!("nothing read from s3"); + } + for (sp_int, bps) in s3_current.iter() { + let sp = ServiceProvider::try_from(*sp_int); + tracing::info!(?sp, bps); + } + + Ok(()) +} + +async fn write_solana(settings: &Settings) -> Result<()> { + let (trigger, listener) = triggered::trigger(); + let (upload, upload_server) = FileUpload::from_settings_tm(&settings.file_store_output).await?; + let (promotion_funds_sink, promotion_fund_server) = + make_promotion_fund_file_sink(settings, upload).await?; + + let handle = tokio::spawn(async move { + tokio::try_join!( + upload_server.run(listener.clone()), + promotion_fund_server.run(listener) + ) + }); + + let solana = SolanaRpc::new(&settings.solana).context("making solana client")?; + let promo_funds = fetch_solana_bps(&solana).await?; + write_protos(&promotion_funds_sink, promo_funds).await?; + tracing::info!("file written, waiting for upload..."); + + // allow time for the upload to s3 + tokio::time::sleep(Duration::from_secs(5)).await; + + trigger.trigger(); + if let Err(err) = handle.await { + tracing::warn!(?err, "something went wrong"); + return Err(anyhow::Error::from(err)); + } + + Ok(()) +} diff --git a/promotion_fund/src/settings.rs b/promotion_fund/src/settings.rs new file mode 100644 index 000000000..c62a1c19e --- /dev/null +++ b/promotion_fund/src/settings.rs @@ -0,0 +1,70 @@ +use std::{ + path::{Path, PathBuf}, + time::Duration, +}; + +use chrono::{DateTime, Utc}; +use config::{Config, Environment, File}; +use humantime_serde::re::humantime; + +#[derive(Debug, serde::Deserialize)] +pub struct Settings { + /// RUST_LOG compatible settings string. + #[serde(default = "default_log")] + pub log: String, + #[serde(default)] + pub custom_tracing: custom_tracing::Settings, + /// Temporary storage before writing to S3 + pub file_sink_cache: PathBuf, + /// How often to check for updates of service provider promotion values from + /// solana. (default: 6 hours) + #[serde(with = "humantime_serde", default = "default_solana_check_interval")] + pub solana_check_interval: Duration, + /// How far back do we go looking for the latest values in s3 on startup. If + /// there become many files, update this value to a window just past the + /// most recent. Value only used on startup. + /// (default: unix epoch) + #[serde(default = "default_lookback_start_after")] + pub lookback_start_after: DateTime, + /// Solana RPC settings + pub solana: solana::carrier::Settings, + /// File Store Bucket Settings + pub file_store_output: file_store::Settings, + /// Metrics Settings + pub metrics: poc_metrics::Settings, +} + +fn default_log() -> String { + "promotion_fund=info".to_string() +} + +fn default_solana_check_interval() -> Duration { + humantime::parse_duration("6 hours").unwrap() +} + +fn default_lookback_start_after() -> DateTime { + DateTime::UNIX_EPOCH +} + +impl Settings { + /// Load Settings from a given path. Settings are loaded from a given + /// optional path and can be overriden with environment variables. + /// + /// Environemnt overrides have the same name as the entries in the settings + /// file in uppercase and prefixed with "PROMO_". For example + /// "PROMO_LOG" will override the log setting. + pub fn new>(path: Option

) -> Result { + let mut builder = Config::builder(); + + if let Some(file) = path { + // Add optional settings file + builder = builder + .add_source(File::with_name(&file.as_ref().to_string_lossy()).required(false)); + } + + builder + .add_source(Environment::with_prefix("PROMO").separator("_")) + .build() + .and_then(|config| config.try_deserialize()) + } +} diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index f2823e778..8cd612206 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -7,8 +7,13 @@ use file_store::{ use futures::{stream, StreamExt, TryStreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::{ - services::poc_lora::{iot_reward_share::Reward as IotReward, IotRewardShare}, - services::poc_mobile::{mobile_reward_share::Reward as MobileReward, MobileRewardShare}, + services::{ + poc_lora::{iot_reward_share::Reward as IotReward, IotRewardShare}, + poc_mobile::{ + mobile_reward_share::Reward as MobileReward, promotion_reward::Entity, + MobileRewardShare, PromotionReward, + }, + }, Message, ServiceProvider, }; use poc_metrics::record_duration; @@ -185,6 +190,28 @@ impl Indexer { }, r.amount, ))), + Some(MobileReward::PromotionReward(PromotionReward { + entity: Some(Entity::SubscriberId(subscriber_id)), + service_provider_amount, + matched_amount, + })) => Ok(Some(( + RewardKey { + key: bs58::encode(&subscriber_id).into_string(), + reward_type: RewardType::MobileSubscriber, + }, + service_provider_amount + matched_amount, + ))), + Some(MobileReward::PromotionReward(PromotionReward { + entity: Some(Entity::GatewayKey(gateway_key)), + service_provider_amount, + matched_amount, + })) => Ok(Some(( + RewardKey { + key: PublicKeyBinary::from(gateway_key).to_string(), + reward_type: RewardType::MobileGateway, + }, + service_provider_amount + matched_amount, + ))), _ => bail!("got an invalid reward share"), } } diff --git a/solana/src/carrier.rs b/solana/src/carrier.rs new file mode 100644 index 000000000..ddff4b8f7 --- /dev/null +++ b/solana/src/carrier.rs @@ -0,0 +1,52 @@ +use crate::SolanaRpcError; +use helium_anchor_gen::{ + anchor_lang::AccountDeserialize, + helium_sub_daos, + mobile_entity_manager::{self, CarrierV0}, +}; +use serde::Deserialize; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; + +pub struct SolanaRpc { + provider: RpcClient, + sub_dao: Pubkey, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct Settings { + rpc_url: String, + dnt_mint: String, +} + +impl SolanaRpc { + pub fn new(settings: &Settings) -> Result { + let dnt_mint: Pubkey = settings.dnt_mint.parse()?; + let (sub_dao, _) = Pubkey::find_program_address( + &["sub_dao".as_bytes(), dnt_mint.as_ref()], + &helium_sub_daos::ID, + ); + let provider = + RpcClient::new_with_commitment(settings.rpc_url.clone(), CommitmentConfig::finalized()); + Ok(Self { provider, sub_dao }) + } + + pub async fn fetch_incentive_escrow_fund_bps( + &self, + network_name: &str, + ) -> Result { + let (carrier_pda, _) = Pubkey::find_program_address( + &[ + "carrier".as_bytes(), + self.sub_dao.as_ref(), + network_name.as_bytes(), + ], + &mobile_entity_manager::ID, + ); + let carrier_data = self.provider.get_account_data(&carrier_pda).await?; + let mut carrier_data = carrier_data.as_ref(); + let carrier = CarrierV0::try_deserialize(&mut carrier_data)?; + + Ok(carrier.incentive_escrow_fund_bps) + } +} diff --git a/solana/src/lib.rs b/solana/src/lib.rs index ca118930d..28de84f46 100644 --- a/solana/src/lib.rs +++ b/solana/src/lib.rs @@ -5,6 +5,7 @@ use solana_sdk::transaction::Transaction; use std::time::SystemTimeError; pub mod burn; +pub mod carrier; pub mod start_boost; macro_rules! send_with_retry {