Skip to content

Commit a4cb593

Browse files
lutterclaude
andcommitted
store: Add HerdCache to chain_head_ptr() to prevent thundering herd
The ChainHeadPtrCache introduced in 7ecdbda can cause connection pool exhaustion when the cache expires: multiple concurrent callers each acquire a database connection, then block waiting for a write lock to update the cache - while still holding their connections. This adds a HerdCache layer that ensures only one caller queries the database when the TTL cache expires. Other concurrent callers await the in-flight query result instead of each acquiring their own connection. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent fb4b492 commit a4cb593

File tree

1 file changed

+45
-30
lines changed

1 file changed

+45
-30
lines changed

store/postgres/src/chain_store.rs

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2150,6 +2150,8 @@ pub struct ChainStore {
21502150
ancestor_cache: HerdCache<Arc<Result<Option<(json::Value, BlockPtr)>, StoreError>>>,
21512151
/// Adaptive cache for chain_head_ptr()
21522152
chain_head_ptr_cache: ChainHeadPtrCache,
2153+
/// Herd cache to prevent thundering herd on chain_head_ptr() lookups
2154+
chain_head_ptr_herd: HerdCache<Arc<Result<Option<BlockPtr>, StoreError>>>,
21532155
}
21542156

21552157
impl ChainStore {
@@ -2169,6 +2171,7 @@ impl ChainStore {
21692171
let blocks_by_number_cache = HerdCache::new(format!("chain_{}_blocks_by_number", chain));
21702172
let ancestor_cache = HerdCache::new(format!("chain_{}_ancestor", chain));
21712173
let chain_head_ptr_cache = ChainHeadPtrCache::new(metrics, chain.clone());
2174+
let chain_head_ptr_herd = HerdCache::new(format!("chain_{}_head_ptr", chain));
21722175
ChainStore {
21732176
logger,
21742177
pool,
@@ -2181,6 +2184,7 @@ impl ChainStore {
21812184
blocks_by_number_cache,
21822185
ancestor_cache,
21832186
chain_head_ptr_cache,
2187+
chain_head_ptr_herd,
21842188
}
21852189
}
21862190

@@ -2523,44 +2527,55 @@ impl ChainHeadStore for ChainStore {
25232527
async fn chain_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error> {
25242528
use public::ethereum_networks::dsl::*;
25252529

2526-
// Check cache first (handles disabled check and metrics internally)
2530+
// Check TTL cache first (handles disabled check and metrics internally)
25272531
if let Some(cached) = self.chain_head_ptr_cache.get() {
25282532
return Ok(Some(cached));
25292533
}
25302534

2531-
// Query database
2532-
let mut conn = self.pool.get_permitted().await?;
2533-
let result = ethereum_networks
2534-
.select((head_block_hash, head_block_number))
2535-
.filter(name.eq(&self.chain))
2536-
.load::<(Option<String>, Option<i64>)>(&mut conn)
2537-
.await
2538-
.map(|rows| {
2539-
rows.as_slice()
2540-
.first()
2541-
.map(|(hash_opt, number_opt)| match (hash_opt, number_opt) {
2542-
(Some(hash), Some(number)) => Some(
2543-
(
2544-
// FIXME:
2545-
//
2546-
// workaround for arweave
2547-
H256::from_slice(&hex::decode(hash).unwrap()[..32]),
2548-
*number,
2549-
)
2550-
.into(),
2551-
),
2552-
(None, None) => None,
2553-
_ => unreachable!(),
2554-
})
2555-
.and_then(|opt: Option<BlockPtr>| opt)
2556-
})?;
2535+
// Use HerdCache to ensure only one caller does the DB lookup
2536+
// when cache is expired. Other callers await the in-flight query.
2537+
let pool = self.pool.clone();
2538+
let chain = self.chain.clone();
2539+
let lookup = async move {
2540+
let mut conn = pool.get_permitted().await?;
2541+
ethereum_networks
2542+
.select((head_block_hash, head_block_number))
2543+
.filter(name.eq(&chain))
2544+
.load::<(Option<String>, Option<i64>)>(&mut conn)
2545+
.await
2546+
.map(|rows| {
2547+
rows.as_slice()
2548+
.first()
2549+
.map(|(hash_opt, number_opt)| match (hash_opt, number_opt) {
2550+
(Some(hash), Some(number)) => Some(
2551+
(
2552+
// FIXME:
2553+
//
2554+
// workaround for arweave
2555+
H256::from_slice(&hex::decode(hash).unwrap()[..32]),
2556+
*number,
2557+
)
2558+
.into(),
2559+
),
2560+
(None, None) => None,
2561+
_ => unreachable!(),
2562+
})
2563+
.and_then(|opt: Option<BlockPtr>| opt)
2564+
})
2565+
.map_err(StoreError::from)
2566+
};
2567+
2568+
let (result, _cached) = self
2569+
.cached_lookup(&self.chain_head_ptr_herd, &self.chain, lookup)
2570+
.await;
25572571

2558-
// Cache the result (set() handles disabled check internally)
2559-
if let Some(ref ptr) = result {
2572+
// Update TTL cache with the result
2573+
// (set() handles disabled check internally)
2574+
if let Ok(Some(ref ptr)) = result {
25602575
self.chain_head_ptr_cache.set(ptr.clone());
25612576
}
25622577

2563-
Ok(result)
2578+
result.map_err(Error::from)
25642579
}
25652580

25662581
async fn chain_head_cursor(&self) -> Result<Option<String>, Error> {

0 commit comments

Comments
 (0)