Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added caching to rich list and token distribution endpoints #1287

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [ "
uint = { version = "0.9", default-features = false }
url = { version = "2.3", default-features = false }
uuid = { version = "1.3", default-features = false, features = [ "v4" ] }
once_cell = "1.17.1"

# Optional
chrono = { version = "0.4", default-features = false, features = [ "std" ], optional = true }
Expand Down Expand Up @@ -112,7 +113,7 @@ api = [
influx = [
"dep:influxdb",
]
inx = [
inx = [
"dep:inx",
"dep:tonic",
]
Expand Down
10 changes: 4 additions & 6 deletions src/analytics/influx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,10 @@ where
M: Measurement,
{
fn prepare_query(&self) -> Vec<WriteQuery> {
vec![
influxdb::Timestamp::from(self.at.milestone_timestamp)
.into_query(M::NAME)
.add_field("milestone_index", self.at.milestone_index)
.add_fields(&self.inner),
]
vec![influxdb::Timestamp::from(self.at.milestone_timestamp)
.into_query(M::NAME)
.add_field("milestone_index", self.at.milestone_index)
.add_fields(&self.inner)]
}
}

Expand Down
9 changes: 6 additions & 3 deletions src/bin/inx-chronicle/api/explorer/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,15 @@ const DEFAULT_TOP_RICHLIST: usize = 100;
pub struct RichestAddressesQuery {
pub top: usize,
pub ledger_index: Option<MilestoneIndex>,
pub cached: Option<bool>,
}

impl Default for RichestAddressesQuery {
fn default() -> Self {
Self {
top: DEFAULT_TOP_RICHLIST,
ledger_index: None,
cached: None,
}
}
}
Expand All @@ -292,16 +294,17 @@ impl<B: Send> FromRequest<B> for RichestAddressesQuery {

#[derive(Copy, Clone, Deserialize, Default)]
#[serde(default, deny_unknown_fields, rename_all = "camelCase")]
pub struct LedgerIndex {
pub struct TokenDistributionQuery {
pub ledger_index: Option<MilestoneIndex>,
pub cached: Option<bool>,
}

#[async_trait]
impl<B: Send> FromRequest<B> for LedgerIndex {
impl<B: Send> FromRequest<B> for TokenDistributionQuery {
type Rejection = ApiError;

async fn from_request(req: &mut axum::extract::RequestParts<B>) -> Result<Self, Self::Rejection> {
let Query(query) = Query::<LedgerIndex>::from_request(req)
let Query(query) = Query::<TokenDistributionQuery>::from_request(req)
.await
.map_err(RequestError::from)?;
Ok(query)
Expand Down
122 changes: 113 additions & 9 deletions src/bin/inx-chronicle/api/explorer/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use std::str::FromStr;
use std::time::SystemTime;

use tracing::info;

use axum::{extract::Path, routing::get, Extension};
use chronicle::{
Expand All @@ -19,13 +22,14 @@ use chronicle::{
},
};
use futures::{StreamExt, TryStreamExt};
use iota_sdk::types::block::address::ToBech32Ext;
use iota_sdk::types::block::address::{Hrp, ToBech32Ext};

use super::{
extractors::{
BlocksByMilestoneCursor, BlocksByMilestoneIdPagination, BlocksByMilestoneIndexPagination, LedgerIndex,
BlocksByMilestoneCursor, BlocksByMilestoneIdPagination, BlocksByMilestoneIndexPagination,
LedgerUpdatesByAddressCursor, LedgerUpdatesByAddressPagination, LedgerUpdatesByMilestoneCursor,
LedgerUpdatesByMilestonePagination, MilestonesCursor, MilestonesPagination, RichestAddressesQuery,
TokenDistributionQuery,
},
responses::{
AddressStatDto, BalanceResponse, BlockChildrenResponse, BlockPayloadTypeDto, BlocksByMilestoneResponse,
Expand All @@ -40,6 +44,9 @@ use crate::api::{
ApiResult,
};

use once_cell::sync::Lazy;
use tokio::sync::RwLock;

pub fn routes() -> Router {
Router::new()
.route("/balance/:address", get(balance))
Expand Down Expand Up @@ -319,17 +326,71 @@ async fn blocks_by_milestone_id(
.await
}

/// Cached response for the richest-addresses endpoint, tagged with the UTC
/// day (whole days since the Unix epoch) on which it was computed.
struct RichestCacheData {
    // Day stamp compared against `get_days_since_epoch()` to decide freshness.
    last_updated: u64,
    data: RichestAddressesResponse,
}

/// Cached response for the token-distribution endpoint, tagged with the UTC
/// day (whole days since the Unix epoch) on which it was computed.
struct TokenCacheData {
    // Day stamp compared against `get_days_since_epoch()` to decide freshness.
    last_updated: u64,
    data: TokenDistributionResponse,
}

/// Returns the number of seconds remaining until the next UTC day boundary
/// (i.e. until the daily cache stamp produced by `get_days_since_epoch` rolls over).
fn get_seconds_until_midnight() -> u64 {
    const SECONDS_PER_DAY: u64 = 86400;
    let secs_since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Time went backwards")
        .as_secs();
    SECONDS_PER_DAY - (secs_since_epoch % SECONDS_PER_DAY)
}

/// Returns the number of whole UTC days elapsed since the Unix epoch; used as
/// the freshness stamp for the daily endpoint caches.
fn get_days_since_epoch() -> u64 {
    const SECONDS_PER_DAY: u64 = 86400;
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Time went backwards")
        .as_secs()
        / SECONDS_PER_DAY
}

// Process-wide, lazily-initialized caches for the expensive richest-addresses
// and token-distribution aggregations. `None` until first computed; entries
// are considered fresh for the UTC day recorded in `last_updated`.
static RICHEST_ADDRESSES_CACHE: Lazy<RwLock<Option<RichestCacheData>>> = Lazy::new(|| RwLock::new(None));
static TOKEN_DISTRIBUTION_CACHE: Lazy<RwLock<Option<TokenCacheData>>> = Lazy::new(|| RwLock::new(None));

/// Interprets the optional `cached` query parameter.
///
/// Returns the caller's explicit choice when present; defaults to `true`
/// (use the cache) when the parameter was omitted.
fn get_cache_bool(cache: Option<bool>) -> bool {
    // Idiomatic replacement for a manual `match` on `Option` (clippy: manual_unwrap_or).
    cache.unwrap_or(true)
}

async fn richest_addresses_ledger_analytics(
database: Extension<MongoDb>,
RichestAddressesQuery { top, ledger_index }: RichestAddressesQuery,
RichestAddressesQuery {
top,
ledger_index,
cached,
}: RichestAddressesQuery,
) -> ApiResult<RichestAddressesResponse> {
let ledger_index = resolve_ledger_index(&database, ledger_index).await?;
let mut cache = RICHEST_ADDRESSES_CACHE.write().await;
let cached = get_cache_bool(cached);
let days_since_epoch = get_days_since_epoch();

if cached {
if let Some(cached_data) = &*cache {
if cached_data.last_updated == days_since_epoch {
return Ok(cached_data.data.clone());
}
}
info!("refreshing richest-addresses cache ...");
}

let refresh_start = SystemTime::now();
let res = database
.collection::<OutputCollection>()
.get_richest_addresses(ledger_index, top)
.await?;

let hrp = database
let hrp: Hrp = database
.collection::<ProtocolUpdateCollection>()
.get_protocol_parameters_for_ledger_index(ledger_index)
.await?
Expand All @@ -338,7 +399,7 @@ async fn richest_addresses_ledger_analytics(
.bech32_hrp
.parse()?;

Ok(RichestAddressesResponse {
let response = RichestAddressesResponse {
top: res
.top
.into_iter()
Expand All @@ -350,23 +411,66 @@ async fn richest_addresses_ledger_analytics(
})
.collect(),
ledger_index,
})
};

if cached {
// Store the response in the cache
*cache = Some(RichestCacheData {
last_updated: days_since_epoch,
data: response.clone(),
});

let refresh_elapsed = refresh_start.elapsed().unwrap();
info!("refreshing richest-addresses cache done. Took {:?}", refresh_elapsed);
info!("next refresh in {} seconds", get_seconds_until_midnight());
}

Ok(response)
}

async fn token_distribution_ledger_analytics(
database: Extension<MongoDb>,
LedgerIndex { ledger_index }: LedgerIndex,
TokenDistributionQuery { ledger_index, cached }: TokenDistributionQuery,
) -> ApiResult<TokenDistributionResponse> {
let ledger_index = resolve_ledger_index(&database, ledger_index).await?;
let mut cache = TOKEN_DISTRIBUTION_CACHE.write().await;
let cached = get_cache_bool(cached);
let days_since_epoch = get_days_since_epoch();

if cached {
if let Some(cached_data) = &*cache {
if cached_data.last_updated == days_since_epoch {
return Ok(cached_data.data.clone());
}
}

info!("refreshing token-distribution cache ...");
}

let refresh_start = SystemTime::now();
let res = database
.collection::<OutputCollection>()
.get_token_distribution(ledger_index)
.await?;

Ok(TokenDistributionResponse {
let response = TokenDistributionResponse {
distribution: res.distribution.into_iter().map(Into::into).collect(),
ledger_index,
})
};

if cached {
// Store the response in the cache
*cache = Some(TokenCacheData {
last_updated: days_since_epoch,
data: response.clone(),
});

let refresh_elapsed = refresh_start.elapsed().unwrap();
info!("refreshing token-distribution cache done. Took {:?}", refresh_elapsed);
info!("next refresh in {} seconds", get_seconds_until_midnight());
}

Ok(response)
}

/// This is just a helper fn to either unwrap an optional ledger index param or fetch the latest
Expand Down
12 changes: 5 additions & 7 deletions tests/node_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ mod test_rand {
let node_configuration = setup_collection::<ConfigurationUpdateCollection>(&db).await.unwrap();

// empty collection
assert!(
node_configuration
.get_latest_node_configuration()
.await
.unwrap()
.is_none()
);
assert!(node_configuration
.get_latest_node_configuration()
.await
.unwrap()
.is_none());

let mut config = NodeConfiguration {
milestone_public_key_count: 3,
Expand Down
Loading