Skip to content

Commit 7ecdbda

Browse files
lutterclaude
andcommitted
store: Add adaptive TTL cache for chain_head_ptr()
With many subgraphs, chain_head_ptr() was querying the database on every call, leading to connection pool saturation. This adds an adaptive cache that learns optimal TTL from observed block frequency. The cache uses EWMA to estimate block time and sets TTL to 1/4 of that estimate (bounded by 20ms-2000ms). During warmup (first 5 blocks), it uses the minimum TTL to avoid missing blocks on unknown chains. New metrics: - chain_head_ptr_cache_hits: cache hit counter - chain_head_ptr_cache_misses: cache miss counter (DB queries) - chain_head_ptr_cache_block_time_ms: estimated block time per chain Safety escape hatch: set GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE=true to revert to the previous uncached behavior. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent dfd627b commit 7ecdbda

File tree

2 files changed

+194
-3
lines changed

2 files changed

+194
-3
lines changed

graph/src/env/store.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ pub struct EnvVarsStore {
163163
/// Disables storing or reading `eth_call` results from the store call cache.
164164
/// Set by `GRAPH_STORE_DISABLE_CALL_CACHE`. Defaults to false.
165165
pub disable_call_cache: bool,
166+
/// Set by `GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE`. Default is false.
167+
/// Set to true to disable chain_head_ptr caching (safety escape hatch).
168+
pub disable_chain_head_ptr_cache: bool,
166169
}
167170

