Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into map/download-data-sets
Browse files Browse the repository at this point in the history
  • Loading branch information
maplant committed May 16, 2024
2 parents f46c818 + b040ebc commit 3a7a966
Show file tree
Hide file tree
Showing 97 changed files with 1,082 additions and 940 deletions.
188 changes: 147 additions & 41 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ reqwest = { version = "0", default-features = false, features = [
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
humantime = "2"
metrics = "0.21"
humantime-serde = "1"
metrics = ">=0.22"
metrics-exporter-prometheus = "0"
tracing = "0"
tracing-subscriber = { version = "0", default-features = false, features = [
Expand Down
1 change: 1 addition & 0 deletions boost_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ http = {workspace = true}
http-serde = {workspace = true}
solana = {path = "../solana"}
solana-sdk = {workspace = true}
humantime-serde = { workspace = true }
2 changes: 1 addition & 1 deletion boost_manager/pkg/settings-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ log = "boost_manager=info,solana=debug"
# Cache location for generated boost manager outputs; Required
cache = "/tmp/oracles/boost-manager"

start_after = 1702602001
start_after = "2024-12-15 01:00:00Z"

enable_solana_integration = true

Expand Down
3 changes: 2 additions & 1 deletion boost_manager/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::OnChainStatus;
use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Utc};
use file_store::hex_boost::BoostedHexActivation;
use sqlx::{postgres::PgRow, FromRow, Pool, Postgres, Row, Transaction};
use std::time::Duration;

const MAX_RETRIES: i32 = 10;
const MAX_BATCH_COUNT: i32 = 200;
Expand Down
12 changes: 7 additions & 5 deletions boost_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,18 @@ impl Server {
file_upload::FileUpload::from_settings_tm(&settings.output).await?;
let store_base_path = path::Path::new(&settings.cache);

let reward_check_interval = settings.reward_check_interval;

// setup the received for the rewards manifest files
let file_store = FileStore::from_settings(&settings.verifier).await?;
let (manifest_receiver, manifest_server) =
file_source::continuous_source::<RewardManifest, _>()
.state(pool.clone())
.store(file_store)
.prefix(FileType::RewardManifest.to_string())
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.poll_duration(settings.reward_check_interval())
.offset(settings.reward_check_interval() * 2)
.lookback(LookbackBehavior::StartAfter(settings.start_after))
.poll_duration(reward_check_interval)
.offset(reward_check_interval * 2)
.create()
.await?;

Expand Down Expand Up @@ -124,12 +126,12 @@ impl Server {
let updater = Updater::new(
pool.clone(),
settings.enable_solana_integration,
settings.activation_check_interval(),
settings.activation_check_interval,
settings.txn_batch_size(),
solana,
)?;

let purger = Purger::new(pool.clone(), settings.retention_period());
let purger = Purger::new(pool.clone(), settings.retention_period);

TaskManager::builder()
.add_task(file_upload_server)
Expand Down
7 changes: 3 additions & 4 deletions boost_manager/src/purger.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::db;
use chrono::Duration as ChronoDuration;
use futures::{future::LocalBoxFuture, TryFutureExt};
use sqlx::{Pool, Postgres};
use std::time::Duration;
Expand All @@ -9,7 +8,7 @@ const PURGE_INTERVAL: Duration = Duration::from_secs(30);

pub struct Purger {
pool: Pool<Postgres>,
retention_period: ChronoDuration,
retention_period: Duration,
}

impl ManagedTask for Purger {
Expand All @@ -27,7 +26,7 @@ impl ManagedTask for Purger {
}

impl Purger {
pub fn new(pool: Pool<Postgres>, retention_period: ChronoDuration) -> Self {
pub fn new(pool: Pool<Postgres>, retention_period: Duration) -> Self {
Self {
pool,
retention_period,
Expand All @@ -50,7 +49,7 @@ impl Purger {
}
}

pub async fn purge(pool: &Pool<Postgres>, retention_period: ChronoDuration) -> anyhow::Result<()> {
pub async fn purge(pool: &Pool<Postgres>, retention_period: Duration) -> anyhow::Result<()> {
let num_records_purged = db::purge_stale_records(pool, retention_period).await?;
tracing::info!("purged {} stale records", num_records_purged);
Ok(())
Expand Down
58 changes: 22 additions & 36 deletions boost_manager/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc};
use chrono::{DateTime, Utc};
use config::{Config, Environment, File};
use humantime_serde::re::humantime;
use serde::Deserialize;
use std::{path::Path, time::Duration};

Expand All @@ -11,13 +12,16 @@ pub struct Settings {
pub log: String,
/// Cache location for generated verified reports
pub cache: String,
/// Reward files check interval in seconds. (Default is 900; 15 minutes)
#[serde(default = "default_reward_check_interval")]
pub reward_check_interval: i64,
/// Hex Activation check interval in seconds. (Default is 900; 15 minutes)
/// Reward files check interval in seconds. (Default is 15 minutes)
#[serde(with = "humantime_serde", default = "default_reward_check_interval")]
pub reward_check_interval: Duration,
/// Hex Activation check interval in seconds. (Default is 15 minutes)
/// determines how often we will check the DB for queued txns to solana
#[serde(default = "default_activation_check_interval")]
pub activation_check_interval: i64,
#[serde(
with = "humantime_serde",
default = "default_activation_check_interval"
)]
pub activation_check_interval: Duration,
pub database: db_store::Settings,
pub verifier: file_store::Settings,
pub mobile_config_client: mobile_config::ClientSettings,
Expand All @@ -27,33 +31,33 @@ pub struct Settings {
pub enable_solana_integration: bool,
pub solana: Option<solana::start_boost::Settings>,
#[serde(default = "default_start_after")]
pub start_after: u64,
pub start_after: DateTime<Utc>,
// the number of records to fit per solana txn
#[serde(default = "default_txn_batch_size")]
pub txn_batch_size: u32,
// default retention period in seconds
#[serde(default = "default_retention_period")]
pub retention_period: i64,
#[serde(with = "humantime_serde", default = "default_retention_period")]
pub retention_period: Duration,
}

fn default_retention_period() -> i64 {
86400 * 7 // 7 days
fn default_retention_period() -> Duration {
humantime::parse_duration("7 days").unwrap()
}

fn default_txn_batch_size() -> u32 {
18
}

fn default_reward_check_interval() -> i64 {
900
fn default_reward_check_interval() -> Duration {
humantime::parse_duration("15 minutes").unwrap()
}

fn default_activation_check_interval() -> i64 {
900
fn default_activation_check_interval() -> Duration {
humantime::parse_duration("15 minutes").unwrap()
}

pub fn default_start_after() -> u64 {
0
pub fn default_start_after() -> DateTime<Utc> {
DateTime::UNIX_EPOCH
}

pub fn default_log() -> String {
Expand Down Expand Up @@ -83,25 +87,7 @@ impl Settings {
.and_then(|config| config.try_deserialize())
}

pub fn reward_check_interval(&self) -> ChronoDuration {
ChronoDuration::seconds(self.reward_check_interval)
}

pub fn activation_check_interval(&self) -> Duration {
Duration::from_secs(self.activation_check_interval as u64)
}

pub fn retention_period(&self) -> ChronoDuration {
ChronoDuration::seconds(self.retention_period)
}

pub fn txn_batch_size(&self) -> usize {
self.txn_batch_size as usize
}

pub fn start_after(&self) -> DateTime<Utc> {
Utc.timestamp_opt(self.start_after as i64, 0)
.single()
.unwrap()
}
}
2 changes: 1 addition & 1 deletion boost_manager/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub async fn last_reward_processed_time(
db: &Pool<Postgres>,
datetime: DateTime<Utc>,
) -> anyhow::Result<()> {
metrics::gauge!(LAST_REWARD_PROCESSED_TIME, datetime.timestamp() as f64);
metrics::gauge!(LAST_REWARD_PROCESSED_TIME).set(datetime.timestamp() as f64);
meta::store(db, LAST_REWARD_PROCESSED_TIME, datetime.timestamp()).await?;

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions boost_manager/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ where

async fn check_failed_activations(&self) -> Result<()> {
let num_marked_failed = db::update_failed_activations(&self.pool).await?;
metrics::counter!("failed_activations", num_marked_failed);
metrics::counter!("failed_activations").increment(num_marked_failed);
let total_failed_count = db::get_failed_activations_count(&self.pool).await?;
metrics::gauge!("db_failed_row_count", total_failed_count as f64);
metrics::gauge!("db_failed_row_count").set(total_failed_count as f64);
if total_failed_count > 0 {
tracing::warn!("{} failed status activations ", total_failed_count);
};
Expand All @@ -159,7 +159,7 @@ where
summed_activations_count: u64,
) -> Result<()> {
tracing::info!("processed batch of {} activations successfully", batch_size);
metrics::counter!("success_activations", summed_activations_count);
metrics::counter!("success_activations").increment(summed_activations_count);
db::update_success_batch(&self.pool, ids).await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion boost_manager/tests/integrations/purger_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn test_purge(pool: PgPool) -> anyhow::Result<()> {
assert_eq!(7, count);

// do da purge
purger::purge(&pool, Duration::days(7)).await?;
purger::purge(&pool, Duration::days(7).to_std()?).await?;

// assert the db contains the expected number of records post purge
let count: i64 = sqlx::query_scalar("select count(*) from activated_hexes")
Expand Down
4 changes: 2 additions & 2 deletions db_store/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ macro_rules! query_exec_timed {
( $name:literal, $query:expr, $meth:ident, $exec:expr ) => {{
match poc_metrics::record_duration!(concat!($name, "_duration"), $query.$meth($exec).await) {
Ok(x) => {
metrics::increment_counter!(concat!($name, "_count"), "status" => "ok");
metrics::counter!(concat!($name, "_count"), "status" => "ok").increment(1);
Ok(x)
}
Err(e) => {
metrics::increment_counter!(concat!($name, "_count"), "status" => "error");
metrics::counter!(concat!($name, "_count"), "status" => "error").increment(1);
Err(Error::SqlError(e))
}
}
Expand Down
4 changes: 2 additions & 2 deletions db_store/src/metric_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn run(size_name: String, idle_name: String, pool: sqlx::Pool<sqlx::Postgr
loop {
trigger.tick().await;

metrics::gauge!(size_name.clone(), pool.size() as f64);
metrics::gauge!(idle_name.clone(), pool.num_idle() as f64);
metrics::gauge!(size_name.clone()).set(pool.size() as f64);
metrics::gauge!(idle_name.clone()).set(pool.num_idle() as f64);
}
}
1 change: 1 addition & 0 deletions denylist/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ serde_json = { workspace = true }
config = { workspace = true }
chrono = { workspace = true }
xorf-generator = { git = "https://github.com/helium/xorf-generator", branch = "main" }
humantime-serde = { workspace = true }
15 changes: 6 additions & 9 deletions denylist/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{Error, Result};
use config::{Config, Environment, File};
use helium_crypto::PublicKey;
use humantime_serde::re::humantime;
use serde::Deserialize;
use std::{path::Path, str::FromStr, time::Duration};

Expand All @@ -13,9 +14,9 @@ pub struct Settings {
/// Listen address for http requests for entropy. Default "0.0.0.0:8080"
#[serde(default = "default_denylist_url")]
pub denylist_url: String,
/// Cadence at which we poll for an updated denylist (secs)
#[serde(default = "default_trigger_interval")]
pub trigger: u64,
/// Cadence at which we poll for an updated denylist (Default: 6hours)
#[serde(with = "humantime_serde", default = "default_trigger_interval")]
pub trigger_interval: Duration,
// vec of b58 helium encoded pubkeys
// used to verify signature of denylist filters
#[serde(default)]
Expand All @@ -30,8 +31,8 @@ pub fn default_denylist_url() -> String {
"https://api.github.com/repos/helium/denylist/releases/latest".to_string()
}

fn default_trigger_interval() -> u64 {
21600
fn default_trigger_interval() -> Duration {
humantime::parse_duration("6 hours").unwrap()
}

impl Settings {
Expand All @@ -58,10 +59,6 @@ impl Settings {
.map_err(Error::from)
}

pub fn trigger_interval(&self) -> Duration {
Duration::from_secs(self.trigger)
}

pub fn sign_keys(&self) -> std::result::Result<Vec<PublicKey>, helium_crypto::Error> {
self.sign_keys
.iter()
Expand Down
16 changes: 6 additions & 10 deletions file_store/src/file_info_poller.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::{file_store, traits::MsgDecode, Error, FileInfo, FileStore, Result};
use aws_sdk_s3::types::ByteStream;
use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Utc};
use derive_builder::Builder;
use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
use futures_util::TryFutureExt;
use retainer::Cache;
use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
use std::{collections::VecDeque, marker::PhantomData, sync::Arc, time::Duration};
use task_manager::ManagedTask;
use tokio::sync::mpsc::{Receiver, Sender};

Expand Down Expand Up @@ -68,9 +68,8 @@ where
let latency = Utc::now() - self.file_info.timestamp;
metrics::gauge!(
"file-processing-latency",
latency.num_seconds() as f64,
"file-type" => self.file_info.prefix.clone(), "process-name" => self.process_name.clone(),
);
).set(latency.num_seconds() as f64);

recorder.record(&self.process_name, &self.file_info).await?;
Ok(futures::stream::iter(self.data.into_iter()).boxed())
Expand All @@ -86,14 +85,14 @@ pub enum LookbackBehavior {
#[derive(Debug, Clone, Builder)]
#[builder(pattern = "owned")]
pub struct FileInfoPollerConfig<T, S, P> {
#[builder(default = "Duration::seconds(DEFAULT_POLL_DURATION_SECS)")]
#[builder(default = "DEFAULT_POLL_DURATION")]
poll_duration: Duration,
state: S,
store: FileStore,
prefix: String,
parser: P,
lookback: LookbackBehavior,
#[builder(default = "Duration::minutes(10)")]
#[builder(default = "Duration::from_secs(10 * 60)")]
offset: Duration,
#[builder(default = "5")]
queue_size: usize,
Expand Down Expand Up @@ -263,10 +262,7 @@ where
}

fn poll_duration(&self) -> std::time::Duration {
self.config
.poll_duration
.to_std()
.unwrap_or(DEFAULT_POLL_DURATION)
self.config.poll_duration
}

async fn is_already_processed(&self, file_info: &FileInfo) -> Result<bool> {
Expand Down
Loading

0 comments on commit 3a7a966

Please sign in to comment.