From 4787dbaab1a58e19d011a9617437a471cdd2f1bb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 27 Jul 2023 13:55:20 +0300 Subject: [PATCH 01/16] chainHead/api: Make storage/body/call pure RPC methods Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/api.rs | 30 +-- .../rpc-spec-v2/src/chain_head/chain_head.rs | 207 ++++++++---------- 2 files changed, 97 insertions(+), 140 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/api.rs b/client/rpc-spec-v2/src/chain_head/api.rs index 8905e03687747..c002b75efe037 100644 --- a/client/rpc-spec-v2/src/chain_head/api.rs +++ b/client/rpc-spec-v2/src/chain_head/api.rs @@ -19,7 +19,7 @@ #![allow(non_snake_case)] //! API trait of the chain head. -use crate::chain_head::event::{ChainHeadEvent, FollowEvent, StorageQuery}; +use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; #[rpc(client, server)] @@ -47,12 +47,12 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[subscription( - name = "chainHead_unstable_body", - unsubscribe = "chainHead_unstable_stopBody", - item = ChainHeadEvent, - )] - fn chain_head_unstable_body(&self, follow_subscription: String, hash: Hash); + #[method(name = "chainHead_unstable_body", blocking)] + fn chain_head_unstable_body( + &self, + follow_subscription: String, + hash: Hash, + ) -> RpcResult; /// Retrieves the header of a pinned block. /// @@ -86,36 +86,28 @@ pub trait ChainHeadApi { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[subscription( - name = "chainHead_unstable_storage", - unsubscribe = "chainHead_unstable_stopStorage", - item = ChainHeadEvent, - )] + #[method(name = "chainHead_unstable_storage", blocking)] fn chain_head_unstable_storage( &self, follow_subscription: String, hash: Hash, items: Vec>, child_trie: Option, - ); + ) -> RpcResult; /// Call into the Runtime API at a specified block's state. /// /// # Unstable /// /// This method is unstable and subject to change in the future. - #[subscription( - name = "chainHead_unstable_call", - unsubscribe = "chainHead_unstable_stopCall", - item = ChainHeadEvent, - )] + #[method(name = "chainHead_unstable_call", blocking)] fn chain_head_unstable_call( &self, follow_subscription: String, hash: Hash, function: String, call_parameters: String, - ); + ) -> RpcResult; /// Unpin a block reported by the `follow` method. /// diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index a2c9afc034906..6bfa417252032 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -18,12 +18,16 @@ //! API implementation for `chainHead`. +use super::{chain_head_storage::ChainHeadStorage, subscription::BlockGuard}; use crate::{ chain_head::{ api::ChainHeadApiServer, chain_head_follow::ChainHeadFollower, error::Error as ChainHeadRpcError, - event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent}, + event::{ + ChainHeadEvent, ChainHeadResult, ChainHeadStorageEvent, ErrorEvent, FollowEvent, + MethodResponse, StorageQuery, StorageQueryType, + }, hex_string, subscription::{SubscriptionManagement, SubscriptionManagementError}, }, @@ -47,11 +51,6 @@ use sp_core::{traits::CallContext, Bytes}; use sp_runtime::traits::Block as BlockT; use std::{marker::PhantomData, sync::Arc, time::Duration}; -use super::{ - chain_head_storage::ChainHeadStorage, - event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType}, -}; - pub(crate) const LOG_TARGET: &str = "rpc-spec-v2"; /// An API for chain head RPC calls. @@ -121,11 +120,8 @@ impl, Block: BlockT, Client> ChainHead { /// Parse hex-encoded string parameter as raw bytes. /// -/// If the parsing fails, the subscription is rejected. -fn parse_hex_param( - sink: &mut SubscriptionSink, - param: String, -) -> Result, SubscriptionEmptyError> { +/// If the parsing fails, returns an error propagated to the RPC method. +fn parse_hex_param(param: String) -> Result, ChainHeadRpcError> { // Methods can accept empty parameters. if param.is_empty() { return Ok(Default::default()) @@ -133,10 +129,7 @@ fn parse_hex_param( match array_bytes::hex2bytes(¶m) { Ok(bytes) => Ok(bytes), - Err(_) => { - let _ = sink.reject(ChainHeadRpcError::InvalidParam(param)); - Err(SubscriptionEmptyError) - }, + Err(_) => Err(ChainHeadRpcError::InvalidParam(param)), } } @@ -202,59 +195,50 @@ where fn chain_head_unstable_body( &self, - mut sink: SubscriptionSink, follow_subscription: String, hash: Block::Hash, - ) -> SubscriptionResult { + ) -> RpcResult { let client = self.client.clone(); - let subscriptions = self.subscriptions.clone(); - let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadEvent::::Disjoint); - return Ok(()) + return Ok(MethodResponse::LimitReached) }, Err(SubscriptionManagementError::BlockHashAbsent) => { // Block is not part of the subscription. - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return Ok(()) - }, - Err(error) => { - let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { - error: error.to_string(), - })); - return Ok(()) + return Err(ChainHeadRpcError::InvalidBlock.into()) }, + Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - let fut = async move { - let _block_guard = block_guard; - let event = match client.block(hash) { - Ok(Some(signed_block)) => { - let extrinsics = signed_block.block.extrinsics(); - let result = hex_string(&extrinsics.encode()); - ChainHeadEvent::Done(ChainHeadResult { result }) - }, - Ok(None) => { - // The block's body was pruned. This subscription ID has become invalid. - debug!( - target: LOG_TARGET, - "[body][id={:?}] Stopping subscription because hash={:?} was pruned", - &follow_subscription, - hash - ); - subscriptions.remove_subscription(&follow_subscription); - ChainHeadEvent::::Disjoint - }, - Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }), - }; - let _ = sink.send(&event); - }; - - self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - Ok(()) + // let fut = async move { + // let _block_guard = block_guard; + // let event = match client.block(hash) { + // Ok(Some(signed_block)) => { + // let extrinsics = signed_block.block.extrinsics(); + // let result = hex_string(&extrinsics.encode()); + // ChainHeadEvent::Done(ChainHeadResult { result }) + // }, + // Ok(None) => { + // // The block's body was pruned. This subscription ID has become invalid. + // debug!( + // target: LOG_TARGET, + // "[body][id={:?}] Stopping subscription because hash={:?} was pruned", + // &follow_subscription, + // hash + // ); + // subscriptions.remove_subscription(&follow_subscription); + // ChainHeadEvent::::Disjoint + // }, + // Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }), + // }; + // let _ = sink.send(&event); + // }; + + // self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + Ok(MethodResponse::LimitReached) } fn chain_head_unstable_header( @@ -288,128 +272,109 @@ where fn chain_head_unstable_storage( &self, - mut sink: SubscriptionSink, follow_subscription: String, hash: Block::Hash, items: Vec>, child_trie: Option, - ) -> SubscriptionResult { + ) -> RpcResult { // Gain control over parameter parsing and returned error. let items = items .into_iter() .map(|query| { if query.query_type == StorageQueryType::ClosestDescendantMerkleValue { // Note: remove this once all types are implemented. - let _ = sink.reject(ChainHeadRpcError::InvalidParam( + return Err(ChainHeadRpcError::InvalidParam( "Storage query type not supported".into(), - )); - return Err(SubscriptionEmptyError) + )) } Ok(StorageQuery { - key: StorageKey(parse_hex_param(&mut sink, query.key)?), + key: StorageKey(parse_hex_param(query.key)?), query_type: query.query_type, }) }) .collect::, _>>()?; let child_trie = child_trie - .map(|child_trie| parse_hex_param(&mut sink, child_trie)) + .map(|child_trie| parse_hex_param(child_trie)) .transpose()? .map(ChildInfo::new_default_from_vec); let client = self.client.clone(); - let subscriptions = self.subscriptions.clone(); - let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadStorageEvent::Disjoint); - return Ok(()) + return Ok(MethodResponse::LimitReached) }, Err(SubscriptionManagementError::BlockHashAbsent) => { // Block is not part of the subscription. - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return Ok(()) - }, - Err(error) => { - let _ = sink - .send(&ChainHeadStorageEvent::Error(ErrorEvent { error: error.to_string() })); - return Ok(()) + return Err(ChainHeadRpcError::InvalidBlock.into()) }, + Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; + // let storage_client = ChainHeadStorage::::new(client); - let storage_client = ChainHeadStorage::::new(client); + // let fut = async move { + // let _block_guard = block_guard; - let fut = async move { - let _block_guard = block_guard; + // storage_client.generate_events(sink, hash, items, child_trie); + // }; - storage_client.generate_events(sink, hash, items, child_trie); - }; - - self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - Ok(()) + // self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + Ok(MethodResponse::LimitReached) } fn chain_head_unstable_call( &self, - mut sink: SubscriptionSink, follow_subscription: String, hash: Block::Hash, function: String, call_parameters: String, - ) -> SubscriptionResult { - let call_parameters = Bytes::from(parse_hex_param(&mut sink, call_parameters)?); + ) -> RpcResult { + let call_parameters = Bytes::from(parse_hex_param(call_parameters)?); let client = self.client.clone(); - let subscriptions = self.subscriptions.clone(); - let block_guard = match subscriptions.lock_block(&follow_subscription, hash) { + let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) => { // Invalid invalid subscription ID. - let _ = sink.send(&ChainHeadEvent::::Disjoint); - return Ok(()) + return Ok(MethodResponse::LimitReached) }, Err(SubscriptionManagementError::BlockHashAbsent) => { // Block is not part of the subscription. - let _ = sink.reject(ChainHeadRpcError::InvalidBlock); - return Ok(()) - }, - Err(error) => { - let _ = sink.send(&ChainHeadEvent::::Error(ErrorEvent { - error: error.to_string(), - })); - return Ok(()) + return Err(ChainHeadRpcError::InvalidBlock.into()) }, + Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - let fut = async move { - // Reject subscription if with_runtime is false. - if !block_guard.has_runtime() { - let _ = sink.reject(ChainHeadRpcError::InvalidParam( - "The runtime updates flag must be set".into(), - )); - return - } - - let res = client - .executor() - .call(hash, &function, &call_parameters, CallContext::Offchain) - .map(|result| { - let result = hex_string(&result); - ChainHeadEvent::Done(ChainHeadResult { result }) - }) - .unwrap_or_else(|error| { - ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }) - }); - - let _ = sink.send(&res); - }; - - self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - Ok(()) + // let fut = async move { + // // Reject subscription if with_runtime is false. + // if !block_guard.has_runtime() { + // let _ = sink.reject(ChainHeadRpcError::InvalidParam( + // "The runtime updates flag must be set".into(), + // )); + // return + // } + + // let res = client + // .executor() + // .call(hash, &function, &call_parameters, CallContext::Offchain) + // .map(|result| { + // let result = hex_string(&result); + // ChainHeadEvent::Done(ChainHeadResult { result }) + // }) + // .unwrap_or_else(|error| { + // ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }) + // }); + + // let _ = sink.send(&res); + // }; + + // self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + Ok(MethodResponse::LimitReached) } fn chain_head_unstable_unpin( From 8050811596a6f17813b8659054f4ec6c6f48bb0d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 27 Jul 2023 15:15:32 +0300 Subject: [PATCH 02/16] chainHead: Add mpsc channel between RPC methods Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/Cargo.toml | 2 +- .../rpc-spec-v2/src/chain_head/chain_head.rs | 6 ++-- .../src/chain_head/chain_head_follow.rs | 6 ++-- .../src/chain_head/subscription/inner.rs | 33 +++++++++++++++++-- .../src/chain_head/subscription/mod.rs | 7 ++-- 5 files changed, 40 insertions(+), 14 deletions(-) diff --git a/client/rpc-spec-v2/Cargo.toml b/client/rpc-spec-v2/Cargo.toml index 4f5c11212a9b2..b1ab2a8799744 100644 --- a/client/rpc-spec-v2/Cargo.toml +++ b/client/rpc-spec-v2/Cargo.toml @@ -24,6 +24,7 @@ sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } sp-version = { version = "22.0.0", path = "../../primitives/version" } sc-client-api = { version = "4.0.0-dev", path = "../api" } +sc-utils = { version = "4.0.0-dev", path = "../utils" } codec = { package = "parity-scale-codec", version = "3.6.1" } thiserror = "1.0" serde = "1.0" @@ -44,6 +45,5 @@ sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/comm sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" } sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../service" } -sc-utils = { version = "4.0.0-dev", path = "../utils" } assert_matches = "1.3.0" pretty_assertions = "1.2.1" diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 6bfa417252032..54a411ae0f191 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -45,6 +45,7 @@ use sc_client_api::{ Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey, StorageProvider, }; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_api::CallApiAt; use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata}; use sp_core::{traits::CallContext, Bytes}; @@ -80,7 +81,6 @@ impl, Block: BlockT, Client> ChainHead { max_pinned_duration: Duration, ) -> Self { let genesis_hash = hex_string(&genesis_hash.as_ref()); - Self { client, backend: backend.clone(), @@ -161,7 +161,7 @@ where }, }; // Keep track of the subscription. - let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), with_runtime) + let Some(sub_data) = self.subscriptions.insert_subscription(sub_id.clone(), with_runtime) else { // Inserting the subscription can only fail if the JsonRPSee // generated a duplicate subscription ID. @@ -183,7 +183,7 @@ where sub_id.clone(), ); - chain_head_follow.generate_events(sink, rx_stop).await; + chain_head_follow.generate_events(sink, sub_data).await; subscriptions.remove_subscription(&sub_id); debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id); diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 799978be532ae..7cc7c586cab20 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -24,7 +24,7 @@ use crate::chain_head::{ BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent, }, - subscription::{SubscriptionManagement, SubscriptionManagementError}, + subscription::{InsertedSubscriptionData, SubscriptionManagement, SubscriptionManagementError}, }; use futures::{ channel::oneshot, @@ -572,7 +572,7 @@ where pub async fn generate_events( &mut self, mut sink: SubscriptionSink, - rx_stop: oneshot::Receiver<()>, + sub_data: InsertedSubscriptionData, ) { // Register for the new block and finalized notifications. let stream_import = self @@ -604,7 +604,7 @@ where let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized); let stream = stream::once(futures::future::ready(initial)).chain(merged); - self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, rx_stop) + self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop) .await; } } diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index be8b8f46a2844..47b3328bcb67e 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -18,6 +18,7 @@ use futures::channel::oneshot; use sc_client_api::Backend; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap}, @@ -25,7 +26,10 @@ use std::{ time::{Duration, Instant}, }; -use crate::chain_head::subscription::SubscriptionManagementError; +use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent}; + +/// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings. +const QUEUE_SIZE_WARNING: usize = 512; /// The state machine of a block of a single subscription ID. /// @@ -116,6 +120,10 @@ struct SubscriptionState { with_runtime: bool, /// Signals the "Stop" event. tx_stop: Option>, + /// The sender of message responses to the `chainHead_follow` events. + /// + /// This object is cloned between methods. + response_sender: TracingUnboundedSender>, /// Track the block hashes available for this subscription. /// /// This implementation assumes: @@ -272,6 +280,15 @@ impl> Drop for BlockGuard { } } +/// The data propagated back to the `chainHead_follow` method after +/// the subscription is successfully inserted. +pub struct InsertedSubscriptionData { + /// Signal that the subscription must stop. + pub rx_stop: oneshot::Receiver<()>, + /// Receive message responses from the `chainHead` methods. + pub response_receiver: TracingUnboundedReceiver>, +} + pub struct SubscriptionsInner> { /// Reference count the block hashes across all subscriptions. /// @@ -311,16 +328,20 @@ impl> SubscriptionsInner { &mut self, sub_id: String, with_runtime: bool, - ) -> Option> { + ) -> Option> { if let Entry::Vacant(entry) = self.subs.entry(sub_id) { let (tx_stop, rx_stop) = oneshot::channel(); + let (response_sender, response_receiver) = + tracing_unbounded("chain-head-method-responses", QUEUE_SIZE_WARNING); let state = SubscriptionState:: { with_runtime, tx_stop: Some(tx_stop), + response_sender, blocks: Default::default(), }; entry.insert(state); - Some(rx_stop) + + Some(InsertedSubscriptionData { rx_stop, response_receiver }) } else { None } @@ -604,9 +625,12 @@ mod tests { #[test] fn sub_state_register_twice() { + let (response_sender, _response_receiver) = + tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING); let mut sub_state = SubscriptionState:: { with_runtime: false, tx_stop: None, + response_sender, blocks: Default::default(), }; @@ -629,9 +653,12 @@ mod tests { #[test] fn sub_state_register_unregister() { + let (response_sender, _response_receiver) = + tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING); let mut sub_state = SubscriptionState:: { with_runtime: false, tx_stop: None, + response_sender, blocks: Default::default(), }; diff --git a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index 86e55acc4c176..3aece6575ef66 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -16,7 +16,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use futures::channel::oneshot; use parking_lot::RwLock; use sc_client_api::Backend; use sp_runtime::traits::Block as BlockT; @@ -25,9 +24,9 @@ use std::{sync::Arc, time::Duration}; mod error; mod inner; +use self::inner::SubscriptionsInner; pub use error::SubscriptionManagementError; -pub use inner::BlockGuard; -use inner::SubscriptionsInner; +pub use inner::{BlockGuard, InsertedSubscriptionData}; /// Manage block pinning / unpinning for subscription IDs. pub struct SubscriptionManagement> { @@ -61,7 +60,7 @@ impl> SubscriptionManagement { &self, sub_id: String, runtime_updates: bool, - ) -> Option> { + ) -> Option> { let mut inner = self.inner.write(); inner.insert_subscription(sub_id, runtime_updates) } From 3b30eb3beb4030b6b618967732d79937b1089bbe Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 27 Jul 2023 15:20:09 +0300 Subject: [PATCH 03/16] chainHead/subscriptions: Extract mpsc::Sender via BlockGuard Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 47b3328bcb67e..5ea6ad12e019d 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -243,6 +243,7 @@ impl SubscriptionState { pub struct BlockGuard> { hash: Block::Hash, with_runtime: bool, + response_sender: TracingUnboundedSender>, backend: Arc, } @@ -259,19 +260,25 @@ impl> BlockGuard { fn new( hash: Block::Hash, with_runtime: bool, + response_sender: TracingUnboundedSender>, backend: Arc, ) -> Result { backend .pin_block(hash) .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; - Ok(Self { hash, with_runtime, backend }) + Ok(Self { hash, with_runtime, response_sender, backend }) } /// The `with_runtime` flag of the subscription. pub fn has_runtime(&self) -> bool { self.with_runtime } + + /// Send message responses from the `chainHead` methods to `chainHead_follow`. + pub fn response_sender(&self) -> TracingUnboundedSender> { + self.response_sender.clone() + } } impl> Drop for BlockGuard { @@ -520,7 +527,7 @@ impl> SubscriptionsInner { return Err(SubscriptionManagementError::BlockHashAbsent) } - BlockGuard::new(hash, sub.with_runtime, self.backend.clone()) + BlockGuard::new(hash, sub.with_runtime, sub.response_sender.clone(), self.backend.clone()) } } @@ -948,17 +955,17 @@ mod tests { let id = "abc".to_string(); - let mut rx_stop = subs.insert_subscription(id.clone(), true).unwrap(); + let mut sub_data = subs.insert_subscription(id.clone(), true).unwrap(); // Check the stop signal was not received. - let res = rx_stop.try_recv().unwrap(); + let res = sub_data.rx_stop.try_recv().unwrap(); assert!(res.is_none()); let sub = subs.subs.get_mut(&id).unwrap(); sub.stop(); // Check the signal was received. - let res = rx_stop.try_recv().unwrap(); + let res = sub_data.rx_stop.try_recv().unwrap(); assert!(res.is_some()); } } From d5305a954467eede884dada5d39d85e94320f78c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 27 Jul 2023 15:31:32 +0300 Subject: [PATCH 04/16] chainHead/subscriptions: Generate and provide the method operation ID Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 5ea6ad12e019d..142dfe4dd9cf1 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -124,6 +124,8 @@ struct SubscriptionState { /// /// This object is cloned between methods. response_sender: TracingUnboundedSender>, + /// The next operation ID. + next_operation_id: usize, /// Track the block hashes available for this subscription. /// /// This implementation assumes: @@ -235,6 +237,13 @@ impl SubscriptionState { } timestamp } + + /// Generate the next operation ID for this subscription. + fn next_operation_id(&mut self) -> usize { + let op_id = self.next_operation_id; + self.next_operation_id += 1; + op_id + } } /// Keeps a specific block pinned while the handle is alive. @@ -244,6 +253,7 @@ pub struct BlockGuard> { hash: Block::Hash, with_runtime: bool, response_sender: TracingUnboundedSender>, + operation_id: String, backend: Arc, } @@ -261,13 +271,20 @@ impl> BlockGuard { hash: Block::Hash, with_runtime: bool, response_sender: TracingUnboundedSender>, + operation_id: usize, backend: Arc, ) -> Result { backend .pin_block(hash) .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; - Ok(Self { hash, with_runtime, response_sender, backend }) + Ok(Self { + hash, + with_runtime, + response_sender, + operation_id: operation_id.to_string(), + backend, + }) } /// The `with_runtime` flag of the subscription. @@ -279,6 +296,11 @@ impl> BlockGuard { pub fn response_sender(&self) -> TracingUnboundedSender> { self.response_sender.clone() } + + /// The operation ID of this method. + pub fn operation_id(&self) -> String { + self.operation_id.clone() + } } impl> Drop for BlockGuard { @@ -344,6 +366,7 @@ impl> SubscriptionsInner { with_runtime, tx_stop: Some(tx_stop), response_sender, + next_operation_id: 0, blocks: Default::default(), }; entry.insert(state); @@ -519,7 +542,7 @@ impl> SubscriptionsInner { sub_id: &str, hash: Block::Hash, ) -> Result, SubscriptionManagementError> { - let Some(sub) = self.subs.get(sub_id) else { + let Some(sub) = self.subs.get_mut(sub_id) else { return Err(SubscriptionManagementError::SubscriptionAbsent) }; @@ -527,7 +550,14 @@ impl> SubscriptionsInner { return Err(SubscriptionManagementError::BlockHashAbsent) } - BlockGuard::new(hash, sub.with_runtime, sub.response_sender.clone(), self.backend.clone()) + let operation_id = sub.next_operation_id(); + BlockGuard::new( + hash, + sub.with_runtime, + sub.response_sender.clone(), + operation_id, + self.backend.clone(), + ) } } @@ -638,6 +668,7 @@ mod tests { with_runtime: false, tx_stop: None, response_sender, + next_operation_id: 0, blocks: Default::default(), }; @@ -666,6 +697,7 @@ mod tests { with_runtime: false, tx_stop: None, response_sender, + next_operation_id: 0, blocks: Default::default(), }; From 50c840d7e46abed7415f576b0cf056493f65fa09 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 27 Jul 2023 15:40:25 +0300 Subject: [PATCH 05/16] chainHead: Generate `chainHead_body` response Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 72 +++++++++++-------- 1 file changed, 43 insertions(+), 29 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 54a411ae0f191..8ebfb49db95fc 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -18,7 +18,11 @@ //! API implementation for `chainHead`. -use super::{chain_head_storage::ChainHeadStorage, subscription::BlockGuard}; +use super::{ + chain_head_storage::ChainHeadStorage, + event::{MethodResponseStarted, OperationBodyDone}, + subscription::BlockGuard, +}; use crate::{ chain_head::{ api::ChainHeadApiServer, @@ -26,7 +30,7 @@ use crate::{ error::Error as ChainHeadRpcError, event::{ ChainHeadEvent, ChainHeadResult, ChainHeadStorageEvent, ErrorEvent, FollowEvent, - MethodResponse, StorageQuery, StorageQueryType, + MethodResponse, OperationError, StorageQuery, StorageQueryType, }, hex_string, subscription::{SubscriptionManagement, SubscriptionManagementError}, @@ -198,8 +202,6 @@ where follow_subscription: String, hash: Block::Hash, ) -> RpcResult { - let client = self.client.clone(); - let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) => { @@ -213,32 +215,44 @@ where Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - // let fut = async move { - // let _block_guard = block_guard; - // let event = match client.block(hash) { - // Ok(Some(signed_block)) => { - // let extrinsics = signed_block.block.extrinsics(); - // let result = hex_string(&extrinsics.encode()); - // ChainHeadEvent::Done(ChainHeadResult { result }) - // }, - // Ok(None) => { - // // The block's body was pruned. This subscription ID has become invalid. - // debug!( - // target: LOG_TARGET, - // "[body][id={:?}] Stopping subscription because hash={:?} was pruned", - // &follow_subscription, - // hash - // ); - // subscriptions.remove_subscription(&follow_subscription); - // ChainHeadEvent::::Disjoint - // }, - // Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }), - // }; - // let _ = sink.send(&event); - // }; + let event = match self.client.block(hash) { + Ok(Some(signed_block)) => { + let extrinsics = signed_block + .block + .extrinsics() + .iter() + .map(|extrinsic| hex_string(&extrinsic.encode())) + .collect(); + FollowEvent::::OperationBodyDone(OperationBodyDone { + operation_id: block_guard.operation_id(), + value: extrinsics, + }) + }, + Ok(None) => { + // The block's body was pruned. This subscription ID has become invalid. + debug!( + target: LOG_TARGET, + "[body][id={:?}] Stopping subscription because hash={:?} was pruned", + &follow_subscription, + hash + ); + self.subscriptions.remove_subscription(&follow_subscription); + FollowEvent::::OperationError(OperationError { + operation_id: block_guard.operation_id(), + error: "Requested block was pruned".to_string(), + }) + }, + Err(error) => FollowEvent::::OperationError(OperationError { + operation_id: block_guard.operation_id(), + error: error.to_string(), + }), + }; - // self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - Ok(MethodResponse::LimitReached) + let _ = block_guard.response_sender().unbounded_send(event); + Ok(MethodResponse::Started(MethodResponseStarted { + operation_id: block_guard.operation_id(), + discarded_items: None, + })) } fn chain_head_unstable_header( From 6c5940c9e605497ceeb1a94c90648d6ee76b341e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 27 Jul 2023 15:45:47 +0300 Subject: [PATCH 06/16] chainHead: Generate `chainHead_call` response Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 57 ++++++++++--------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 8ebfb49db95fc..382aaf281e391 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -20,7 +20,7 @@ use super::{ chain_head_storage::ChainHeadStorage, - event::{MethodResponseStarted, OperationBodyDone}, + event::{MethodResponseStarted, OperationBodyDone, OperationCallDone}, subscription::BlockGuard, }; use crate::{ @@ -349,8 +349,6 @@ where ) -> RpcResult { let call_parameters = Bytes::from(parse_hex_param(call_parameters)?); - let client = self.client.clone(); - let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) => { @@ -364,31 +362,36 @@ where Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - // let fut = async move { - // // Reject subscription if with_runtime is false. - // if !block_guard.has_runtime() { - // let _ = sink.reject(ChainHeadRpcError::InvalidParam( - // "The runtime updates flag must be set".into(), - // )); - // return - // } - - // let res = client - // .executor() - // .call(hash, &function, &call_parameters, CallContext::Offchain) - // .map(|result| { - // let result = hex_string(&result); - // ChainHeadEvent::Done(ChainHeadResult { result }) - // }) - // .unwrap_or_else(|error| { - // ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }) - // }); - - // let _ = sink.send(&res); - // }; + // Reject subscription if with_runtime is false. + if !block_guard.has_runtime() { + return Err(ChainHeadRpcError::InvalidParam( + "The runtime updates flag must be set".to_string(), + ) + .into()) + } - // self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - Ok(MethodResponse::LimitReached) + let event = self + .client + .executor() + .call(hash, &function, &call_parameters, CallContext::Offchain) + .map(|result| { + FollowEvent::::OperationCallDone(OperationCallDone { + operation_id: block_guard.operation_id(), + output: hex_string(&result), + }) + }) + .unwrap_or_else(|error| { + FollowEvent::::OperationError(OperationError { + operation_id: block_guard.operation_id(), + error: error.to_string(), + }) + }); + + let _ = block_guard.response_sender().unbounded_send(event); + Ok(MethodResponse::Started(MethodResponseStarted { + operation_id: block_guard.operation_id(), + discarded_items: None, + })) } fn chain_head_unstable_unpin( From 589efa88099b3a5ea9734e0d812c8e853b7b27a0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 27 Jul 2023 16:06:22 +0300 Subject: [PATCH 07/16] chainHead: Generate `chainHead_storage` responses Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 28 +++---- .../src/chain_head/chain_head_storage.rs | 78 ++++++++++++------- 2 files changed, 61 insertions(+), 45 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 382aaf281e391..752da7959cf53 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -21,17 +21,13 @@ use super::{ chain_head_storage::ChainHeadStorage, event::{MethodResponseStarted, OperationBodyDone, OperationCallDone}, - subscription::BlockGuard, }; use crate::{ chain_head::{ api::ChainHeadApiServer, chain_head_follow::ChainHeadFollower, error::Error as ChainHeadRpcError, - event::{ - ChainHeadEvent, ChainHeadResult, ChainHeadStorageEvent, ErrorEvent, FollowEvent, - MethodResponse, OperationError, StorageQuery, StorageQueryType, - }, + event::{FollowEvent, MethodResponse, OperationError, StorageQuery, StorageQueryType}, hex_string, subscription::{SubscriptionManagement, SubscriptionManagementError}, }, @@ -49,7 +45,6 @@ use sc_client_api::{ Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey, StorageProvider, }; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_api::CallApiAt; use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata}; use sp_core::{traits::CallContext, Bytes}; @@ -314,8 +309,6 @@ where .transpose()? .map(ChildInfo::new_default_from_vec); - let client = self.client.clone(); - let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) => { @@ -328,16 +321,19 @@ where }, Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - // let storage_client = ChainHeadStorage::::new(client); - // let fut = async move { - // let _block_guard = block_guard; - - // storage_client.generate_events(sink, hash, items, child_trie); - // }; + let storage_client = ChainHeadStorage::::new(self.client.clone()); + let operation_id = block_guard.operation_id(); + let fut = async move { + storage_client.generate_events(block_guard, hash, items, child_trie); + }; - // self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - Ok(MethodResponse::LimitReached) + self.executor + .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + Ok(MethodResponse::Started(MethodResponseStarted { + operation_id, + discarded_items: Some(0), + })) } fn chain_head_unstable_call( diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index df1600628ded4..393e4489c8c07 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -20,17 +20,21 @@ use std::{marker::PhantomData, sync::Arc}; -use jsonrpsee::SubscriptionSink; use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider}; +use sc_utils::mpsc::TracingUnboundedSender; use sp_api::BlockT; use sp_core::storage::well_known_keys; +use crate::chain_head::event::OperationStorageItems; + use super::{ event::{ - ChainHeadStorageEvent, ItemsEvent, StorageQuery, StorageQueryType, StorageResult, + OperationError, OperationId, StorageQuery, StorageQueryType, StorageResult, StorageResultType, }, - hex_string, ErrorEvent, + hex_string, + subscription::BlockGuard, + FollowEvent, }; /// The maximum number of items the `chainHead_storage` can return @@ -70,10 +74,10 @@ fn is_key_queryable(key: &[u8]) -> bool { } /// The result of making a query call. -type QueryResult = Result, ChainHeadStorageEvent>; +type QueryResult = Result, String>; /// The result of iterating over keys. -type QueryIterResult = Result, ChainHeadStorageEvent>; +type QueryIterResult = Result, String>; impl ChainHeadStorage where @@ -101,11 +105,7 @@ where result: StorageResultType::Value(hex_string(&storage_data.0)), })) }) - .unwrap_or_else(|err| { - QueryResult::Err(ChainHeadStorageEvent::Error(ErrorEvent { - error: err.to_string(), - })) - }) + .unwrap_or_else(|error| QueryResult::Err(error.to_string())) } /// Fetch the hash of a value from storage. @@ -128,11 +128,7 @@ where result: StorageResultType::Hash(hex_string(&storage_data.as_ref())), })) }) - .unwrap_or_else(|err| { - QueryResult::Err(ChainHeadStorageEvent::Error(ErrorEvent { - error: err.to_string(), - })) - }) + .unwrap_or_else(|error| QueryResult::Err(error.to_string())) } /// Handle iterating over (key, value) or (key, hash) pairs. @@ -148,7 +144,7 @@ where } else { self.client.storage_keys(hash, Some(key), None) } - .map_err(|err| ChainHeadStorageEvent::Error(ErrorEvent { error: err.to_string() }))?; + .map_err(|error| error.to_string())?; let mut ret = Vec::with_capacity(MAX_ITER_ITEMS); let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS); @@ -169,14 +165,31 @@ where /// Generate the block events for the `chainHead_storage` method. pub fn generate_events( &self, - mut sink: SubscriptionSink, + block_guard: BlockGuard, hash: Block::Hash, items: Vec>, child_key: Option, ) { + /// Build and send the opaque error back to the `chainHead_follow` method. + fn send_error( + sender: &TracingUnboundedSender>, + operation_id: String, + error: String, + ) { + let _ = + sender.unbounded_send(FollowEvent::::OperationError(OperationError { + operation_id, + error, + })); + } + + let sender = block_guard.response_sender(); + if let Some(child_key) = child_key.as_ref() { if !is_key_queryable(child_key.storage_key()) { - let _ = sink.send(&ChainHeadStorageEvent::Done); + let _ = sender.unbounded_send(FollowEvent::::OperationStorageDone( + OperationId { operation_id: block_guard.operation_id() }, + )); return } } @@ -192,8 +205,8 @@ where match self.query_storage_value(hash, &item.key, child_key.as_ref()) { Ok(Some(value)) => storage_results.push(value), Ok(None) => continue, - Err(err) => { - let _ = sink.send(&err); + Err(error) => { + send_error::(&sender, block_guard.operation_id(), error); return }, } @@ -202,8 +215,8 @@ where match self.query_storage_hash(hash, &item.key, child_key.as_ref()) { Ok(Some(value)) => storage_results.push(value), Ok(None) => continue, - Err(err) => { - let _ = sink.send(&err); + Err(error) => { + send_error::(&sender, block_guard.operation_id(), error); return }, }, @@ -214,8 +227,8 @@ where IterQueryType::Value, ) { Ok(values) => storage_results.extend(values), - Err(err) => { - let _ = sink.send(&err); + Err(error) => { + send_error::(&sender, block_guard.operation_id(), error); return }, }, @@ -226,8 +239,8 @@ where IterQueryType::Hash, ) { Ok(values) => storage_results.extend(values), - Err(err) => { - let _ = sink.send(&err); + Err(error) => { + send_error::(&sender, block_guard.operation_id(), error); return }, }, @@ -236,10 +249,17 @@ where } if !storage_results.is_empty() { - let event = ChainHeadStorageEvent::Items(ItemsEvent { items: storage_results }); - let _ = sink.send(&event); + let _ = sender.unbounded_send(FollowEvent::::OperationStorageItems( + OperationStorageItems { + operation_id: block_guard.operation_id(), + items: storage_results, + }, + )); } - let _ = sink.send(&ChainHeadStorageEvent::Done); + let _ = + sender.unbounded_send(FollowEvent::::OperationStorageDone(OperationId { + operation_id: block_guard.operation_id(), + })); } } From 8062837e1458e96263beb6c3d5531b50fd547fbe Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 27 Jul 2023 19:50:48 +0300 Subject: [PATCH 08/16] chainHead: Propagate responses of methods to chainHead_follow Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/chain_head_follow.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index 7cc7c586cab20..0fa995ce73a09 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -80,6 +80,8 @@ enum NotificationType { NewBlock(BlockImportNotification), /// The finalized block notification obtained from `finality_notification_stream`. Finalized(FinalityNotification), + /// The response of `chainHead` method calls. + MethodResponse(FollowEvent), } /// The initial blocks that should be reported or ignored by the chainHead. @@ -515,6 +517,7 @@ where self.handle_import_blocks(notification, &startup_point), NotificationType::Finalized(notification) => self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point), + NotificationType::MethodResponse(notification) => Ok(vec![notification]), }; let events = match events { @@ -585,6 +588,10 @@ where .finality_notification_stream() .map(|notification| NotificationType::Finalized(notification)); + let stream_responses = sub_data + .response_receiver + .map(|response| NotificationType::MethodResponse(response)); + let startup_point = StartupPoint::from(self.client.info()); let (initial_events, pruned_forks) = match self.generate_init_events(&startup_point) { Ok(blocks) => blocks, @@ -602,6 +609,7 @@ where let initial = NotificationType::InitialEvents(initial_events); let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized); + let merged = tokio_stream::StreamExt::merge(merged, stream_responses); let stream = stream::once(futures::future::ready(initial)).chain(merged); self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop) From 4f7b4451dbb69b9c84fc8eb1a084b072b82a900b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 31 Jul 2023 15:20:41 +0300 Subject: [PATCH 09/16] chainHead/tests: Adjust `chainHead_body` responses Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 54 ++++++++++++++-------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index fc2f1e85b42a3..808504410607e 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -1,5 +1,7 @@ use crate::chain_head::{ - event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType, StorageResultType}, + event::{ + ChainHeadStorageEvent, MethodResponse, StorageQuery, StorageQueryType, StorageResultType, + }, test_utils::ChainHeadMockClient, }; @@ -330,29 +332,34 @@ async fn get_body() { let block_hash = format!("{:?}", block.header.hash()); let invalid_hash = hex_string(&INVALID_HASH); - // Subscription ID is stale the disjoint event is emitted. - let mut sub = api - .subscribe("chainHead_unstable_body", ["invalid_sub_id", &invalid_hash]) + // Subscription ID is invalid. + let response: MethodResponse = api + .call("chainHead_unstable_body", ["invalid_sub_id", &invalid_hash]) .await .unwrap(); - let event: ChainHeadEvent = get_next_event(&mut sub).await; - assert_eq!(event, ChainHeadEvent::::Disjoint); + assert_matches!(response, MethodResponse::LimitReached); - // Valid subscription ID with invalid block hash will error. + // Block hash is invalid. let err = api - .subscribe("chainHead_unstable_body", [&sub_id, &invalid_hash]) + .call::<_, serde_json::Value>("chainHead_unstable_body", [&sub_id, &invalid_hash]) .await .unwrap_err(); assert_matches!(err, Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash" ); - // Obtain valid the body (list of extrinsics). - let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); - let event: ChainHeadEvent = get_next_event(&mut sub).await; - // Block contains no extrinsics. - assert_matches!(event, - ChainHeadEvent::Done(done) if done.result == "0x00" + // Valid call. + let response: MethodResponse = + api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + // Response propagated to `chainHead_follow`. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value.is_empty() ); // Import a block with extrinsics. @@ -378,12 +385,19 @@ async fn get_body() { FollowEvent::BestBlockChanged(_) ); - let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); - let event: ChainHeadEvent = get_next_event(&mut sub).await; - // Hex encoded scale encoded string for the vector of extrinsics. - let expected = hex_string(&block.extrinsics.encode()); - assert_matches!(event, - ChainHeadEvent::Done(done) if done.result == expected + // Valid call to a block with extrinsics. + let response: MethodResponse = + api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + // Response propagated to `chainHead_follow`. + let expected_tx = hex_string(&block.extrinsics[0].encode()); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value == vec![expected_tx] ); } From 53fcd99e3f96c4606e14ccd043e73464855e0370 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 31 Jul 2023 15:31:12 +0300 Subject: [PATCH 10/16] chainHead/tests: Adjust `chainHead_call` responses Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 51 +++++++++++++--------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 808504410607e..79041b3af4e0c 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -403,24 +403,23 @@ async fn get_body() { #[tokio::test] async fn call_runtime() { - let (_client, api, _sub, sub_id, block) = setup_api().await; + let (_client, api, mut block_sub, sub_id, block) = setup_api().await; let block_hash = format!("{:?}", block.header.hash()); let invalid_hash = hex_string(&INVALID_HASH); - // Subscription ID is stale the disjoint event is emitted. - let mut sub = api - .subscribe( + // Subscription ID is invalid. + let response: MethodResponse = api + .call( "chainHead_unstable_call", ["invalid_sub_id", &block_hash, "BabeApi_current_epoch", "0x00"], ) .await .unwrap(); - let event: ChainHeadEvent = get_next_event(&mut sub).await; - assert_eq!(event, ChainHeadEvent::::Disjoint); + assert_matches!(response, MethodResponse::LimitReached); - // Valid subscription ID with invalid block hash will error. + // Block hash is invalid. let err = api - .subscribe( + .call::<_, serde_json::Value>( "chainHead_unstable_call", [&sub_id, &invalid_hash, "BabeApi_current_epoch", "0x00"], ) @@ -432,8 +431,9 @@ async fn call_runtime() { // Pass an invalid parameters that cannot be decode. let err = api - .subscribe( + .call::<_, serde_json::Value>( "chainHead_unstable_call", + // 0x0 is invalid. [&sub_id, &block_hash, "BabeApi_current_epoch", "0x0"], ) .await @@ -442,34 +442,43 @@ async fn call_runtime() { Error::Call(CallError::Custom(ref err)) if err.code() == 2003 && err.message().contains("Invalid parameter") ); + // Valid call. let alice_id = AccountKeyring::Alice.to_account_id(); // Hex encoded scale encoded bytes representing the call parameters. let call_parameters = hex_string(&alice_id.encode()); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_call", [&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters], ) .await .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // Response propagated to `chainHead_follow`. assert_matches!( - get_next_event::>(&mut sub).await, - ChainHeadEvent::Done(done) if done.result == "0x0000000000000000" + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000" ); // The `current_epoch` takes no parameters and not draining the input buffer // will cause the execution to fail. - let mut sub = api - .subscribe( - "chainHead_unstable_call", - [&sub_id, &block_hash, "BabeApi_current_epoch", "0x00"], - ) + let response: MethodResponse = api + .call("chainHead_unstable_call", [&sub_id, &block_hash, "BabeApi_current_epoch", "0x00"]) .await .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + // Error propagated to `chainHead_follow`. assert_matches!( - get_next_event::>(&mut sub).await, - ChainHeadEvent::Error(event) if event.error.contains("Execution failed") + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationError(error) if error.operation_id == operation_id && error.error.contains("Execution failed") ); } @@ -515,7 +524,7 @@ async fn call_runtime_without_flag() { let alice_id = AccountKeyring::Alice.to_account_id(); let call_parameters = hex_string(&alice_id.encode()); let err = api - .subscribe( + .call::<_, serde_json::Value>( "chainHead_unstable_call", [&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters], ) From 167dd72da3cda015a8d8bf1c7e2fb7021991b39b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 31 Jul 2023 18:36:58 +0300 Subject: [PATCH 11/16] chainHead/tests: Adjust `chainHead_call` responses Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 297 ++++++++++++++------- 1 file changed, 201 insertions(+), 96 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 79041b3af4e0c..79b12b72ff10c 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -1,7 +1,5 @@ use crate::chain_head::{ - event::{ - ChainHeadStorageEvent, MethodResponse, StorageQuery, StorageQueryType, StorageResultType, - }, + event::{MethodResponse, StorageQuery, StorageQueryType, StorageResultType}, test_utils::ChainHeadMockClient, }; @@ -543,9 +541,9 @@ async fn get_storage_hash() { let invalid_hash = hex_string(&INVALID_HASH); let key = hex_string(&KEY); - // Subscription ID is stale the disjoint event is emitted. - let mut sub = api - .subscribe( + // Subscription ID is invalid. + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ "invalid_sub_id", @@ -555,12 +553,11 @@ async fn get_storage_hash() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_eq!(event, ChainHeadStorageEvent::Disjoint); + assert_matches!(response, MethodResponse::LimitReached); - // Valid subscription ID with invalid block hash will error. + // Block hash is invalid. let err = api - .subscribe( + .call::<_, serde_json::Value>( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -575,8 +572,8 @@ async fn get_storage_hash() { ); // Valid call without storage at the key. - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -586,9 +583,15 @@ async fn get_storage_hash() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; // The `Done` event is generated directly since the key does not have any value associated. - assert_matches!(event, ChainHeadStorageEvent::Done); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Import a new block with storage changes. let mut builder = client.new_block(Default::default()).unwrap(); @@ -608,9 +611,8 @@ async fn get_storage_hash() { ); // Valid call with storage at the key. - let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -620,17 +622,30 @@ async fn get_storage_hash() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Child value set in `setup_api`. let child_info = hex_string(&CHILD_STORAGE_KEY); let genesis_hash = format!("{:?}", client.genesis_hash()); - let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); - let mut sub = api - .subscribe( + + // Valid call with storage at the key. + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -641,10 +656,22 @@ async fn get_storage_hash() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); } #[tokio::test] @@ -670,10 +697,8 @@ async fn get_storage_multi_query_iter() { ); // Valid call with storage at the key. - let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); - let expected_value = hex_string(&VALUE); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -692,22 +717,34 @@ async fn get_storage_multi_query_iter() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 2 && - res.items[0].key == key && - res.items[1].key == key && - res.items[0].result == StorageResultType::Hash(expected_hash) && - res.items[1].result == StorageResultType::Value(expected_value)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); + let expected_value = hex_string(&VALUE); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 2 && + res.items[0].key == key && + res.items[1].key == key && + res.items[0].result == StorageResultType::Hash(expected_hash) && + res.items[1].result == StorageResultType::Value(expected_value) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Child value set in `setup_api`. let child_info = hex_string(&CHILD_STORAGE_KEY); let genesis_hash = format!("{:?}", client.genesis_hash()); let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); let expected_value = hex_string(&CHILD_VALUE); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -727,14 +764,24 @@ async fn get_storage_multi_query_iter() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 2 && - res.items[0].key == key && - res.items[1].key == key && - res.items[0].result == StorageResultType::Hash(expected_hash) && - res.items[1].result == StorageResultType::Value(expected_value)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 2 && + res.items[0].key == key && + res.items[1].key == key && + res.items[0].result == StorageResultType::Hash(expected_hash) && + res.items[1].result == StorageResultType::Value(expected_value) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); } #[tokio::test] @@ -744,9 +791,9 @@ async fn get_storage_value() { let invalid_hash = hex_string(&INVALID_HASH); let key = hex_string(&KEY); - // Subscription ID is stale the disjoint event is emitted. - let mut sub = api - .subscribe( + // Subscription ID is invalid. + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ "invalid_sub_id", @@ -756,12 +803,11 @@ async fn get_storage_value() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_eq!(event, ChainHeadStorageEvent::Disjoint); + assert_matches!(response, MethodResponse::LimitReached); - // Valid subscription ID with invalid block hash will error. + // Block hash is invalid. let err = api - .subscribe( + .call::<_, serde_json::Value>( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -776,8 +822,8 @@ async fn get_storage_value() { ); // Valid call without storage at the key. - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -787,9 +833,15 @@ async fn get_storage_value() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; // The `Done` event is generated directly since the key does not have any value associated. - assert_matches!(event, ChainHeadStorageEvent::Done); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Import a new block with storage changes. let mut builder = client.new_block(Default::default()).unwrap(); @@ -809,9 +861,8 @@ async fn get_storage_value() { ); // Valid call with storage at the key. - let expected_value = hex_string(&VALUE); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -821,17 +872,29 @@ async fn get_storage_value() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let expected_value = hex_string(&VALUE); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Child value set in `setup_api`. - let child_info = hex_string(b"child"); + let child_info = hex_string(&CHILD_STORAGE_KEY); let genesis_hash = format!("{:?}", client.genesis_hash()); - let expected_value = hex_string(&CHILD_VALUE); - let mut sub = api - .subscribe( + + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -842,15 +905,28 @@ async fn get_storage_value() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value)); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let expected_value = hex_string(&CHILD_VALUE); + + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); } #[tokio::test] -async fn get_storage_wrong_key() { - let (mut _client, api, mut _block_sub, sub_id, block) = setup_api().await; +async fn get_storage_non_queryable_key() { + let (mut _client, api, mut block_sub, sub_id, block) = setup_api().await; let block_hash = format!("{:?}", block.header.hash()); let key = hex_string(&KEY); @@ -858,8 +934,9 @@ async fn get_storage_wrong_key() { let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec(); prefixed_key.extend_from_slice(&KEY); let prefixed_key = hex_string(&prefixed_key); - let mut sub = api - .subscribe( + + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -869,15 +946,22 @@ async fn get_storage_wrong_key() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The `Done` event is generated directly since the key is not queryable. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX. let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec(); prefixed_key.extend_from_slice(&KEY); let prefixed_key = hex_string(&prefixed_key); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -887,15 +971,22 @@ async fn get_storage_wrong_key() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The `Done` event is generated directly since the key is not queryable. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Child key is prefixed by CHILD_STORAGE_KEY_PREFIX. let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec(); - prefixed_key.extend_from_slice(b"child"); + prefixed_key.extend_from_slice(CHILD_STORAGE_KEY); let prefixed_key = hex_string(&prefixed_key); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -906,15 +997,22 @@ async fn get_storage_wrong_key() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The `Done` event is generated directly since the key is not queryable. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); // Child key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX. let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec(); - prefixed_key.extend_from_slice(b"child"); + prefixed_key.extend_from_slice(CHILD_STORAGE_KEY); let prefixed_key = hex_string(&prefixed_key); - let mut sub = api - .subscribe( + let response: MethodResponse = api + .call( "chainHead_unstable_storage", rpc_params![ &sub_id, @@ -925,8 +1023,15 @@ async fn get_storage_wrong_key() { ) .await .unwrap(); - let event: ChainHeadStorageEvent = get_next_event(&mut sub).await; - assert_matches!(event, ChainHeadStorageEvent::Done); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The `Done` event is generated directly since the key is not queryable. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); } #[tokio::test] From 35aca6d79ed4f221b46517c745eef756623d1b16 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 31 Jul 2023 18:45:39 +0300 Subject: [PATCH 12/16] chainHead/tests: Ensure unique operation IDs across methods Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 74 +++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 79b12b72ff10c..cf270f4d48ab8 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -25,7 +25,7 @@ use sp_core::{ Blake2Hasher, Hasher, }; use sp_version::RuntimeVersion; -use std::{sync::Arc, time::Duration}; +use std::{collections::HashSet, sync::Arc, time::Duration}; use substrate_test_runtime::Transfer; use substrate_test_runtime_client::{ prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client, @@ -1034,6 +1034,78 @@ async fn get_storage_non_queryable_key() { ); } +#[tokio::test] +async fn unique_operation_ids() { + let (mut _client, api, mut block_sub, sub_id, block) = setup_api().await; + let block_hash = format!("{:?}", block.header.hash()); + + let mut op_ids = HashSet::new(); + + // Ensure that operation IDs are unique for multiple method calls. + for _ in 0..5 { + // Valid `chainHead_unstable_body` call. + let response: MethodResponse = + api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value.is_empty() + ); + // Ensure uniqueness. + assert!(op_ids.insert(operation_id)); + + // Valid `chainHead_unstable_storage` call. + let key = hex_string(&KEY); + let response: MethodResponse = api + .call( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &block_hash, + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }] + ], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The `Done` event is generated directly since the key does not have any value associated. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); + // Ensure uniqueness. + assert!(op_ids.insert(operation_id)); + + // Valid `chainHead_unstable_call` call. + let alice_id = AccountKeyring::Alice.to_account_id(); + let call_parameters = hex_string(&alice_id.encode()); + let response: MethodResponse = api + .call( + "chainHead_unstable_call", + [&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // Response propagated to `chainHead_follow`. + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000" + ); + // Ensure uniqueness. + assert!(op_ids.insert(operation_id)); + } +} + #[tokio::test] async fn follow_generates_initial_blocks() { let builder = TestClientBuilder::new(); From 008fa222006e550c4401ffe57f874d73d6b3ec9f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 31 Jul 2023 18:52:34 +0300 Subject: [PATCH 13/16] chainHead/events: Remove old method events Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/event.rs | 168 --------------------- client/rpc-spec-v2/src/chain_head/mod.rs | 4 +- 2 files changed, 2 insertions(+), 170 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/event.rs b/client/rpc-spec-v2/src/chain_head/event.rs index 971a0a9f46b95..65bc8b247c880 100644 --- a/client/rpc-spec-v2/src/chain_head/event.rs +++ b/client/rpc-spec-v2/src/chain_head/event.rs @@ -276,31 +276,6 @@ pub enum FollowEvent { Stop, } -/// The result of a chain head method. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ChainHeadResult { - /// Result of the method. - pub result: T, -} - -/// The event generated by the body / call / storage methods. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -#[serde(tag = "event")] -pub enum ChainHeadEvent { - /// The request completed successfully. - Done(ChainHeadResult), - /// The resources requested are inaccessible. - /// - /// Resubmitting the request later might succeed. - Inaccessible(ErrorEvent), - /// An error occurred. This is definitive. - Error(ErrorEvent), - /// The provided subscription ID is stale or invalid. - Disjoint, -} - /// The storage item received as paramter. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -351,35 +326,6 @@ pub enum StorageResultType { ClosestDescendantMerkleValue(String), } -/// The event generated by storage method. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -#[serde(tag = "event")] -pub enum ChainHeadStorageEvent { - /// The request produced multiple result items. - Items(ItemsEvent), - /// The request produced multiple result items. - WaitForContinue, - /// The request completed successfully and all the results were provided. - Done, - /// The resources requested are inaccessible. - /// - /// Resubmitting the request later might succeed. - Inaccessible, - /// An error occurred. This is definitive. - Error(ErrorEvent), - /// The provided subscription ID is stale or invalid. - Disjoint, -} - -/// The request produced multiple result items. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ItemsEvent { - /// The resulting items. - pub items: Vec, -} - /// The method respose of `chainHead_body`, `chainHead_call` and `chainHead_storage`. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -714,56 +660,6 @@ mod tests { assert_eq!(event_dec, event); } - #[test] - fn chain_head_done_event() { - let event: ChainHeadEvent = - ChainHeadEvent::Done(ChainHeadResult { result: "A".into() }); - - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"done","result":"A"}"#; - assert_eq!(ser, exp); - - let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(event_dec, event); - } - - #[test] - fn chain_head_inaccessible_event() { - let event: ChainHeadEvent = - ChainHeadEvent::Inaccessible(ErrorEvent { error: "A".into() }); - - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"inaccessible","error":"A"}"#; - assert_eq!(ser, exp); - - let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(event_dec, event); - } - - #[test] - fn chain_head_error_event() { - let event: ChainHeadEvent = ChainHeadEvent::Error(ErrorEvent { error: "A".into() }); - - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"error","error":"A"}"#; - assert_eq!(ser, exp); - - let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(event_dec, event); - } - - #[test] - fn chain_head_disjoint_event() { - let event: ChainHeadEvent = ChainHeadEvent::Disjoint; - - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"disjoint"}"#; - assert_eq!(ser, exp); - - let event_dec: ChainHeadEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(event_dec, event); - } - #[test] fn chain_head_storage_query() { // Item with Value. @@ -855,68 +751,4 @@ mod tests { let dec: StorageResult = serde_json::from_str(exp).unwrap(); assert_eq!(dec, item); } - - #[test] - fn chain_head_storage_event() { - // Event with Items. - let event = ChainHeadStorageEvent::Items(ItemsEvent { - items: vec![ - StorageResult { - key: "0x1".into(), - result: StorageResultType::Value("first".into()), - }, - StorageResult { - key: "0x2".into(), - result: StorageResultType::Hash("second".into()), - }, - ], - }); - // Encode - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"items","items":[{"key":"0x1","value":"first"},{"key":"0x2","hash":"second"}]}"#; - assert_eq!(ser, exp); - // Decode - let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(dec, event); - - // Event with WaitForContinue. - let event = ChainHeadStorageEvent::WaitForContinue; - // Encode - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"wait-for-continue"}"#; - assert_eq!(ser, exp); - // Decode - let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(dec, event); - - // Event with Done. - let event = ChainHeadStorageEvent::Done; - // Encode - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"done"}"#; - assert_eq!(ser, exp); - // Decode - let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(dec, event); - - // Event with Inaccessible. - let event = ChainHeadStorageEvent::Inaccessible; - // Encode - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"inaccessible"}"#; - assert_eq!(ser, exp); - // Decode - let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(dec, event); - - // Event with Inaccessible. - let event = ChainHeadStorageEvent::Error(ErrorEvent { error: "reason".into() }); - // Encode - let ser = serde_json::to_string(&event).unwrap(); - let exp = r#"{"event":"error","error":"reason"}"#; - assert_eq!(ser, exp); - // Decode - let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap(); - assert_eq!(dec, event); - } } diff --git a/client/rpc-spec-v2/src/chain_head/mod.rs b/client/rpc-spec-v2/src/chain_head/mod.rs index 604f565ce7585..f0fa898f9f7e1 100644 --- a/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/client/rpc-spec-v2/src/chain_head/mod.rs @@ -39,8 +39,8 @@ mod subscription; pub use api::ChainHeadApiServer; pub use chain_head::ChainHead; pub use event::{ - BestBlockChanged, ChainHeadEvent, ChainHeadResult, ErrorEvent, Finalized, FollowEvent, - Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent, + BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent, + RuntimeVersionEvent, }; use sp_core::hexdisplay::{AsBytesRef, HexDisplay}; From 1ace528bb837faa7d681241acf333149c51005eb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 8 Aug 2023 15:27:40 +0300 Subject: [PATCH 14/16] chainHead: Return `InvalidBlock` error if pinning fails Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/chain_head.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 752da7959cf53..16881b05fd7b9 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -232,10 +232,7 @@ where hash ); self.subscriptions.remove_subscription(&follow_subscription); - FollowEvent::::OperationError(OperationError { - operation_id: block_guard.operation_id(), - error: "Requested block was pruned".to_string(), - }) + return Err(ChainHeadRpcError::InvalidBlock.into()) }, Err(error) => FollowEvent::::OperationError(OperationError { operation_id: block_guard.operation_id(), From e189a73dda95a6f6db4601460557cf5754654ee0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 8 Aug 2023 19:33:31 +0300 Subject: [PATCH 15/16] chainHead: Wrap subscription IDs Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/subscription/inner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 142dfe4dd9cf1..c0c2701c5e145 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -241,7 +241,7 @@ impl SubscriptionState { /// Generate the next operation ID for this subscription. fn next_operation_id(&mut self) -> usize { let op_id = self.next_operation_id; - self.next_operation_id += 1; + self.next_operation_id = self.next_operation_id.wrapping_add(1); op_id } } From 83afea8137537d95b669a487e92f430011a3ce7e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 8 Aug 2023 19:51:34 +0300 Subject: [PATCH 16/16] chainHead/tests: Ensure separate operation IDs across subscriptions Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 77 ++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index cf270f4d48ab8..6c3c343a10b53 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -1106,6 +1106,83 @@ async fn unique_operation_ids() { } } +#[tokio::test] +async fn separate_operation_ids_for_subscriptions() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + Duration::from_secs(MAX_PINNED_SECS), + ) + .into_rpc(); + + // Create two separate subscriptions. + let mut sub_first = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id_first = sub_first.subscription_id(); + let sub_id_first = serde_json::to_string(&sub_id_first).unwrap(); + + let mut sub_second = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id_second = sub_second.subscription_id(); + let sub_id_second = serde_json::to_string(&sub_id_second).unwrap(); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + let block_hash = format!("{:?}", block.header.hash()); + + // Ensure the imported block is propagated and pinned. + assert_matches!( + get_next_event::>(&mut sub_first).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub_first).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub_first).await, + FollowEvent::BestBlockChanged(_) + ); + + assert_matches!( + get_next_event::>(&mut sub_second).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub_second).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub_second).await, + FollowEvent::BestBlockChanged(_) + ); + + // Each `chainHead_follow` subscription receives a separate operation ID. + let response: MethodResponse = + api.call("chainHead_unstable_body", [&sub_id_first, &block_hash]).await.unwrap(); + let operation_id: String = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + assert_eq!(operation_id, "0"); + + let response: MethodResponse = api + .call("chainHead_unstable_body", [&sub_id_second, &block_hash]) + .await + .unwrap(); + let operation_id_second: String = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // The second subscription does not increment the operation ID of the first one. + assert_eq!(operation_id_second, "0"); +} + #[tokio::test] async fn follow_generates_initial_blocks() { let builder = TestClientBuilder::new();