diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 9fe0b8262b2..8fd836d1742 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -782,6 +782,45 @@ impl EthereumAdapter { .buffered(ENV_VARS.block_batch_size) } + /// Request blocks by number through JSON-RPC. + fn load_blocks_by_numbers_rpc( + &self, + logger: Logger, + numbers: Vec<BlockNumber>, + ) -> impl Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send { + let web3 = self.web3.clone(); + + stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| { + let web3 = web3.clone(); + retry(format!("load block {}", number), &logger) + .limit(ENV_VARS.request_retries) + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + Box::pin( + web3.eth() + .block_with_txs(BlockId::Number(Web3BlockNumber::Number( + number.into(), + ))), + ) + .compat() + .from_err::<Error>() + .and_then(move |block| { + block.map(Arc::new).ok_or_else(|| { + anyhow::anyhow!( + "Ethereum node did not find block with number {:?}", + number + ) + }) + }) + .compat() + }) + .boxed() + .compat() + .from_err() + })) + .buffered(ENV_VARS.block_batch_size) + } + /// Request blocks ptrs for numbers through JSON-RPC. /// /// Reorg safety: If ids are numbers, they must be a final blocks. @@ -1650,26 +1689,68 @@ impl EthereumAdapterTrait for EthereumAdapter { Ok(decoded) } - // This is a ugly temporary implementation to get the block ptrs for a range of blocks + /// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream. 
async fn load_blocks_by_numbers( &self, logger: Logger, chain_store: Arc<dyn ChainStore>, block_numbers: HashSet<BlockNumber>, ) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send> { - let block_hashes = block_numbers + let blocks_map: BTreeMap<i32, Vec<json::Value>> = chain_store + .cheap_clone() + .blocks_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>()) + .await + .map_err(|e| { + error!(&logger, "Error accessing block cache {}", e); + e + }) + .unwrap_or_default(); + + let mut blocks: Vec<Arc<LightEthereumBlock>> = blocks_map .into_iter() - .map(|number| { - chain_store - .block_hashes_by_block_number(number) - .unwrap() - .first() - .unwrap() - .as_h256() + .filter_map(|(_number, values)| { + if values.len() == 1 { + json::from_value(values[0].clone()).ok() + } else { + None + } }) - .collect::<Vec<_>>(); + .collect::<Vec<_>>(); - self.load_blocks(logger, chain_store, block_hashes).await + let missing_blocks: Vec<i32> = block_numbers + .into_iter() + .filter(|&number| !blocks.iter().any(|block| block.number() == number)) + .collect(); + + if !missing_blocks.is_empty() { + debug!( + logger, + "Loading {} block(s) not in the block cache", + missing_blocks.len() + ); + } + + Box::new( + self.load_blocks_by_numbers_rpc(logger.clone(), missing_blocks) + .collect() + .map(move |new_blocks| { + let upsert_blocks: Vec<_> = new_blocks + .iter() + .map(|block| BlockFinality::Final(block.clone())) + .collect(); + let block_refs: Vec<_> = upsert_blocks + .iter() + .map(|block| block as &dyn graph::blockchain::Block) + .collect(); + if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) { + error!(logger, "Error writing to block cache {}", e); + } + blocks.extend(new_blocks); + blocks.sort_by_key(|block| block.number); + stream::iter_ok(blocks) + }) + .flatten_stream(), + ) } /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. 
diff --git a/graph/src/blockchain/client.rs b/graph/src/blockchain/client.rs index 8d83536b577..1ac1b4f892c 100644 --- a/graph/src/blockchain/client.rs +++ b/graph/src/blockchain/client.rs @@ -41,7 +41,7 @@ impl<C: Blockchain> ChainClient<C> { pub fn rpc(&self) -> anyhow::Result<&C::Client> { match self { Self::Rpc(rpc) => Ok(rpc), - _ => Err(anyhow!("rpc endpoint requested on firehose chain client")), + Self::Firehose(_) => Err(anyhow!("rpc endpoint requested on firehose chain client")), } } } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index cb26df66880..0e729faedd3 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -522,6 +522,12 @@ pub trait ChainStore: Send + Sync + 'static { hashes: Vec<BlockHash>, ) -> Result<Vec<json::Value>, Error>; + /// Returns the blocks present in the store for the given block numbers. + async fn blocks_by_numbers( + self: Arc<Self>, + numbers: Vec<BlockNumber>, + ) -> Result<BTreeMap<BlockNumber, Vec<json::Value>>, Error>; + /// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching /// `block_hash` and offset=1 means its parent. If `root` is passed, short-circuit upon finding /// a child of `root`. 
Returns None if unable to complete due to missing blocks in the chain diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index b399b15b788..443305dc197 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -13,6 +13,7 @@ use graph::slog::Logger; use graph::stable_hash::crypto_stable_hash; use graph::util::herd_cache::HerdCache; +use std::collections::BTreeMap; use std::{ collections::HashMap, convert::{TryFrom, TryInto}, @@ -579,6 +580,50 @@ mod data { Ok(()) } + pub(super) fn blocks_by_numbers( + &self, + conn: &mut PgConnection, + chain: &str, + numbers: &[BlockNumber], + ) -> Result<Vec<JsonBlock>, StoreError> { + let x = match self { + Storage::Shared => { + use public::ethereum_blocks as b; + + b::table + .select(( + b::hash, + b::number, + b::parent_hash, + sql::<Jsonb>("coalesce(data -> 'block', data)"), + )) + .filter(b::network_name.eq(chain)) + .filter(b::number.eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64)))) + .load::<(BlockHash, i64, BlockHash, json::Value)>(conn) + } + Storage::Private(Schema { blocks, .. 
}) => blocks + .table() + .select(( + blocks.hash(), + blocks.number(), + blocks.parent_hash(), + sql::<Jsonb>("coalesce(data -> 'block', data)"), + )) + .filter( + blocks + .number() + .eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64))), + ) + .load::<(BlockHash, i64, BlockHash, json::Value)>(conn), }?; + + Ok(x.into_iter() + .map(|(hash, nr, parent, data)| { + JsonBlock::new(BlockPtr::new(hash, nr as i32), parent, Some(data)) + }) + .collect()) + } + pub(super) fn blocks( &self, conn: &mut PgConnection, @@ -1651,7 +1696,10 @@ impl ChainStoreMetrics { } #[derive(Clone, CheapClone)] -struct BlocksLookupResult(Arc<Result<Vec<JsonBlock>, StoreError>>); +enum BlocksLookupResult { + ByHash(Arc<Result<Vec<JsonBlock>, StoreError>>), + ByNumber(Arc<Result<BTreeMap<BlockNumber, Vec<JsonBlock>>, StoreError>>), +} pub struct ChainStore { logger: Logger, @@ -1870,6 +1918,35 @@ impl ChainStore { .await?; Ok(values) } + + async fn blocks_from_store_by_numbers( + self: &Arc<Self>, + numbers: Vec<BlockNumber>, + ) -> Result<BTreeMap<BlockNumber, Vec<JsonBlock>>, StoreError> { + let store = self.cheap_clone(); + let pool = self.pool.clone(); + + let values = pool + .with_conn(move |conn, _| { + store + .storage + .blocks_by_numbers(conn, &store.chain, &numbers) + .map_err(CancelableError::from) + }) + .await?; + + let mut block_map = BTreeMap::new(); + + for block in values { + let block_number = block.ptr.block_number(); + block_map + .entry(block_number) + .or_insert_with(Vec::new) + .push(block); + } + + Ok(block_map) + } } #[async_trait] @@ -2065,6 +2142,85 @@ impl ChainStoreTrait for ChainStore { Ok(()) } + async fn blocks_by_numbers( + self: Arc<Self>, + numbers: Vec<BlockNumber>, + ) -> Result<BTreeMap<BlockNumber, Vec<json::Value>>, Error> { + if ENV_VARS.store.disable_block_cache_for_lookup { + let values = self + .blocks_from_store_by_numbers(numbers) + .await? 
+ .into_iter() + .map(|(num, blocks)| { + ( + num, + blocks + .into_iter() + .filter_map(|block| block.data) + .collect::<Vec<_>>(), + ) + }) + .collect(); + Ok(values) + } else { + let cached = self.recent_blocks_cache.get_blocks_by_numbers(&numbers); + + let stored = if cached.len() < numbers.len() { + let missing_numbers = numbers + .iter() + .filter(|num| !cached.iter().any(|(ptr, _)| ptr.block_number() == **num)) + .cloned() + .collect::<Vec<_>>(); + + let hash = crypto_stable_hash(&missing_numbers); + let this = self.clone(); + let lookup_fut = async move { + let res = this.blocks_from_store_by_numbers(missing_numbers).await; + BlocksLookupResult::ByNumber(Arc::new(res)) + }; + let lookup_herd = self.lookup_herd.cheap_clone(); + let logger = self.logger.cheap_clone(); + let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await { + (BlocksLookupResult::ByNumber(res), _) => res, + _ => unreachable!(), + }; + let res = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone()); + + match res { + Ok(blocks) => { + for (_, blocks_for_num) in &blocks { + if blocks_for_num.len() == 1 { + self.recent_blocks_cache + .insert_block(blocks_for_num[0].clone()); + } + } + blocks + } + Err(e) => { + return Err(e.into()); + } + } + } else { + BTreeMap::new() + }; + + let cached_map = cached + .into_iter() + .map(|(ptr, data)| (ptr.block_number(), vec![data])) + .collect::<BTreeMap<_, _>>(); + + let mut result: BTreeMap<i32, Vec<json::Value>> = cached_map; + for (num, blocks) in stored { + result + .entry(num) + .or_default() + .extend(blocks.into_iter().filter_map(|block| block.data)); + } + + Ok(result) + } + } + async fn blocks(self: Arc<Self>, hashes: Vec<BlockHash>) -> Result<Vec<json::Value>, Error> { if ENV_VARS.store.disable_block_cache_for_lookup { let values = self @@ -2094,12 +2250,22 @@ let hash = crypto_stable_hash(&hashes); let this = self.clone(); let lookup_fut = async move { let res = this.blocks_from_store(hashes).await; - BlocksLookupResult(Arc::new(res)) + BlocksLookupResult::ByHash(Arc::new(res)) }; let lookup_herd = 
self.lookup_herd.cheap_clone(); let logger = self.logger.cheap_clone(); - let (BlocksLookupResult(res), _) = - lookup_herd.cached_query(hash, lookup_fut, &logger).await; + // lookup_fut above only constructs BlocksLookupResult::ByHash, so we expect + // ByHash here; ByNumber is handled defensively as an error rather than a + // panic because the herd cache is shared with the by-number lookup path. + let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await { + (BlocksLookupResult::ByHash(res), _) => res, + (BlocksLookupResult::ByNumber(_), _) => { + Arc::new(Err(StoreError::Unknown(anyhow::anyhow!( + "Unexpected BlocksLookupResult::ByNumber returned from cached block lookup by hash" + )))) + } + }; + // Try to avoid cloning a non-concurrent lookup; it's not // entirely clear whether that will actually avoid a clone // since it depends on a lot of the details of how the @@ -2361,6 +2527,12 @@ mod recent_blocks_cache { .and_then(|block| block.data.as_ref().map(|data| (&block.ptr, data))) } + fn get_block_by_number(&self, number: BlockNumber) -> Option<(&BlockPtr, &json::Value)> { + self.blocks + .get(&number) + .and_then(|block| block.data.as_ref().map(|data| (&block.ptr, data))) + } + fn get_ancestor( &self, child_ptr: &BlockPtr, @@ -2483,6 +2655,28 @@ blocks } + pub fn get_blocks_by_numbers( + &self, + numbers: &[BlockNumber], + ) -> Vec<(BlockPtr, json::Value)> { + let inner = self.inner.read(); + let mut blocks: Vec<(BlockPtr, json::Value)> = Vec::new(); + + for &number in numbers { + if let Some((ptr, block)) = inner.get_block_by_number(number) { + blocks.push((ptr.clone(), block.clone())); + } + } + + inner.metrics.record_hit_and_miss( + &inner.network, + blocks.len(), + numbers.len() - blocks.len(), + ); + + blocks + } + /// Tentatively caches the `ancestor` of a [`BlockPtr`] (`child`), together with /// its associated `data`. Note that for this to work, `child` must be /// in the cache already. The first block in the cache should be