@@ -17,7 +17,7 @@ use graph::prelude::{
1717 EthereumCallCache , LightEthereumBlock , LightEthereumBlockExt , MetricsRegistry ,
1818} ;
1919use graph:: schema:: InputSchema ;
20- use graph:: slog:: { debug, error, trace} ;
20+ use graph:: slog:: { debug, error, trace, warn } ;
2121use graph:: substreams:: Clock ;
2222use graph:: {
2323 blockchain:: {
@@ -257,6 +257,7 @@ pub struct EthereumAdapterSelector {
257257 client : Arc < ChainClient < Chain > > ,
258258 registry : Arc < MetricsRegistry > ,
259259 chain_store : Arc < dyn ChainStore > ,
260+ eth_adapters : Arc < EthereumNetworkAdapters > ,
260261}
261262
262263impl EthereumAdapterSelector {
@@ -265,12 +266,14 @@ impl EthereumAdapterSelector {
265266 client : Arc < ChainClient < Chain > > ,
266267 registry : Arc < MetricsRegistry > ,
267268 chain_store : Arc < dyn ChainStore > ,
269+ eth_adapters : Arc < EthereumNetworkAdapters > ,
268270 ) -> Self {
269271 Self {
270272 logger_factory,
271273 client,
272274 registry,
273275 chain_store,
276+ eth_adapters,
274277 }
275278 }
276279}
@@ -296,6 +299,7 @@ impl TriggersAdapterSelector<Chain> for EthereumAdapterSelector {
296299 chain_store : self . chain_store . cheap_clone ( ) ,
297300 unified_api_version,
298301 capabilities : * capabilities,
302+ eth_adapters : self . eth_adapters . cheap_clone ( ) ,
299303 } ;
300304 Ok ( Arc :: new ( adapter) )
301305 }
@@ -739,6 +743,7 @@ pub struct TriggersAdapter {
739743 chain_client : Arc < ChainClient < Chain > > ,
740744 capabilities : NodeCapabilities ,
741745 unified_api_version : UnifiedMappingApiVersion ,
746+ eth_adapters : Arc < EthereumNetworkAdapters > ,
742747}
743748
744749/// Fetches blocks from the cache based on block numbers, excluding duplicates
@@ -784,12 +789,34 @@ async fn fetch_unique_blocks_from_cache(
784789 "Loading {} block(s) not in the block cache" ,
785790 missing_blocks. len( )
786791 ) ;
787- debug ! ( logger, "Missing blocks {:?}" , missing_blocks) ;
792+ trace ! ( logger, "Missing blocks {:?}" , missing_blocks. len ( ) ) ;
788793 }
789794
790795 ( blocks, missing_blocks)
791796}
792797
798+ // This is used to load blocks from the RPC.
799+ async fn load_blocks_with_rpc (
800+ logger : & Logger ,
801+ adapter : Arc < EthereumAdapter > ,
802+ chain_store : Arc < dyn ChainStore > ,
803+ block_numbers : BTreeSet < BlockNumber > ,
804+ ) -> Result < Vec < BlockFinality > > {
805+ let logger_clone = logger. clone ( ) ;
806+ load_blocks (
807+ logger,
808+ chain_store,
809+ block_numbers,
810+ |missing_numbers| async move {
811+ adapter
812+ . load_block_ptrs_by_numbers_rpc ( logger_clone, missing_numbers)
813+ . try_collect ( )
814+ . await
815+ } ,
816+ )
817+ . await
818+ }
819+
793820/// Fetches blocks by their numbers, first attempting to load from cache.
794821/// Missing blocks are retrieved from an external source, with all blocks sorted and converted to `BlockFinality` format.
795822async fn load_blocks < F , Fut > (
@@ -847,6 +874,37 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
847874 ) -> Result < Vec < BlockFinality > > {
848875 match & * self . chain_client {
849876 ChainClient :: Firehose ( endpoints) => {
877+ // If the force_rpc_for_block_ptrs flag is set, we will use the RPC to load the blocks
878+ // even if the firehose is available. If no adapter is available, we will log an error.
879+ // And then fallback to the firehose.
880+ if ENV_VARS . force_rpc_for_block_ptrs {
881+ trace ! (
882+ logger,
883+ "Loading blocks from RPC (force_rpc_for_block_ptrs is set)" ;
884+ "block_numbers" => format!( "{:?}" , block_numbers)
885+ ) ;
886+ match self . eth_adapters . cheapest_with ( & self . capabilities ) . await {
887+ Ok ( adapter) => {
888+ match load_blocks_with_rpc (
889+ & logger,
890+ adapter,
891+ self . chain_store . clone ( ) ,
892+ block_numbers. clone ( ) ,
893+ )
894+ . await
895+ {
896+ Ok ( blocks) => return Ok ( blocks) ,
897+ Err ( e) => {
898+ warn ! ( logger, "Error loading blocks from RPC: {}" , e) ;
899+ }
900+ }
901+ }
902+ Err ( e) => {
903+ warn ! ( logger, "Error getting cheapest adapter: {}" , e) ;
904+ }
905+ }
906+ }
907+
850908 trace ! (
851909 logger,
852910 "Loading blocks from firehose" ;
@@ -884,29 +942,16 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
884942 . await
885943 }
886944
887- ChainClient :: Rpc ( client ) => {
945+ ChainClient :: Rpc ( eth_adapters ) => {
888946 trace ! (
889947 logger,
890948 "Loading blocks from RPC" ;
891949 "block_numbers" => format!( "{:?}" , block_numbers)
892950 ) ;
893951
894- let adapter = client. cheapest_with ( & self . capabilities ) . await ?;
895- let chain_store = self . chain_store . clone ( ) ;
896- let logger_clone = logger. clone ( ) ;
897-
898- load_blocks (
899- & logger,
900- chain_store,
901- block_numbers,
902- |missing_numbers| async move {
903- adapter
904- . load_block_ptrs_by_numbers_rpc ( logger_clone, missing_numbers)
905- . try_collect ( )
906- . await
907- } ,
908- )
909- . await
952+ let adapter = eth_adapters. cheapest_with ( & self . capabilities ) . await ?;
953+ load_blocks_with_rpc ( & logger, adapter, self . chain_store . clone ( ) , block_numbers)
954+ . await
910955 }
911956 }
912957 }
@@ -973,10 +1018,12 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
9731018 ChainClient :: Firehose ( endpoints) => {
9741019 let endpoint = endpoints. endpoint ( ) . await ?;
9751020 let block = endpoint
976- . get_block_by_number :: < codec:: Block > ( ptr. number as u64 , & self . logger )
1021+ . get_block_by_number_with_retry :: < codec:: Block > ( ptr. number as u64 , & self . logger )
9771022 . await
978- . map_err ( |e| anyhow ! ( "Failed to fetch block from firehose: {}" , e) ) ?;
979-
1023+ . context ( format ! (
1024+ "Failed to fetch block {} from firehose" ,
1025+ ptr. number
1026+ ) ) ?;
9801027 Ok ( block. hash ( ) == ptr. hash )
9811028 }
9821029 ChainClient :: Rpc ( adapter) => {
0 commit comments