168171
// This does not print any values avoid accidentally leaking any sensitive env vars
@@ -224,6 +227,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
224227
account_like_min_versions_count: x.account_like_min_versions_count,
225228
account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0),
226229
disable_call_cache: x.disable_call_cache,
230+
disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache,
227231
};
228232
if let Some(timeout) = vars.batch_timeout {
229233
if timeout < 2 * vars.batch_target_duration {
@@ -331,6 +335,8 @@ pub struct InnerStore {
331335
account_like_max_unique_ratio: Option<ZeroToOneF64>,
332336
#[envconfig(from = "GRAPH_STORE_DISABLE_CALL_CACHE", default = "false")]
333337
disable_call_cache: bool,
338+
#[envconfig(from = "GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE", default = "false")]
339+
disable_chain_head_ptr_cache: bool,
334340
}
335341

336342
#[derive(Clone, Copy, Debug)]

store/postgres/src/chain_store.rs

Lines changed: 188 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use graph::util::herd_cache::HerdCache;
1717

1818
use std::collections::BTreeMap;
1919
use std::future::Future;
20+
use std::sync::atomic::{AtomicU64, Ordering};
21+
use std::time::{Duration, Instant};
2022
use std::{
2123
collections::HashMap,
2224
convert::{TryFrom, TryInto},
@@ -1873,6 +1875,10 @@ pub struct ChainStoreMetrics {
18731875
chain_head_cache_latest_block_num: Box<GaugeVec>,
18741876
chain_head_cache_hits: Box<CounterVec>,
18751877
chain_head_cache_misses: Box<CounterVec>,
1878+
// Metrics for chain_head_ptr() cache
1879+
chain_head_ptr_cache_hits: Box<CounterVec>,
1880+
chain_head_ptr_cache_misses: Box<CounterVec>,
1881+
chain_head_ptr_cache_block_time_ms: Box<GaugeVec>,
18761882
}
18771883

18781884
impl ChainStoreMetrics {
@@ -1914,12 +1920,37 @@ impl ChainStoreMetrics {
19141920
)
19151921
.expect("Can't register the counter");
19161922

1923+
let chain_head_ptr_cache_hits = registry
1924+
.new_counter_vec(
1925+
"chain_head_ptr_cache_hits",
1926+
"Number of times the chain_head_ptr cache was hit",
1927+
vec!["network".to_string()],
1928+
)
1929+
.expect("Can't register the counter");
1930+
let chain_head_ptr_cache_misses = registry
1931+
.new_counter_vec(
1932+
"chain_head_ptr_cache_misses",
1933+
"Number of times the chain_head_ptr cache was missed",
1934+
vec!["network".to_string()],
1935+
)
1936+
.expect("Can't register the counter");
1937+
let chain_head_ptr_cache_block_time_ms = registry
1938+
.new_gauge_vec(
1939+
"chain_head_ptr_cache_block_time_ms",
1940+
"Estimated block time in milliseconds used for adaptive cache TTL",
1941+
vec!["network".to_string()],
1942+
)
1943+
.expect("Can't register the gauge");
1944+
19171945
Self {
19181946
chain_head_cache_size,
19191947
chain_head_cache_oldest_block_num,
19201948
chain_head_cache_latest_block_num,
19211949
chain_head_cache_hits,
19221950
chain_head_cache_misses,
1951+
chain_head_ptr_cache_hits,
1952+
chain_head_ptr_cache_misses,
1953+
chain_head_ptr_cache_block_time_ms,
19231954
}
19241955
}
19251956

@@ -1959,6 +1990,143 @@ impl ChainStoreMetrics {
19591990
.unwrap()
19601991
.inc_by(misses as f64);
19611992
}
1993+
1994+
pub fn record_chain_head_ptr_cache_hit(&self, network: &str) {
1995+
self.chain_head_ptr_cache_hits
1996+
.with_label_values(&[network])
1997+
.inc();
1998+
}
1999+
2000+
pub fn record_chain_head_ptr_cache_miss(&self, network: &str) {
2001+
self.chain_head_ptr_cache_misses
2002+
.with_label_values(&[network])
2003+
.inc();
2004+
}
2005+
2006+
pub fn set_chain_head_ptr_block_time(&self, network: &str, block_time_ms: u64) {
2007+
self.chain_head_ptr_cache_block_time_ms
2008+
.with_label_values(&[network])
2009+
.set(block_time_ms as f64);
2010+
}
2011+
}
2012+
2013+
const MIN_TTL_MS: u64 = 20;
2014+
const MAX_TTL_MS: u64 = 2000;
2015+
const MIN_OBSERVATIONS: u64 = 5;
2016+
2017+
/// Adaptive cache for chain_head_ptr() that learns optimal TTL from block frequency.
2018+
struct ChainHeadPtrCache {
2019+
/// Cached value and when it expires
2020+
entry: RwLock<Option<(BlockPtr, Instant)>>,
2021+
/// Estimated milliseconds between blocks (EWMA)
2022+
estimated_block_time_ms: AtomicU64,
2023+
/// When we last observed the chain head change
2024+
last_change: RwLock<Instant>,
2025+
/// Number of block changes observed (for warmup)
2026+
observations: AtomicU64,
2027+
/// Metrics for recording cache hits/misses
2028+
metrics: Arc<ChainStoreMetrics>,
2029+
/// Chain name for metric labels
2030+
chain: String,
2031+
}
2032+
2033+
impl ChainHeadPtrCache {
2034+
fn new(metrics: Arc<ChainStoreMetrics>, chain: String) -> Self {
2035+
Self {
2036+
entry: RwLock::new(None),
2037+
estimated_block_time_ms: AtomicU64::new(0),
2038+
last_change: RwLock::new(Instant::now()),
2039+
observations: AtomicU64::new(0),
2040+
metrics,
2041+
chain,
2042+
}
2043+
}
2044+
2045+
/// Returns cached value if still valid, or None if cache is disabled/missed.
2046+
/// Records hit/miss metrics automatically.
2047+
fn get(&self) -> Option<BlockPtr> {
2048+
if ENV_VARS.store.disable_chain_head_ptr_cache {
2049+
return None;
2050+
}
2051+
let guard = self.entry.read();
2052+
if let Some((value, expires)) = guard.as_ref() {
2053+
if Instant::now() < *expires {
2054+
self.metrics.record_chain_head_ptr_cache_hit(&self.chain);
2055+
return Some(value.clone());
2056+
}
2057+
}
2058+
self.metrics.record_chain_head_ptr_cache_miss(&self.chain);
2059+
None
2060+
}
2061+
2062+
/// Compute current TTL - MIN_TTL during warmup, then 1/4 of estimated block time
2063+
fn current_ttl(&self) -> Duration {
2064+
let obs = AtomicU64::load(&self.observations, Ordering::Relaxed);
2065+
if obs < MIN_OBSERVATIONS {
2066+
return Duration::from_millis(MIN_TTL_MS);
2067+
}
2068+
2069+
let block_time = AtomicU64::load(&self.estimated_block_time_ms, Ordering::Relaxed);
2070+
let ttl_ms = (block_time / 4).clamp(MIN_TTL_MS, MAX_TTL_MS);
2071+
Duration::from_millis(ttl_ms)
2072+
}
2073+
2074+
/// Cache a new value, updating block time estimate if value changed.
2075+
/// Does nothing if cache is disabled.
2076+
fn set(&self, new_value: BlockPtr) {
2077+
if ENV_VARS.store.disable_chain_head_ptr_cache {
2078+
return;
2079+
}
2080+
let now = Instant::now();
2081+
2082+
// Check if block changed
2083+
let old_value = {
2084+
let guard = self.entry.read();
2085+
guard.as_ref().map(|(v, _)| v.clone())
2086+
};
2087+
2088+
// Only update estimate if we have a previous value and block number advanced
2089+
// (skip reorgs where new block number <= old)
2090+
if let Some(old_ptr) = old_value.as_ref() {
2091+
if new_value.number > old_ptr.number {
2092+
let mut last_change = self.last_change.write();
2093+
let delta_ms = now.duration_since(*last_change).as_millis() as u64;
2094+
*last_change = now;
2095+
2096+
let blocks_advanced = (new_value.number - old_ptr.number) as u64;
2097+
2098+
// Increment observation count
2099+
let obs = AtomicU64::fetch_add(&self.observations, 1, Ordering::Relaxed);
2100+
2101+
// Ignore unreasonable deltas (> 60s)
2102+
if delta_ms > 0 && delta_ms < 60_000 {
2103+
let per_block_ms = delta_ms / blocks_advanced;
2104+
let new_estimate = if obs == 0 {
2105+
// First observation - use as initial estimate
2106+
per_block_ms
2107+
} else {
2108+
// EWMA: new = 0.8 * old + 0.2 * observed
2109+
let old_estimate =
2110+
AtomicU64::load(&self.estimated_block_time_ms, Ordering::Relaxed);
2111+
(old_estimate * 4 + per_block_ms) / 5
2112+
};
2113+
AtomicU64::store(
2114+
&self.estimated_block_time_ms,
2115+
new_estimate,
2116+
Ordering::Relaxed,
2117+
);
2118+
2119+
// Update metric gauge
2120+
self.metrics
2121+
.set_chain_head_ptr_block_time(&self.chain, new_estimate);
2122+
}
2123+
}
2124+
}
2125+
2126+
// Compute TTL and store with expiry
2127+
let ttl = self.current_ttl();
2128+
*self.entry.write() = Some((new_value, now + ttl));
2129+
}
19622130
}
19632131

19642132
pub struct ChainStore {
@@ -1980,6 +2148,8 @@ pub struct ChainStore {
19802148
blocks_by_number_cache:
19812149
HerdCache<Arc<Result<BTreeMap<BlockNumber, Vec<JsonBlock>>, StoreError>>>,
19822150
ancestor_cache: HerdCache<Arc<Result<Option<(json::Value, BlockPtr)>, StoreError>>>,
2151+
/// Adaptive cache for chain_head_ptr()
2152+
chain_head_ptr_cache: ChainHeadPtrCache,
19832153
}
19842154

19852155
impl ChainStore {
@@ -1994,10 +2164,11 @@ impl ChainStore {
19942164
metrics: Arc<ChainStoreMetrics>,
19952165
) -> Self {
19962166
let recent_blocks_cache =
1997-
RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics);
2167+
RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics.clone());
19982168
let blocks_by_hash_cache = HerdCache::new(format!("chain_{}_blocks_by_hash", chain));
19992169
let blocks_by_number_cache = HerdCache::new(format!("chain_{}_blocks_by_number", chain));
20002170
let ancestor_cache = HerdCache::new(format!("chain_{}_ancestor", chain));
2171+
let chain_head_ptr_cache = ChainHeadPtrCache::new(metrics, chain.clone());
20012172
ChainStore {
20022173
logger,
20032174
pool,
@@ -2009,6 +2180,7 @@ impl ChainStore {
20092180
blocks_by_hash_cache,
20102181
blocks_by_number_cache,
20112182
ancestor_cache,
2183+
chain_head_ptr_cache,
20122184
}
20132185
}
20142186

@@ -2351,8 +2523,14 @@ impl ChainHeadStore for ChainStore {
23512523
async fn chain_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error> {
23522524
use public::ethereum_networks::dsl::*;
23532525

2526+
// Check cache first (handles disabled check and metrics internally)
2527+
if let Some(cached) = self.chain_head_ptr_cache.get() {
2528+
return Ok(Some(cached));
2529+
}
2530+
2531+
// Query database
23542532
let mut conn = self.pool.get_permitted().await?;
2355-
Ok(ethereum_networks
2533+
let result = ethereum_networks
23562534
.select((head_block_hash, head_block_number))
23572535
.filter(name.eq(&self.chain))
23582536
.load::<(Option<String>, Option<i64>)>(&mut conn)
@@ -2375,7 +2553,14 @@ impl ChainHeadStore for ChainStore {
23752553
_ => unreachable!(),
23762554
})
23772555
.and_then(|opt: Option<BlockPtr>| opt)
2378-
})?)
2556+
})?;
2557+
2558+
// Cache the result (set() handles disabled check internally)
2559+
if let Some(ref ptr) = result {
2560+
self.chain_head_ptr_cache.set(ptr.clone());
2561+
}
2562+
2563+
Ok(result)
23792564
}
23802565

23812566
async fn chain_head_cursor(&self) -> Result<Option<String>, Error> {

0 commit comments

Comments
 (0)