@@ -1734,7 +1734,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
17341734 logger : Logger ,
17351735 chain_store : Arc < dyn ChainStore > ,
17361736 block_hashes : HashSet < H256 > ,
1737- ) -> Box < dyn Stream < Item = Arc < LightEthereumBlock > , Error = Error > + Send > {
1737+ ) -> Result < Vec < Arc < LightEthereumBlock > > , Error > {
17381738 let block_hashes: Vec < _ > = block_hashes. iter ( ) . cloned ( ) . collect ( ) ;
17391739 // Search for the block in the store first then use json-rpc as a backup.
17401740 let mut blocks: Vec < Arc < LightEthereumBlock > > = chain_store
@@ -1756,27 +1756,25 @@ impl EthereumAdapterTrait for EthereumAdapter {
17561756
17571757 // Return a stream that lazily loads batches of blocks.
17581758 debug ! ( logger, "Requesting {} block(s)" , missing_blocks. len( ) ) ;
1759- Box :: new (
1760- self . load_blocks_rpc ( logger. clone ( ) , missing_blocks)
1761- . collect ( )
1762- . map ( move |new_blocks| {
1763- let upsert_blocks: Vec < _ > = new_blocks
1764- . iter ( )
1765- . map ( |block| BlockFinality :: Final ( block. clone ( ) ) )
1766- . collect ( ) ;
1767- let block_refs: Vec < _ > = upsert_blocks
1768- . iter ( )
1769- . map ( |block| block as & dyn graph:: blockchain:: Block )
1770- . collect ( ) ;
1771- if let Err ( e) = chain_store. upsert_light_blocks ( block_refs. as_slice ( ) ) {
1772- error ! ( logger, "Error writing to block cache {}" , e) ;
1773- }
1774- blocks. extend ( new_blocks) ;
1775- blocks. sort_by_key ( |block| block. number ) ;
1776- stream:: iter_ok ( blocks)
1777- } )
1778- . flatten_stream ( ) ,
1779- )
1759+ let new_blocks = self
1760+ . load_blocks_rpc ( logger. clone ( ) , missing_blocks)
1761+ . collect ( )
1762+ . compat ( )
1763+ . await ?;
1764+ let upsert_blocks: Vec < _ > = new_blocks
1765+ . iter ( )
1766+ . map ( |block| BlockFinality :: Final ( block. clone ( ) ) )
1767+ . collect ( ) ;
1768+ let block_refs: Vec < _ > = upsert_blocks
1769+ . iter ( )
1770+ . map ( |block| block as & dyn graph:: blockchain:: Block )
1771+ . collect ( ) ;
1772+ if let Err ( e) = chain_store. upsert_light_blocks ( block_refs. as_slice ( ) ) {
1773+ error ! ( logger, "Error writing to block cache {}" , e) ;
1774+ }
1775+ blocks. extend ( new_blocks) ;
1776+ blocks. sort_by_key ( |block| block. number ) ;
1777+ Ok ( blocks)
17801778 }
17811779}
17821780
@@ -1911,10 +1909,11 @@ pub(crate) async fn blocks_with_triggers(
19111909
19121910 let logger2 = logger. cheap_clone ( ) ;
19131911
1914- let blocks = eth
1912+ let blocks: Vec < _ > = eth
19151913 . load_blocks ( logger. cheap_clone ( ) , chain_store. clone ( ) , block_hashes)
1916- . await
1917- . and_then (
1914+ . await ?
1915+ . into_iter ( )
1916+ . map (
19181917 move |block| match triggers_by_block. remove ( & ( block. number ( ) as BlockNumber ) ) {
19191918 Some ( triggers) => Ok ( BlockWithTriggers :: new (
19201919 BlockFinality :: Final ( block) ,
@@ -1927,9 +1926,7 @@ pub(crate) async fn blocks_with_triggers(
19271926 ) ) ,
19281927 } ,
19291928 )
1930- . collect ( )
1931- . compat ( )
1932- . await ?;
1929+ . collect :: < Result < _ , _ > > ( ) ?;
19331930
19341931 // Filter out call triggers that come from unsuccessful transactions
19351932 let futures = blocks. into_iter ( ) . map ( |block| {
0 commit comments