diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs index 912e4f384b2..560a50de980 100644 --- a/chain/arweave/src/chain.rs +++ b/chain/arweave/src/chain.rs @@ -269,7 +269,7 @@ impl TriggersAdapterTrait for TriggersAdapter { })) } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, diff --git a/chain/cosmos/src/chain.rs b/chain/cosmos/src/chain.rs index d276c017e7c..6110f71f116 100644 --- a/chain/cosmos/src/chain.rs +++ b/chain/cosmos/src/chain.rs @@ -197,15 +197,13 @@ impl TriggersAdapterTrait for TriggersAdapter { ) -> Result, Error> { panic!("Should never be called since not used by FirehoseBlockStream") } - - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, ) -> Result, Error> { - unimplemented!() + todo!() } - async fn chain_head_ptr(&self) -> Result, Error> { unimplemented!() } diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 3d4dc00c030..1ccdb3d8f2d 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1,6 +1,7 @@ use anyhow::Error; use ethabi::{Error as ABIError, Function, ParamType, Token}; use graph::blockchain::ChainIdentifier; +use graph::blockchain::ExtendedBlockPtr; use graph::components::subgraph::MappingError; use graph::data::store::ethereum::call; use graph::firehose::CallToFilter; @@ -1109,12 +1110,12 @@ pub trait EthereumAdapter: Send + Sync + 'static { block_hash: H256, ) -> Box + Send>; - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _chain_store: Arc, _block_numbers: HashSet, - ) -> Box, Error = Error> + Send>; + ) -> Box, Error = Error> + Send>; /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. /// May use the `chain_store` as a cache. 
diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index ff28c975c40..a4a8598e8ed 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -3,8 +3,8 @@ use anyhow::{Context, Error}; use graph::blockchain::client::ChainClient; use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms}; use graph::blockchain::{ - BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper, - TriggersAdapterSelector, + BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, ExtendedBlockPtr, + TriggerFilterWrapper, TriggersAdapterSelector, }; use graph::components::network_provider::ChainName; use graph::components::store::{DeploymentCursorTracker, SourceableStore}; @@ -156,6 +156,7 @@ impl BlockStreamBuilder for EthereumStreamBuilder { unified_api_version: UnifiedMappingApiVersion, ) -> Result>> { let requirements = filter.chain_filter.node_capabilities(); + let is_using_subgraph_composition = !source_subgraph_stores.is_empty(); let adapter = TriggersAdapterWrapper::new( chain .triggers_adapter(&deployment, &requirements, unified_api_version.clone()) @@ -181,20 +182,32 @@ impl BlockStreamBuilder for EthereumStreamBuilder { // This is ok because Celo blocks are always final. And we _need_ to do this because // some events appear only in eth_getLogs but not in transaction receipts. // See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50. - let chain_id = match chain.chain_client().as_ref() { + let reorg_threshold = match chain.chain_client().as_ref() { ChainClient::Rpc(adapter) => { - adapter + let chain_id = adapter .cheapest() .await .ok_or(anyhow!("unable to get eth adapter for chan_id call"))? .chain_id() - .await? 
+ .await?; + + if CELO_CHAIN_IDS.contains(&chain_id) { + 0 + } else { + chain.reorg_threshold + } } - _ => panic!("expected rpc when using polling blockstream"), + _ if is_using_subgraph_composition => chain.reorg_threshold, + _ => panic!( + "expected rpc when using polling blockstream : {}", + is_using_subgraph_composition + ), }; - let reorg_threshold = match CELO_CHAIN_IDS.contains(&chain_id) { - false => chain.reorg_threshold, - true => 0, + + let max_block_range_size = if is_using_subgraph_composition { + ENV_VARS.max_block_range_size * 10 + } else { + ENV_VARS.max_block_range_size }; Ok(Box::new(PollingBlockStream::new( @@ -207,7 +220,7 @@ impl BlockStreamBuilder for EthereumStreamBuilder { start_blocks, reorg_threshold, logger, - ENV_VARS.max_block_range_size, + max_block_range_size, ENV_VARS.target_triggers_per_block_range, unified_api_version, subgraph_current_block, @@ -617,6 +630,8 @@ pub enum BlockFinality { // If a block may still be reorged, we need to work with more local data. 
NonFinal(EthereumBlockWithCalls), + + Ptr(Arc), } impl Default for BlockFinality { @@ -630,6 +645,7 @@ impl BlockFinality { match self { BlockFinality::Final(block) => block, BlockFinality::NonFinal(block) => &block.ethereum_block.block, + BlockFinality::Ptr(_) => unreachable!("light_block called on HeaderOnly"), } } } @@ -639,6 +655,7 @@ impl<'a> From<&'a BlockFinality> for BlockPtr { match block { BlockFinality::Final(b) => BlockPtr::from(&**b), BlockFinality::NonFinal(b) => BlockPtr::from(&b.ethereum_block), + BlockFinality::Ptr(b) => BlockPtr::new(b.hash.clone(), b.number), } } } @@ -648,6 +665,7 @@ impl Block for BlockFinality { match self { BlockFinality::Final(block) => block.block_ptr(), BlockFinality::NonFinal(block) => block.ethereum_block.block.block_ptr(), + BlockFinality::Ptr(block) => BlockPtr::new(block.hash.clone(), block.number), } } @@ -655,6 +673,9 @@ impl Block for BlockFinality { match self { BlockFinality::Final(block) => block.parent_ptr(), BlockFinality::NonFinal(block) => block.ethereum_block.block.parent_ptr(), + BlockFinality::Ptr(block) => { + Some(BlockPtr::new(block.parent_hash.clone(), block.number - 1)) + } } } @@ -687,6 +708,7 @@ impl Block for BlockFinality { json::to_value(eth_block) } BlockFinality::NonFinal(block) => json::to_value(&block.ethereum_block), + BlockFinality::Ptr(_) => Ok(json::Value::Null), } } @@ -694,6 +716,7 @@ impl Block for BlockFinality { let ts = match self { BlockFinality::Final(block) => block.timestamp, BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp, + BlockFinality::Ptr(block) => block.timestamp, }; let ts = i64::try_from(ts.as_u64()).unwrap(); BlockTime::since_epoch(ts, 0) @@ -735,7 +758,7 @@ impl TriggersAdapterTrait for TriggersAdapter { .await } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, logger: Logger, block_numbers: HashSet, @@ -749,9 +772,9 @@ impl TriggersAdapterTrait for TriggersAdapter { .await?; let blocks = adapter - 
.load_blocks_by_numbers(logger, self.chain_store.clone(), block_numbers) + .load_block_ptrs_by_numbers(logger, self.chain_store.clone(), block_numbers) .await - .map(|block| BlockFinality::Final(block)) + .map(|block| BlockFinality::Ptr(block)) .collect() .compat() .await?; @@ -812,6 +835,7 @@ impl TriggersAdapterTrait for TriggersAdapter { triggers.append(&mut parse_block_triggers(&filter.block, full_block)); Ok(BlockWithTriggers::new(block, triggers, logger)) } + BlockFinality::Ptr(_) => unreachable!("triggers_in_block called on HeaderOnly"), } } diff --git a/chain/ethereum/src/env.rs b/chain/ethereum/src/env.rs index 75c313212b9..bc7223dbc07 100644 --- a/chain/ethereum/src/env.rs +++ b/chain/ethereum/src/env.rs @@ -33,6 +33,9 @@ pub struct EnvVars { /// Set by the environment variable `ETHEREUM_BLOCK_BATCH_SIZE`. The /// default value is 10 blocks. pub block_batch_size: usize, + /// Set by the environment variable `ETHEREUM_BLOCK_PTR_BATCH_SIZE`. The + /// default value is 100 blocks. + pub block_ptr_batch_size: usize, /// Maximum number of blocks to request in each chunk. /// /// Set by the environment variable `GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE`. 
@@ -116,6 +119,7 @@ impl From for EnvVars { trace_stream_step_size: x.trace_stream_step_size, max_event_only_range: x.max_event_only_range, block_batch_size: x.block_batch_size, + block_ptr_batch_size: x.block_ptr_batch_size, max_block_range_size: x.max_block_range_size, json_rpc_timeout: Duration::from_secs(x.json_rpc_timeout_in_secs), block_receipts_check_timeout: Duration::from_secs( @@ -160,6 +164,8 @@ struct Inner { max_event_only_range: BlockNumber, #[envconfig(from = "ETHEREUM_BLOCK_BATCH_SIZE", default = "10")] block_batch_size: usize, + #[envconfig(from = "ETHEREUM_BLOCK_PTR_BATCH_SIZE", default = "100")] + block_ptr_batch_size: usize, #[envconfig(from = "GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE", default = "2000")] max_block_range_size: BlockNumber, #[envconfig(from = "GRAPH_ETHEREUM_JSON_RPC_TIMEOUT", default = "180")] diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 8fd836d1742..dc672f1b5d9 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -2,6 +2,7 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered}; use graph::blockchain::client::ChainClient; use graph::blockchain::BlockHash; use graph::blockchain::ChainIdentifier; +use graph::blockchain::ExtendedBlockPtr; use graph::components::transaction_receipt::LightTransactionReceipt; use graph::data::store::ethereum::call; @@ -783,11 +784,11 @@ impl EthereumAdapter { } /// Request blocks by number through JSON-RPC. 
- fn load_blocks_by_numbers_rpc( + fn load_block_ptrs_by_numbers_rpc( &self, logger: Logger, numbers: Vec, - ) -> impl Stream, Error = Error> + Send { + ) -> impl Stream, Error = Error> + Send { let web3 = self.web3.clone(); stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| { @@ -798,19 +799,29 @@ impl EthereumAdapter { .run(move || { Box::pin( web3.eth() - .block_with_txs(BlockId::Number(Web3BlockNumber::Number( - number.into(), - ))), + .block(BlockId::Number(Web3BlockNumber::Number(number.into()))), ) .compat() .from_err::() .and_then(move |block| { - block.map(Arc::new).ok_or_else(|| { - anyhow::anyhow!( - "Ethereum node did not find block with number {:?}", - number - ) - }) + block + .map(|block| { + let ptr = ExtendedBlockPtr::try_from(( + block.hash, + block.number, + block.parent_hash, + block.timestamp, + )) + .unwrap(); + + Arc::new(ptr) + }) + .ok_or_else(|| { + anyhow::anyhow!( + "Ethereum node did not find block with number {:?}", + number + ) + }) }) .compat() }) @@ -818,7 +829,7 @@ impl EthereumAdapter { .compat() .from_err() })) - .buffered(ENV_VARS.block_batch_size) + .buffered(ENV_VARS.block_ptr_batch_size) } /// Request blocks ptrs for numbers through JSON-RPC. @@ -1690,15 +1701,15 @@ impl EthereumAdapterTrait for EthereumAdapter { } /// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream. 
- async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, logger: Logger, chain_store: Arc, block_numbers: HashSet, - ) -> Box, Error = Error> + Send> { - let blocks_map: BTreeMap> = chain_store + ) -> Box, Error = Error> + Send> { + let blocks_map = chain_store .cheap_clone() - .blocks_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::>()) + .block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::>()) .await .map_err(|e| { error!(&logger, "Error accessing block cache {}", e); @@ -1706,11 +1717,11 @@ impl EthereumAdapterTrait for EthereumAdapter { }) .unwrap_or_default(); - let mut blocks: Vec> = blocks_map + let mut blocks: Vec> = blocks_map .into_iter() .filter_map(|(_number, values)| { if values.len() == 1 { - json::from_value(values[0].clone()).ok() + Arc::new(values[0].clone()).into() } else { None } @@ -1719,7 +1730,7 @@ impl EthereumAdapterTrait for EthereumAdapter { let missing_blocks: Vec = block_numbers .into_iter() - .filter(|&number| !blocks.iter().any(|block| block.number() == number)) + .filter(|&number| !blocks.iter().any(|block| block.block_number() == number)) .collect(); if !missing_blocks.is_empty() { @@ -1731,20 +1742,9 @@ impl EthereumAdapterTrait for EthereumAdapter { } Box::new( - self.load_blocks_by_numbers_rpc(logger.clone(), missing_blocks) + self.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_blocks) .collect() .map(move |new_blocks| { - let upsert_blocks: Vec<_> = new_blocks - .iter() - .map(|block| BlockFinality::Final(block.clone())) - .collect(); - let block_refs: Vec<_> = upsert_blocks - .iter() - .map(|block| block as &dyn graph::blockchain::Block) - .collect(); - if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) { - error!(logger, "Error writing to block cache {}", e); - } blocks.extend(new_blocks); blocks.sort_by_key(|block| block.number); stream::iter_ok(blocks) @@ -2028,6 +2028,9 @@ pub(crate) async fn get_calls( calls: Some(calls), })) } + 
BlockFinality::Ptr(_) => { + unreachable!("get_calls called with BlockFinality::Ptr") + } } } @@ -2209,6 +2212,11 @@ async fn filter_call_triggers_from_unsuccessful_transactions( "this function should not be called when dealing with non-final blocks" ) } + BlockFinality::Ptr(_block) => { + unreachable!( + "this function should not be called when dealing with header-only blocks" + ) + } } }; diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index 9fd7d510519..61770502b5f 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -325,7 +325,7 @@ impl TriggersAdapterTrait for TriggersAdapter { panic!("Should never be called since not used by FirehoseBlockStream") } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs index d2d10cde9e9..b7c5ccd0660 100644 --- a/chain/substreams/src/trigger.rs +++ b/chain/substreams/src/trigger.rs @@ -136,7 +136,7 @@ impl blockchain::TriggersAdapter for TriggersAdapter { unimplemented!() } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 46cdb5e0a71..1977fb12801 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -419,7 +419,7 @@ async fn scan_subgraph_triggers( block_numbers.insert(to); let blocks = adapter - .load_blocks_by_numbers(logger.clone(), block_numbers) + .load_block_ptrs_by_numbers(logger.clone(), block_numbers) .await?; create_subgraph_triggers::(logger.clone(), blocks, filter, entities).await @@ -591,7 +591,7 @@ pub trait TriggersAdapter: Send + Sync { /// Get pointer to parent of `block`. This is called when reverting `block`. 
async fn chain_head_ptr(&self) -> Result, Error>; - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, logger: Logger, block_numbers: HashSet, diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 18f1de92546..2f1480dd46a 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -229,7 +229,7 @@ impl TriggersAdapter for MockTriggersAdapter { todo!() } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet, diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index e81618a740d..2629fa9b4b2 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -53,7 +53,7 @@ pub use block_stream::{ChainHeadUpdateListener, ChainHeadUpdateStream, TriggersA pub use builder::{BasicBlockchainBuilder, BlockchainBuilder}; pub use empty_node_capabilities::EmptyNodeCapabilities; pub use noop_runtime_adapter::NoopRuntimeAdapter; -pub use types::{BlockHash, BlockPtr, BlockTime, ChainIdentifier}; +pub use types::{BlockHash, BlockPtr, BlockTime, ChainIdentifier, ExtendedBlockPtr}; use self::{ block_stream::{BlockStream, FirehoseCursor}, diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index 824956ea2d1..286215fb845 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -5,10 +5,11 @@ use diesel::serialize::{Output, ToSql}; use diesel::sql_types::Timestamptz; use diesel::sql_types::{Bytea, Nullable, Text}; use diesel_derives::{AsExpression, FromSqlRow}; +use serde::{Deserialize, Deserializer}; use std::convert::TryFrom; use std::time::Duration; use std::{fmt, str::FromStr}; -use web3::types::{Block, H256}; +use web3::types::{Block, H256, U256, U64}; use crate::cheap_clone::CheapClone; use crate::components::store::BlockNumber; @@ -48,6 +49,16 @@ impl BlockHash { } } +impl<'de> Deserialize<'de> for BlockHash { + fn deserialize(deserializer: D) -> Result + where + D: 
Deserializer<'de>, + { + let s: String = Deserialize::deserialize(deserializer)?; + BlockHash::from_str(&s).map_err(serde::de::Error::custom) + } +} + impl CheapClone for BlockHash { fn cheap_clone(&self) -> Self { Self(self.0.clone()) @@ -330,6 +341,174 @@ impl From for BlockNumber { } } +fn deserialize_block_number<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let s: String = Deserialize::deserialize(deserializer)?; + + if s.starts_with("0x") { + let s = s.trim_start_matches("0x"); + i32::from_str_radix(s, 16).map_err(serde::de::Error::custom) + } else { + i32::from_str(&s).map_err(serde::de::Error::custom) + } +} + +#[derive(Clone, PartialEq, Eq, Hash, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ExtendedBlockPtr { + pub hash: BlockHash, + #[serde(deserialize_with = "deserialize_block_number")] + pub number: BlockNumber, + pub parent_hash: BlockHash, + pub timestamp: U256, +} + +impl ExtendedBlockPtr { + pub fn new( + hash: BlockHash, + number: BlockNumber, + parent_hash: BlockHash, + timestamp: U256, + ) -> Self { + Self { + hash, + number, + parent_hash, + timestamp, + } + } + + /// Encodes the block hash into a hexadecimal string **without** a "0x" prefix. + /// Hashes are stored in the database in this format. + pub fn hash_hex(&self) -> String { + self.hash.hash_hex() + } + + /// Encodes the parent block hash into a hexadecimal string **without** a "0x" prefix. + pub fn parent_hash_hex(&self) -> String { + self.parent_hash.hash_hex() + } + + /// Block number to be passed into the store. Panics if it does not fit in an i32. 
+ pub fn block_number(&self) -> BlockNumber { + self.number + } + + pub fn hash_as_h256(&self) -> H256 { + H256::from_slice(&self.hash_slice()[..32]) + } + + pub fn parent_hash_as_h256(&self) -> H256 { + H256::from_slice(&self.parent_hash_slice()[..32]) + } + + pub fn hash_slice(&self) -> &[u8] { + self.hash.0.as_ref() + } + + pub fn parent_hash_slice(&self) -> &[u8] { + self.parent_hash.0.as_ref() + } +} + +impl fmt::Display for ExtendedBlockPtr { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "#{} ({}) [parent: {}]", + self.number, + self.hash_hex(), + self.parent_hash_hex() + ) + } +} + +impl fmt::Debug for ExtendedBlockPtr { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "#{} ({}) [parent: {}]", + self.number, + self.hash_hex(), + self.parent_hash_hex() + ) + } +} + +impl slog::Value for ExtendedBlockPtr { + fn serialize( + &self, + record: &slog::Record, + key: slog::Key, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + slog::Value::serialize(&self.to_string(), record, key, serializer) + } +} + +impl IntoValue for ExtendedBlockPtr { + fn into_value(self) -> r::Value { + object! { + __typename: "Block", + hash: self.hash_hex(), + number: format!("{}", self.number), + parent_hash: self.parent_hash_hex(), + timestamp: format!("{}", self.timestamp), + } + } +} + +impl TryFrom<(Option, Option, H256, U256)> for ExtendedBlockPtr { + type Error = anyhow::Error; + + fn try_from(tuple: (Option, Option, H256, U256)) -> Result { + let (hash_opt, number_opt, parent_hash, timestamp) = tuple; + + let hash = hash_opt.ok_or_else(|| anyhow!("Block hash is missing"))?; + let number = number_opt + .ok_or_else(|| anyhow!("Block number is missing"))? 
+ .as_u64(); + + let block_number = + i32::try_from(number).map_err(|_| anyhow!("Block number out of range"))?; + + Ok(ExtendedBlockPtr { + hash: hash.into(), + number: block_number, + parent_hash: parent_hash.into(), + timestamp, + }) + } +} + +impl TryFrom<(H256, i32, H256, U256)> for ExtendedBlockPtr { + type Error = anyhow::Error; + + fn try_from(tuple: (H256, i32, H256, U256)) -> Result { + let (hash, block_number, parent_hash, timestamp) = tuple; + + Ok(ExtendedBlockPtr { + hash: hash.into(), + number: block_number, + parent_hash: parent_hash.into(), + timestamp, + }) + } +} +impl From for H256 { + fn from(ptr: ExtendedBlockPtr) -> Self { + ptr.hash_as_h256() + } +} + +impl From for BlockNumber { + fn from(ptr: ExtendedBlockPtr) -> Self { + ptr.number + } +} + #[derive(Clone, Debug, PartialEq, Eq, Hash)] /// A collection of attributes that (kind of) uniquely identify a blockchain. pub struct ChainIdentifier { @@ -445,3 +624,65 @@ impl FromSql for BlockTime { >::from_sql(bytes).map(|ts| Self(ts)) } } + +#[cfg(test)] +mod tests { + use super::*; + use serde_json; + + #[test] + fn test_blockhash_deserialization() { + let json_data = "\"0x8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac\""; + + let block_hash: BlockHash = + serde_json::from_str(json_data).expect("Deserialization failed"); + + let expected_bytes = + hex::decode("8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac") + .expect("Hex decoding failed"); + + assert_eq!( + *block_hash.0, expected_bytes, + "BlockHash does not match expected bytes" + ); + } + + #[test] + fn test_block_ptr_ext_deserialization() { + // JSON data with a hex string for BlockNumber + let json_data = r#" + { + "hash": "0x8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac", + "number": "0x2A", + "parentHash": "0xabc123", + "timestamp": "123456789012345678901234567890" + } + "#; + + // Deserialize the JSON string into a ExtendedBlockPtr + let block_ptr_ext: ExtendedBlockPtr = + 
serde_json::from_str(json_data).expect("Deserialization failed"); + + // Verify the deserialized values + assert_eq!(block_ptr_ext.number, 42); // 0x2A in hex is 42 in decimal + } + + #[test] + fn test_invalid_block_number_deserialization() { + let invalid_json_data = r#" + { + "hash": "0x8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac", + "number": "invalid_hex_string", + "parentHash": "0xabc123", + "timestamp": "123456789012345678901234567890" + } + "#; + + let result: Result = serde_json::from_str(invalid_json_data); + + assert!( + result.is_err(), + "Deserialization should have failed for invalid block number" + ); + } +} diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 0e729faedd3..b56f4e3fca8 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -7,7 +7,7 @@ use web3::types::{Address, H256}; use super::*; use crate::blockchain::block_stream::{EntityWithType, FirehoseCursor}; -use crate::blockchain::{BlockTime, ChainIdentifier}; +use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr}; use crate::components::metrics::stopwatch::StopwatchMetrics; use crate::components::server::index_node::VersionInfo; use crate::components::subgraph::SubgraphVersionSwitchingMode; @@ -523,10 +523,10 @@ pub trait ChainStore: Send + Sync + 'static { ) -> Result, Error>; /// Returns the blocks present in the store for the given block numbers. - async fn blocks_by_numbers( + async fn block_ptrs_by_numbers( self: Arc, numbers: Vec, - ) -> Result>, Error>; + ) -> Result>, Error>; /// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching /// `block_hash` and offset=1 means its parent. 
If `root` is passed, short-circuit upon finding diff --git a/node/src/chain.rs b/node/src/chain.rs index 1c62bf2248e..289c0580c2e 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -433,6 +433,7 @@ pub async fn networks_as_chains( }; let client = Arc::new(cc); + let eth_adapters = Arc::new(eth_adapters); let adapter_selector = EthereumAdapterSelector::new( logger_factory.clone(), client.clone(), @@ -455,7 +456,7 @@ pub async fn networks_as_chains( Arc::new(EthereumBlockRefetcher {}), Arc::new(adapter_selector), Arc::new(EthereumRuntimeAdapterBuilder {}), - Arc::new(eth_adapters.clone()), + eth_adapters, ENV_VARS.reorg_threshold, polling_interval, true, diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 443305dc197..097aa799eff 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::r2d2::{ConnectionManager, PooledConnection}; @@ -21,9 +22,9 @@ use std::{ sync::Arc, }; -use graph::blockchain::{Block, BlockHash, ChainIdentifier}; +use graph::blockchain::{Block, BlockHash, ChainIdentifier, ExtendedBlockPtr}; use graph::cheap_clone::CheapClone; -use graph::prelude::web3::types::H256; +use graph::prelude::web3::types::{H256, U256}; use graph::prelude::{ async_trait, serde_json as json, transaction_receipt::LightTransactionReceipt, BlockNumber, BlockPtr, CachedEthereumCall, CancelableError, ChainStore as ChainStoreTrait, Error, @@ -53,6 +54,14 @@ impl JsonBlock { data, } } + + fn timestamp(&self) -> Option { + self.data + .as_ref() + .and_then(|data| data.get("timestamp")) + .and_then(|ts| ts.as_str()) + .and_then(|ts| U256::from_dec_str(ts).ok()) + } } /// Tables in the 'public' database schema that store chain-specific data @@ -580,7 +589,7 @@ mod data { Ok(()) } - pub(super) fn blocks_by_numbers( + pub(super) fn block_ptrs_by_numbers( &self, conn: &mut PgConnection, chain: &str, @@ -1930,7 
+1939,7 @@ impl ChainStore { .with_conn(move |conn, _| { store .storage - .blocks_by_numbers(conn, &store.chain, &numbers) + .block_ptrs_by_numbers(conn, &store.chain, &numbers) .map_err(CancelableError::from) }) .await?; @@ -1949,6 +1958,21 @@ impl ChainStore { } } +fn json_block_to_block_ptr_ext(json_block: &JsonBlock) -> Result { + let hash = json_block.ptr.hash.clone(); + let number = json_block.ptr.number; + let parent_hash = json_block.parent_hash.clone(); + + let timestamp = json_block + .timestamp() + .ok_or_else(|| anyhow!("Timestamp is missing"))?; + + let ptr = + ExtendedBlockPtr::try_from((hash.as_h256(), number, parent_hash.as_h256(), timestamp)) + .map_err(|e| anyhow!("Failed to convert to ExtendedBlockPtr: {}", e))?; + + Ok(ptr) +} #[async_trait] impl ChainStoreTrait for ChainStore { fn genesis_block_ptr(&self) -> Result { @@ -2142,28 +2166,16 @@ impl ChainStoreTrait for ChainStore { Ok(()) } - async fn blocks_by_numbers( + async fn block_ptrs_by_numbers( self: Arc, numbers: Vec, - ) -> Result>, Error> { - if ENV_VARS.store.disable_block_cache_for_lookup { - let values = self - .blocks_from_store_by_numbers(numbers) - .await? 
- .into_iter() - .map(|(num, blocks)| { - ( - num, - blocks - .into_iter() - .filter_map(|block| block.data) - .collect::>(), - ) - }) - .collect(); - Ok(values) + ) -> Result>, Error> { + let result = if ENV_VARS.store.disable_block_cache_for_lookup { + let values = self.blocks_from_store_by_numbers(numbers).await?; + + values } else { - let cached = self.recent_blocks_cache.get_blocks_by_numbers(&numbers); + let cached = self.recent_blocks_cache.get_block_ptrs_by_numbers(&numbers); let stored = if cached.len() < numbers.len() { let missing_numbers = numbers @@ -2209,16 +2221,28 @@ impl ChainStoreTrait for ChainStore { .map(|(ptr, data)| (ptr.block_number(), vec![data])) .collect::>(); - let mut result: BTreeMap> = cached_map; + let mut result = cached_map; for (num, blocks) in stored { - result - .entry(num) - .or_default() - .extend(blocks.into_iter().filter_map(|block| block.data)); + if !result.contains_key(&num) { + result.insert(num, blocks); + } } - Ok(result) - } + result + }; + + let ptrs = result + .into_iter() + .map(|(num, blocks)| { + let ptrs = blocks + .into_iter() + .filter_map(|block| json_block_to_block_ptr_ext(&block).ok()) + .collect(); + (num, ptrs) + }) + .collect(); + + Ok(ptrs) } async fn blocks(self: Arc, hashes: Vec) -> Result, Error> { @@ -2527,10 +2551,8 @@ mod recent_blocks_cache { .and_then(|block| block.data.as_ref().map(|data| (&block.ptr, data))) } - fn get_block_by_number(&self, number: BlockNumber) -> Option<(&BlockPtr, &json::Value)> { - self.blocks - .get(&number) - .and_then(|block| block.data.as_ref().map(|data| (&block.ptr, data))) + fn get_block_by_number(&self, number: BlockNumber) -> Option<&JsonBlock> { + self.blocks.get(&number) } fn get_ancestor( @@ -2655,16 +2677,16 @@ mod recent_blocks_cache { blocks } - pub fn get_blocks_by_numbers( + pub fn get_block_ptrs_by_numbers( &self, numbers: &[BlockNumber], - ) -> Vec<(BlockPtr, json::Value)> { + ) -> Vec<(BlockPtr, JsonBlock)> { let inner = self.inner.read(); - let mut 
blocks: Vec<(BlockPtr, json::Value)> = Vec::new(); + let mut blocks: Vec<(BlockPtr, JsonBlock)> = Vec::new(); for &number in numbers { - if let Some((ptr, block)) = inner.get_block_by_number(number) { - blocks.push((ptr.clone(), block.clone())); + if let Some(block) = inner.get_block_by_number(number) { + blocks.push((block.ptr.clone(), block.clone())); } } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index c21d3198272..9cd3747f7c6 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -995,7 +995,7 @@ impl TriggersAdapter for MockTriggersAdapter { todo!() } - async fn load_blocks_by_numbers( + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, _block_numbers: HashSet,