Skip to content

Commit fd65e1a

Browse files
committed
graph: Add CachedBlock enum and typed block cache
1 parent 140cc9f commit fd65e1a

File tree

11 files changed

+310
-236
lines changed

11 files changed

+310
-236
lines changed

chain/ethereum/src/chain.rs

Lines changed: 21 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ use graph::blockchain::block_stream::{
6565
BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor,
6666
TriggersAdapterWrapper,
6767
};
68-
use graph::components::ethereum::EthereumJsonBlock;
69-
7068
/// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320
7169
const CELO_CHAIN_IDS: [u64; 3] = [42220, 44787, 62320];
7270

@@ -1076,52 +1074,25 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
10761074
.ancestor_block(ptr.clone(), offset, root.clone())
10771075
.await?;
10781076

1079-
// First check if we have the ancestor in cache and can deserialize it.
1080-
// The cached JSON can be in one of three formats:
1081-
// 1. Full RPC format: {"block": {...}, "transaction_receipts": [...]}
1082-
// 2. Shallow/header-only: {"timestamp": "...", "data": null} - only timestamp, no block data
1083-
// 3. Legacy direct: block fields at root level {hash, number, transactions, ...}
1084-
// We need full format with receipts for ancestor_block (used for trigger processing).
1077+
// Use full blocks (with receipts) directly from cache.
1078+
// Light blocks (no receipts) need to be fetched from Firehose/RPC.
10851079
let block_ptr = match cached {
1086-
Some((json, ptr)) => {
1087-
let json_block = EthereumJsonBlock::new(json);
1088-
if json_block.is_shallow() {
1089-
trace!(
1090-
self.logger,
1091-
"Cached block #{} {} is shallow (header-only). Falling back to Firehose/RPC.",
1092-
ptr.number,
1093-
ptr.hash_hex(),
1094-
);
1095-
ptr
1096-
} else if json_block.is_legacy_format() {
1097-
trace!(
1098-
self.logger,
1099-
"Cached block #{} {} is legacy light format. Falling back to Firehose/RPC.",
1100-
ptr.number,
1101-
ptr.hash_hex(),
1102-
);
1103-
ptr
1104-
} else {
1105-
match json_block.into_full_block() {
1106-
Ok(block) => {
1107-
return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
1108-
ethereum_block: block,
1109-
calls: None,
1110-
})));
1111-
}
1112-
Err(e) => {
1113-
warn!(
1114-
self.logger,
1115-
"Failed to deserialize cached ancestor block #{} {} (offset {} from #{}): {}. \
1116-
Falling back to Firehose/RPC.",
1117-
ptr.number,
1118-
ptr.hash_hex(),
1119-
offset,
1120-
ptr_for_log.number,
1121-
e
1122-
);
1123-
ptr
1124-
}
1080+
Some((cached_block, ptr)) => {
1081+
match cached_block.into_full_block() {
1082+
Some(block) => {
1083+
return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
1084+
ethereum_block: block,
1085+
calls: None,
1086+
})));
1087+
}
1088+
None => {
1089+
trace!(
1090+
self.logger,
1091+
"Cached block #{} {} is light (no receipts). Falling back to Firehose/RPC.",
1092+
ptr.number,
1093+
ptr.hash_hex(),
1094+
);
1095+
ptr
11251096
}
11261097
}
11271098
}
@@ -1179,35 +1150,10 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
11791150
let block = match self.chain_client.as_ref() {
11801151
ChainClient::Firehose(endpoints) => {
11811152
let chain_store = self.chain_store.cheap_clone();
1182-
// First try to get the block from the store
1183-
// See ancestor_block() for documentation of the 3 cached JSON formats.
1153+
// First try to get the block from the store (typed cache)
11841154
if let Ok(blocks) = chain_store.blocks(vec![block.hash.clone()]).await {
1185-
if let Some(cached_json) = blocks.into_iter().next() {
1186-
let json_block = EthereumJsonBlock::new(cached_json);
1187-
if json_block.is_shallow() {
1188-
trace!(
1189-
self.logger,
1190-
"Cached block #{} {} is shallow. Falling back to Firehose.",
1191-
block.number,
1192-
block.hash_hex(),
1193-
);
1194-
} else {
1195-
match json_block.into_light_block() {
1196-
Ok(light_block) => {
1197-
return Ok(light_block.parent_ptr());
1198-
}
1199-
Err(e) => {
1200-
warn!(
1201-
self.logger,
1202-
"Failed to deserialize cached block #{} {}: {}. \
1203-
Falling back to Firehose.",
1204-
block.number,
1205-
block.hash_hex(),
1206-
e
1207-
);
1208-
}
1209-
}
1210-
}
1155+
if let Some(cached_block) = blocks.into_iter().next() {
1156+
return Ok(cached_block.light_block().parent_ptr());
12111157
}
12121158
}
12131159

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ use crate::{
7979
trigger::{EthereumBlockTriggerType, EthereumTrigger},
8080
ENV_VARS,
8181
};
82-
use graph::components::ethereum::EthereumJsonBlock;
8382

8483
type AlloyProvider = FillProvider<
8584
JoinFill<
@@ -1641,23 +1640,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
16411640
.map_err(|e| error!(&logger, "Error accessing block cache {}", e))
16421641
.unwrap_or_default()
16431642
.into_iter()
1644-
.filter_map(|value| {
1645-
let json_block = EthereumJsonBlock::new(value);
1646-
if json_block.is_shallow() {
1647-
return None;
1648-
}
1649-
json_block
1650-
.into_light_block()
1651-
.map_err(|e| {
1652-
warn!(
1653-
&logger,
1654-
"Failed to deserialize cached block: {}. Block will be re-fetched from RPC.",
1655-
e
1656-
);
1657-
})
1658-
.ok()
1659-
})
1660-
.map(Arc::new)
1643+
.map(|cached| Arc::new(cached.into_light_block()))
16611644
.collect();
16621645

16631646
let missing_blocks = Vec::from_iter(

graph/src/blockchain/mock.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{
22
bail,
33
components::{
4+
ethereum::CachedBlock,
45
link_resolver::LinkResolver,
56
network_provider::ChainName,
67
store::{
@@ -527,15 +528,18 @@ impl ChainStore for MockChainStore {
527528
) -> Result<Option<B256>, Error> {
528529
unimplemented!()
529530
}
530-
async fn blocks(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<Value>, Error> {
531+
async fn blocks(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<CachedBlock>, Error> {
532+
unimplemented!()
533+
}
534+
async fn blocks_as_json(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<Value>, Error> {
531535
unimplemented!()
532536
}
533537
async fn ancestor_block(
534538
self: Arc<Self>,
535539
_block_ptr: BlockPtr,
536540
_offset: BlockNumber,
537541
_root: Option<BlockHash>,
538-
) -> Result<Option<(Value, BlockPtr)>, Error> {
542+
) -> Result<Option<(CachedBlock, BlockPtr)>, Error> {
539543
unimplemented!()
540544
}
541545
async fn cleanup_cached_blocks(

graph/src/components/ethereum/json_block.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use serde_json::{self as json, Value};
22

33
use super::json_patch;
4-
use super::types::{EthereumBlock, LightEthereumBlock};
4+
use super::types::{CachedBlock, EthereumBlock, LightEthereumBlock};
55

66
#[derive(Debug)]
77
pub struct EthereumJsonBlock(Value);
@@ -49,4 +49,18 @@ impl EthereumJsonBlock {
4949
json_patch::patch_block_transactions(&mut inner);
5050
json::from_value(inner)
5151
}
52+
53+
/// Tries to deserialize into a `CachedBlock`. Uses `transaction_receipts`
54+
/// presence to decide between full and light block, avoiding a JSON clone.
55+
pub fn try_into_cached_block(self) -> Option<CachedBlock> {
56+
let has_receipts = self
57+
.0
58+
.get("transaction_receipts")
59+
.is_some_and(|v| !v.is_null());
60+
if has_receipts {
61+
self.into_full_block().ok().map(CachedBlock::Full)
62+
} else {
63+
self.into_light_block().ok().map(CachedBlock::Light)
64+
}
65+
}
5266
}

graph/src/components/ethereum/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ mod types;
66
pub use self::json_block::EthereumJsonBlock;
77
pub use self::network::AnyNetworkBare;
88
pub use self::types::{
9-
AnyBlock, AnyTransaction, AnyTransactionReceiptBare, EthereumBlock, EthereumBlockWithCalls,
10-
EthereumCall, LightEthereumBlock, LightEthereumBlockExt,
9+
AnyBlock, AnyTransaction, AnyTransactionReceiptBare, CachedBlock, EthereumBlock,
10+
EthereumBlockWithCalls, EthereumCall, LightEthereumBlock, LightEthereumBlockExt,
1111
};
1212

1313
// Re-export Alloy network types for convenience

graph/src/components/ethereum/types.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@ use crate::{
1717
prelude::BlockNumber,
1818
};
1919

20+
use super::json_block::EthereumJsonBlock;
21+
2022
pub type AnyTransaction = Transaction<AnyTxEnvelope>;
2123
pub type AnyBlock = Block<AnyTransaction, Header<AnyHeader>>;
2224
/// Like alloy's `AnyTransactionReceipt` but without the `WithOtherFields` wrapper,
2325
/// avoiding `#[serde(flatten)]` overhead during deserialization.
2426
pub type AnyTransactionReceiptBare = TransactionReceipt<AnyReceiptEnvelope<Log>>;
2527

2628
#[allow(dead_code)]
27-
#[derive(Debug, Deserialize, Serialize)]
29+
#[derive(Clone, Debug, Deserialize, Serialize)]
2830
pub struct LightEthereumBlock(AnyBlock);
2931

3032
impl Default for LightEthereumBlock {
@@ -259,3 +261,55 @@ impl<'a> From<&'a EthereumCall> for BlockPtr {
259261
BlockPtr::from((call.block_hash, call.block_number))
260262
}
261263
}
264+
265+
/// Typed cached block for Ethereum. Stores the deserialized block so that
266+
/// repeated reads from the in-memory cache avoid `serde_json::from_value()`.
267+
#[derive(Clone, Debug)]
268+
#[allow(clippy::large_enum_variant)]
269+
pub enum CachedBlock {
270+
Full(EthereumBlock),
271+
Light(LightEthereumBlock),
272+
}
273+
274+
impl CachedBlock {
275+
pub fn light_block(&self) -> &LightEthereumBlock {
276+
match self {
277+
CachedBlock::Full(block) => &block.block,
278+
CachedBlock::Light(block) => block,
279+
}
280+
}
281+
282+
pub fn into_light_block(self) -> LightEthereumBlock {
283+
match self {
284+
CachedBlock::Full(block) => block.block.as_ref().clone(),
285+
CachedBlock::Light(block) => block,
286+
}
287+
}
288+
289+
pub fn into_full_block(self) -> Option<EthereumBlock> {
290+
match self {
291+
CachedBlock::Full(block) => Some(block),
292+
CachedBlock::Light(_) => None,
293+
}
294+
}
295+
296+
pub fn from_json(value: serde_json::Value) -> Option<Self> {
297+
let json_block = EthereumJsonBlock::new(value);
298+
if json_block.is_shallow() {
299+
return None;
300+
}
301+
json_block.try_into_cached_block()
302+
}
303+
304+
pub fn timestamp(&self) -> Option<u64> {
305+
Some(self.light_block().timestamp_u64())
306+
}
307+
308+
pub fn parent_ptr(&self) -> Option<BlockPtr> {
309+
self.light_block().parent_ptr()
310+
}
311+
312+
pub fn ptr(&self) -> BlockPtr {
313+
self.light_block().block_ptr()
314+
}
315+
}

graph/src/components/store/traits.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use async_trait::async_trait;
77
use super::*;
88
use crate::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor};
99
use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr};
10+
use crate::components::ethereum::CachedBlock;
1011
use crate::components::metrics::stopwatch::StopwatchMetrics;
1112
use crate::components::network_provider::ChainName;
1213
use crate::components::server::index_node::VersionInfo;
@@ -553,8 +554,12 @@ pub trait ChainStore: ChainHeadStore {
553554
ancestor_count: BlockNumber,
554555
) -> Result<Option<B256>, Error>;
555556

556-
/// Returns the blocks present in the store.
557-
async fn blocks(
557+
/// Returns the blocks present in the store as typed cached blocks.
558+
async fn blocks(self: Arc<Self>, hashes: Vec<BlockHash>) -> Result<Vec<CachedBlock>, Error>;
559+
560+
/// Returns blocks as raw JSON. Used by callers that need the original
561+
/// JSON representation (e.g., GraphQL block queries, CLI tools).
562+
async fn blocks_as_json(
558563
self: Arc<Self>,
559564
hashes: Vec<BlockHash>,
560565
) -> Result<Vec<serde_json::Value>, Error>;
@@ -584,7 +589,7 @@ pub trait ChainStore: ChainHeadStore {
584589
block_ptr: BlockPtr,
585590
offset: BlockNumber,
586591
root: Option<BlockHash>,
587-
) -> Result<Option<(serde_json::Value, BlockPtr)>, Error>;
592+
) -> Result<Option<(CachedBlock, BlockPtr)>, Error>;
588593

589594
/// Remove old blocks from the cache we maintain in the database and
590595
/// return a pair containing the number of the oldest block retained

node/src/manager/commands/check_blocks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ mod steps {
187187
block_hash: B256,
188188
chain_store: Arc<ChainStore>,
189189
) -> anyhow::Result<Value> {
190-
let blocks = chain_store.blocks(vec![block_hash.into()]).await?;
190+
let blocks = chain_store.blocks_as_json(vec![block_hash.into()]).await?;
191191
match blocks.len() {
192192
0 => bail!("Failed to locate block with hash {} in store", block_hash),
193193
1 => {}

server/index-node/src/resolver.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,9 @@ where
222222
return Ok(r::Value::Null);
223223
};
224224

225-
let blocks_res = chain_store.blocks(vec![block_hash.cheap_clone()]).await;
225+
let blocks_res = chain_store
226+
.blocks_as_json(vec![block_hash.cheap_clone()])
227+
.await;
226228
Ok(match blocks_res {
227229
Ok(blocks) if blocks.is_empty() => {
228230
error!(

0 commit comments

Comments
 (0)