@@ -17,6 +17,8 @@ use graph::util::herd_cache::HerdCache;
1717
1818use std:: collections:: BTreeMap ;
1919use std:: future:: Future ;
20+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
21+ use std:: time:: { Duration , Instant } ;
2022use std:: {
2123 collections:: HashMap ,
2224 convert:: { TryFrom , TryInto } ,
@@ -1873,6 +1875,10 @@ pub struct ChainStoreMetrics {
18731875 chain_head_cache_latest_block_num : Box < GaugeVec > ,
18741876 chain_head_cache_hits : Box < CounterVec > ,
18751877 chain_head_cache_misses : Box < CounterVec > ,
1878+ // Metrics for chain_head_ptr() cache
1879+ chain_head_ptr_cache_hits : Box < CounterVec > ,
1880+ chain_head_ptr_cache_misses : Box < CounterVec > ,
1881+ chain_head_ptr_cache_block_time_ms : Box < GaugeVec > ,
18761882}
18771883
18781884impl ChainStoreMetrics {
@@ -1914,12 +1920,37 @@ impl ChainStoreMetrics {
19141920 )
19151921 . expect ( "Can't register the counter" ) ;
19161922
1923+ let chain_head_ptr_cache_hits = registry
1924+ . new_counter_vec (
1925+ "chain_head_ptr_cache_hits" ,
1926+ "Number of times the chain_head_ptr cache was hit" ,
1927+ vec ! [ "network" . to_string( ) ] ,
1928+ )
1929+ . expect ( "Can't register the counter" ) ;
1930+ let chain_head_ptr_cache_misses = registry
1931+ . new_counter_vec (
1932+ "chain_head_ptr_cache_misses" ,
1933+ "Number of times the chain_head_ptr cache was missed" ,
1934+ vec ! [ "network" . to_string( ) ] ,
1935+ )
1936+ . expect ( "Can't register the counter" ) ;
1937+ let chain_head_ptr_cache_block_time_ms = registry
1938+ . new_gauge_vec (
1939+ "chain_head_ptr_cache_block_time_ms" ,
1940+ "Estimated block time in milliseconds used for adaptive cache TTL" ,
1941+ vec ! [ "network" . to_string( ) ] ,
1942+ )
1943+ . expect ( "Can't register the gauge" ) ;
1944+
19171945 Self {
19181946 chain_head_cache_size,
19191947 chain_head_cache_oldest_block_num,
19201948 chain_head_cache_latest_block_num,
19211949 chain_head_cache_hits,
19221950 chain_head_cache_misses,
1951+ chain_head_ptr_cache_hits,
1952+ chain_head_ptr_cache_misses,
1953+ chain_head_ptr_cache_block_time_ms,
19231954 }
19241955 }
19251956
@@ -1959,6 +1990,137 @@ impl ChainStoreMetrics {
19591990 . unwrap ( )
19601991 . inc_by ( misses as f64 ) ;
19611992 }
1993+
1994+ pub fn record_chain_head_ptr_cache_hit ( & self , network : & str ) {
1995+ self . chain_head_ptr_cache_hits
1996+ . with_label_values ( & [ network] )
1997+ . inc ( ) ;
1998+ }
1999+
2000+ pub fn record_chain_head_ptr_cache_miss ( & self , network : & str ) {
2001+ self . chain_head_ptr_cache_misses
2002+ . with_label_values ( & [ network] )
2003+ . inc ( ) ;
2004+ }
2005+
2006+ pub fn set_chain_head_ptr_block_time ( & self , network : & str , block_time_ms : u64 ) {
2007+ self . chain_head_ptr_cache_block_time_ms
2008+ . with_label_values ( & [ network] )
2009+ . set ( block_time_ms as f64 ) ;
2010+ }
2011+ }
2012+
2013+ const MIN_TTL_MS : u64 = 20 ;
2014+ const MAX_TTL_MS : u64 = 2000 ;
2015+ const MIN_OBSERVATIONS : u64 = 5 ;
2016+
2017+ /// Adaptive cache for chain_head_ptr() that learns optimal TTL from block frequency.
2018+ struct ChainHeadPtrCache {
2019+ /// Cached value and when it expires
2020+ entry : RwLock < Option < ( BlockPtr , Instant ) > > ,
2021+ /// Estimated milliseconds between blocks (EWMA)
2022+ estimated_block_time_ms : AtomicU64 ,
2023+ /// When we last observed the chain head change
2024+ last_change : RwLock < Instant > ,
2025+ /// Number of block changes observed (for warmup)
2026+ observations : AtomicU64 ,
2027+ /// Metrics for recording cache hits/misses
2028+ metrics : Arc < ChainStoreMetrics > ,
2029+ /// Chain name for metric labels
2030+ chain : String ,
2031+ }
2032+
2033+ impl ChainHeadPtrCache {
2034+ fn new ( metrics : Arc < ChainStoreMetrics > , chain : String ) -> Self {
2035+ Self {
2036+ entry : RwLock :: new ( None ) ,
2037+ estimated_block_time_ms : AtomicU64 :: new ( 0 ) ,
2038+ last_change : RwLock :: new ( Instant :: now ( ) ) ,
2039+ observations : AtomicU64 :: new ( 0 ) ,
2040+ metrics,
2041+ chain,
2042+ }
2043+ }
2044+
2045+ /// Returns cached value if still valid, or None if cache is disabled/missed.
2046+ /// Records hit/miss metrics automatically.
2047+ fn get ( & self ) -> Option < BlockPtr > {
2048+ if ENV_VARS . store . disable_chain_head_ptr_cache {
2049+ return None ;
2050+ }
2051+ let guard = self . entry . read ( ) ;
2052+ if let Some ( ( value, expires) ) = guard. as_ref ( ) {
2053+ if Instant :: now ( ) < * expires {
2054+ self . metrics . record_chain_head_ptr_cache_hit ( & self . chain ) ;
2055+ return Some ( value. clone ( ) ) ;
2056+ }
2057+ }
2058+ self . metrics . record_chain_head_ptr_cache_miss ( & self . chain ) ;
2059+ None
2060+ }
2061+
2062+ /// Compute current TTL - MIN_TTL during warmup, then 1/4 of estimated block time
2063+ fn current_ttl ( & self ) -> Duration {
2064+ let obs = AtomicU64 :: load ( & self . observations , Ordering :: Relaxed ) ;
2065+ if obs < MIN_OBSERVATIONS {
2066+ return Duration :: from_millis ( MIN_TTL_MS ) ;
2067+ }
2068+
2069+ let block_time = AtomicU64 :: load ( & self . estimated_block_time_ms , Ordering :: Relaxed ) ;
2070+ let ttl_ms = ( block_time / 4 ) . clamp ( MIN_TTL_MS , MAX_TTL_MS ) ;
2071+ Duration :: from_millis ( ttl_ms)
2072+ }
2073+
2074+ /// Cache a new value, updating block time estimate if value changed.
2075+ /// Does nothing if cache is disabled.
2076+ fn set ( & self , new_value : BlockPtr ) {
2077+ if ENV_VARS . store . disable_chain_head_ptr_cache {
2078+ return ;
2079+ }
2080+ let now = Instant :: now ( ) ;
2081+
2082+ // Check if block changed
2083+ let old_value = {
2084+ let guard = self . entry . read ( ) ;
2085+ guard. as_ref ( ) . map ( |( v, _) | v. clone ( ) )
2086+ } ;
2087+
2088+ if old_value. as_ref ( ) != Some ( & new_value) {
2089+ // Block changed - update estimate
2090+ let mut last_change = self . last_change . write ( ) ;
2091+ let delta_ms = now. duration_since ( * last_change) . as_millis ( ) as u64 ;
2092+ * last_change = now;
2093+
2094+ // Increment observation count
2095+ let obs = AtomicU64 :: fetch_add ( & self . observations , 1 , Ordering :: Relaxed ) ;
2096+
2097+ // Ignore unreasonable deltas (negative or > 60s)
2098+ if delta_ms > 0 && delta_ms < 60_000 {
2099+ let new_estimate = if obs == 0 {
2100+ // First observation - use as initial estimate
2101+ delta_ms
2102+ } else {
2103+ // EWMA: new = 0.8 * old + 0.2 * observed
2104+ let old_estimate =
2105+ AtomicU64 :: load ( & self . estimated_block_time_ms , Ordering :: Relaxed ) ;
2106+ ( old_estimate * 4 + delta_ms) / 5
2107+ } ;
2108+ AtomicU64 :: store (
2109+ & self . estimated_block_time_ms ,
2110+ new_estimate,
2111+ Ordering :: Relaxed ,
2112+ ) ;
2113+
2114+ // Update metric gauge
2115+ self . metrics
2116+ . set_chain_head_ptr_block_time ( & self . chain , new_estimate) ;
2117+ }
2118+ }
2119+
2120+ // Compute TTL and store with expiry
2121+ let ttl = self . current_ttl ( ) ;
2122+ * self . entry . write ( ) = Some ( ( new_value, now + ttl) ) ;
2123+ }
19622124}
19632125
19642126pub struct ChainStore {
@@ -1980,6 +2142,8 @@ pub struct ChainStore {
19802142 blocks_by_number_cache :
19812143 HerdCache < Arc < Result < BTreeMap < BlockNumber , Vec < JsonBlock > > , StoreError > > > ,
19822144 ancestor_cache : HerdCache < Arc < Result < Option < ( json:: Value , BlockPtr ) > , StoreError > > > ,
2145+ /// Adaptive cache for chain_head_ptr()
2146+ chain_head_ptr_cache : ChainHeadPtrCache ,
19832147}
19842148
19852149impl ChainStore {
@@ -1994,10 +2158,11 @@ impl ChainStore {
19942158 metrics : Arc < ChainStoreMetrics > ,
19952159 ) -> Self {
19962160 let recent_blocks_cache =
1997- RecentBlocksCache :: new ( recent_blocks_cache_capacity, chain. clone ( ) , metrics) ;
2161+ RecentBlocksCache :: new ( recent_blocks_cache_capacity, chain. clone ( ) , metrics. clone ( ) ) ;
19982162 let blocks_by_hash_cache = HerdCache :: new ( format ! ( "chain_{}_blocks_by_hash" , chain) ) ;
19992163 let blocks_by_number_cache = HerdCache :: new ( format ! ( "chain_{}_blocks_by_number" , chain) ) ;
20002164 let ancestor_cache = HerdCache :: new ( format ! ( "chain_{}_ancestor" , chain) ) ;
2165+ let chain_head_ptr_cache = ChainHeadPtrCache :: new ( metrics, chain. clone ( ) ) ;
20012166 ChainStore {
20022167 logger,
20032168 pool,
@@ -2009,6 +2174,7 @@ impl ChainStore {
20092174 blocks_by_hash_cache,
20102175 blocks_by_number_cache,
20112176 ancestor_cache,
2177+ chain_head_ptr_cache,
20122178 }
20132179 }
20142180
@@ -2351,8 +2517,14 @@ impl ChainHeadStore for ChainStore {
23512517 async fn chain_head_ptr ( self : Arc < Self > ) -> Result < Option < BlockPtr > , Error > {
23522518 use public:: ethereum_networks:: dsl:: * ;
23532519
2520+ // Check cache first (handles disabled check and metrics internally)
2521+ if let Some ( cached) = self . chain_head_ptr_cache . get ( ) {
2522+ return Ok ( Some ( cached) ) ;
2523+ }
2524+
2525+ // Query database
23542526 let mut conn = self . pool . get_permitted ( ) . await ?;
2355- Ok ( ethereum_networks
2527+ let result = ethereum_networks
23562528 . select ( ( head_block_hash, head_block_number) )
23572529 . filter ( name. eq ( & self . chain ) )
23582530 . load :: < ( Option < String > , Option < i64 > ) > ( & mut conn)
@@ -2375,7 +2547,14 @@ impl ChainHeadStore for ChainStore {
23752547 _ => unreachable ! ( ) ,
23762548 } )
23772549 . and_then ( |opt : Option < BlockPtr > | opt)
2378- } ) ?)
2550+ } ) ?;
2551+
2552+ // Cache the result (set() handles disabled check internally)
2553+ if let Some ( ref ptr) = result {
2554+ self . chain_head_ptr_cache . set ( ptr. clone ( ) ) ;
2555+ }
2556+
2557+ Ok ( result)
23792558 }
23802559
23812560 async fn chain_head_cursor ( & self ) -> Result < Option < String > , Error > {
0 commit comments