Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

chainHead: Limit ongoing operations #14699

Merged
merged 28 commits into from
Aug 15, 2023
Merged
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4787dba
chainHead/api: Make storage/body/call pure RPC methods
lexnv Jul 27, 2023
8050811
chainHead: Add mpsc channel between RPC methods
lexnv Jul 27, 2023
3b30eb3
chainHead/subscriptions: Extract mpsc::Sender via BlockGuard
lexnv Jul 27, 2023
d5305a9
chainHead/subscriptions: Generate and provide the method operation ID
lexnv Jul 27, 2023
50c840d
chainHead: Generate `chainHead_body` response
lexnv Jul 27, 2023
6c5940c
chainHead: Generate `chainHead_call` response
lexnv Jul 27, 2023
589efa8
chainHead: Generate `chainHead_storage` responses
lexnv Jul 27, 2023
8062837
chainHead: Propagate responses of methods to chainHead_follow
lexnv Jul 27, 2023
4f7b445
chainHead/tests: Adjust `chainHead_body` responses
lexnv Jul 31, 2023
53fcd99
chainHead/tests: Adjust `chainHead_call` responses
lexnv Jul 31, 2023
167dd72
chainHead/tests: Adjust `chainHead_call` responses
lexnv Jul 31, 2023
35aca6d
chainHead/tests: Ensure unique operation IDs across methods
lexnv Jul 31, 2023
008fa22
chainHead/events: Remove old method events
lexnv Jul 31, 2023
6c30aef
chainHead/subscriptions: Add limit helper
lexnv Aug 1, 2023
a29d677
chainHead/subscription: Expose limits to `BlockGuard`
lexnv Aug 1, 2023
a33d80c
chainHead/tests: Adjust testing to ongoing operations
lexnv Aug 1, 2023
40a73fb
chainHead: Make limits configurable via `ChainHeadConfig`
lexnv Aug 1, 2023
eb15293
chainHead/tests: Adjust testing to `ChainHeadConfig`
lexnv Aug 1, 2023
a026fe0
chainHead/tests: Ensure operation limits discards items
lexnv Aug 1, 2023
4e4cca8
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_limits
lexnv Aug 9, 2023
defc196
chainHead: Improve documentation
lexnv Aug 9, 2023
4cacbc1
chainHead: Rename `OngoingOperations` -> `LimitOperations`
lexnv Aug 9, 2023
09c743c
chainHead: Rename reserve -> reserve_at_most
lexnv Aug 9, 2023
06fec3e
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_limits
Aug 10, 2023
ab481e8
chainHead: Use duration const instead of u64
lexnv Aug 14, 2023
09e7372
chainHead/subscription: Use tokio::sync::Semaphore for limits
lexnv Aug 14, 2023
8954c33
Merge remote-tracking branch 'origin/lexnv/chainhead_limits' into lex…
lexnv Aug 14, 2023
a3df109
Update client/rpc-spec-v2/src/chain_head/subscription/inner.rs
lexnv Aug 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ hex = "0.4"
futures = "0.3.21"
parking_lot = "0.12.1"
tokio-stream = { version = "0.1", features = ["sync"] }
tokio = { version = "1.22.0", features = ["sync"] }
array-bytes = "6.1"
log = "0.4.17"
futures-util = { version = "0.3.19", default-features = false }
96 changes: 67 additions & 29 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
@@ -53,6 +53,41 @@ use std::{marker::PhantomData, sync::Arc, time::Duration};

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// The configuration of [`ChainHead`].
pub struct ChainHeadConfig {
/// The maximum number of pinned blocks across all subscriptions.
pub global_max_pinned_blocks: usize,
/// The maximum duration that a block is allowed to be pinned per subscription.
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
}

/// Maximum pinned blocks across all connections.
/// This number is large enough to consider immediate blocks.
/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
const MAX_PINNED_BLOCKS: usize = 512;

/// Any block of any subscription should not be pinned more than
/// this constant. When a subscription contains a block older than this,
/// the subscription becomes subject to termination.
/// Note: This should be enough for immediate blocks.
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);

/// The maximum number of ongoing operations per subscription.
/// Note: The lower limit imposed by the spec is 16.
const MAX_ONGOING_OPERATIONS: usize = 16;

impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
}
}
}

/// An API for chain head RPC calls.
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
@@ -76,17 +111,17 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
genesis_hash: GenesisHash,
max_pinned_blocks: usize,
max_pinned_duration: Duration,
config: ChainHeadConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend: backend.clone(),
executor,
subscriptions: Arc::new(SubscriptionManagement::new(
max_pinned_blocks,
max_pinned_duration,
config.global_max_pinned_blocks,
config.subscription_max_pinned_duration,
config.subscription_max_ongoing_operations,
backend,
)),
genesis_hash,
@@ -197,12 +232,10 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<MethodResponse> {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
@@ -252,12 +285,10 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<Option<String>> {
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(None)
},
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
@@ -306,21 +337,27 @@ where
.transpose()?
.map(ChildInfo::new_default_from_vec);

let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
let block_guard =
match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
let operation_id = block_guard.operation_id();

// The number of operations we are allowed to execute.
let num_operations = block_guard.num_reserved();
let discarded = items.len().saturating_sub(num_operations);
let mut items = items;
items.truncate(num_operations);

let fut = async move {
storage_client.generate_events(block_guard, hash, items, child_trie);
};
@@ -329,7 +366,7 @@ where
.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id,
discarded_items: Some(0),
discarded_items: Some(discarded),
}))
}

@@ -342,9 +379,10 @@ where
) -> RpcResult<MethodResponse> {
let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);

let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
2 changes: 1 addition & 1 deletion client/rpc-spec-v2/src/chain_head/mod.rs
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ mod chain_head_storage;
mod subscription;

pub use api::ChainHeadApiServer;
pub use chain_head::ChainHead;
pub use chain_head::{ChainHead, ChainHeadConfig};
pub use event::{
BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
8 changes: 4 additions & 4 deletions client/rpc-spec-v2/src/chain_head/subscription/error.rs
Original file line number Diff line number Diff line change
@@ -21,10 +21,10 @@ use sp_blockchain::Error;
/// Subscription management error.
#[derive(Debug, thiserror::Error)]
pub enum SubscriptionManagementError {
/// The block cannot be pinned into memory because
/// the subscription has exceeded the maximum number
/// of blocks pinned.
#[error("Exceeded pinning limits")]
/// The subscription has exceeded the internal limits
/// regarding the number of pinned blocks in memory or
/// the number of ongoing operations.
#[error("Exceeded pinning or operation limits")]
ExceededLimits,
/// Error originated from the blockchain (client or backend).
#[error("Blockchain error {0}")]
157 changes: 141 additions & 16 deletions client/rpc-spec-v2/src/chain_head/subscription/inner.rs
Original file line number Diff line number Diff line change
@@ -107,6 +107,62 @@ impl BlockStateMachine {
}
}

/// Limit the number of ongoing operations across methods.
struct LimitOperations {
/// Limit the number of ongoing operations for this subscription.
semaphore: Arc<tokio::sync::Semaphore>,
}

impl LimitOperations {
/// Constructs a new [`LimitOperations`].
fn new(max_operations: usize) -> Self {
LimitOperations { semaphore: Arc::new(tokio::sync::Semaphore::new(max_operations)) }
}

/// Reserves capacity to execute at least one operation and at most the requested items.
///
/// Dropping [`PermitOperations`] without executing an operation will release
/// the reserved capacity.
///
/// Returns nothing if there's no space available, else returns a permit
/// that guarantees that at least one operation can be executed.
fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
let num_ops = std::cmp::min(self.semaphore.available_permits(), to_reserve);

if num_ops == 0 {
return None
}

let permits = Arc::clone(&self.semaphore)
.try_acquire_many_owned(num_ops.try_into().ok()?)
.ok()?;

Some(PermitOperations { num_ops, _permit: permits })
}
}

/// Permits a number of operations to be executed.
///
/// [`PermitOperations`] are returned by [`LimitOperations::reserve()`] and are used
/// to guarantee the RPC server can execute the number of operations.
///
/// The number of reserved items are given back to the [`LimitOperations`] on drop.
struct PermitOperations {
/// The number of operations permitted (reserved).
num_ops: usize,
/// The permit for these operations.
_permit: tokio::sync::OwnedSemaphorePermit,
}

impl PermitOperations {
/// Returns the number of reserved elements for this permit.
///
/// This can be smaller than the number of items requested via [`LimitOperations::reserve()`].
fn num_reserved(&self) -> usize {
self.num_ops
}
}

struct BlockState {
/// The state machine of this block.
state_machine: BlockStateMachine,
@@ -124,6 +180,8 @@ struct SubscriptionState<Block: BlockT> {
///
/// This object is cloned between methods.
response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
/// Limit the number of ongoing operations.
limits: LimitOperations,
/// The next operation ID.
next_operation_id: usize,
/// Track the block hashes available for this subscription.
@@ -244,6 +302,13 @@ impl<Block: BlockT> SubscriptionState<Block> {
self.next_operation_id = self.next_operation_id.wrapping_add(1);
op_id
}

/// Reserves capacity to execute at least one operation and at most the requested items.
///
/// For more details see [`PermitOperations`].
fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
self.limits.reserve_at_most(to_reserve)
}
}

/// Keeps a specific block pinned while the handle is alive.
@@ -254,6 +319,7 @@ pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
with_runtime: bool,
response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
operation_id: String,
permit_operations: PermitOperations,
backend: Arc<BE>,
}

@@ -272,6 +338,7 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
with_runtime: bool,
response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
operation_id: usize,
permit_operations: PermitOperations,
backend: Arc<BE>,
) -> Result<Self, SubscriptionManagementError> {
backend
@@ -283,6 +350,7 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
with_runtime,
response_sender,
operation_id: operation_id.to_string(),
permit_operations,
backend,
})
}
@@ -301,6 +369,13 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
pub fn operation_id(&self) -> String {
self.operation_id.clone()
}

/// Returns the number of reserved elements for this permit.
///
/// This can be smaller than the number of items requested.
pub fn num_reserved(&self) -> usize {
self.permit_operations.num_reserved()
}
}

impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
@@ -328,6 +403,8 @@ pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
global_max_pinned_blocks: usize,
/// The maximum duration that a block is allowed to be pinned per subscription.
local_max_pin_duration: Duration,
/// The maximum number of ongoing operations per subscription.
max_ongoing_operations: usize,
/// Map the subscription ID to internal details of the subscription.
subs: HashMap<String, SubscriptionState<Block>>,
/// Backend pinning / unpinning blocks.
@@ -341,12 +418,14 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
pub fn new(
global_max_pinned_blocks: usize,
local_max_pin_duration: Duration,
max_ongoing_operations: usize,
backend: Arc<BE>,
) -> Self {
SubscriptionsInner {
global_blocks: Default::default(),
global_max_pinned_blocks,
local_max_pin_duration,
max_ongoing_operations,
subs: Default::default(),
backend,
}
@@ -366,6 +445,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
with_runtime,
tx_stop: Some(tx_stop),
response_sender,
limits: LimitOperations::new(self.max_ongoing_operations),
next_operation_id: 0,
blocks: Default::default(),
};
@@ -541,6 +621,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
&mut self,
sub_id: &str,
hash: Block::Hash,
to_reserve: usize,
) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
let Some(sub) = self.subs.get_mut(sub_id) else {
return Err(SubscriptionManagementError::SubscriptionAbsent)
@@ -550,12 +631,18 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
return Err(SubscriptionManagementError::BlockHashAbsent)
}

let Some(permit_operations) = sub.reserve_at_most(to_reserve) else {
// Error when the server cannot execute at least one operation.
return Err(SubscriptionManagementError::ExceededLimits)
};

let operation_id = sub.next_operation_id();
BlockGuard::new(
hash,
sub.with_runtime,
sub.response_sender.clone(),
operation_id,
permit_operations,
self.backend.clone(),
)
}
@@ -574,6 +661,9 @@ mod tests {
Client, ClientBlockImportExt, GenesisInit,
};

/// Maximum number of ongoing operations per subscription ID.
const MAX_OPERATIONS_PER_SUB: usize = 16;

fn init_backend() -> (
Arc<sc_client_api::in_mem::Backend<Block>>,
Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
@@ -669,6 +759,7 @@ mod tests {
tx_stop: None,
response_sender,
next_operation_id: 0,
limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB),
blocks: Default::default(),
};

@@ -698,6 +789,7 @@ mod tests {
tx_stop: None,
response_sender,
next_operation_id: 0,
limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB),
blocks: Default::default(),
};

@@ -730,27 +822,28 @@ mod tests {
fn subscription_lock_block() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);

let id = "abc".to_string();
let hash = H256::random();

// Subscription not inserted.
let err = subs.lock_block(&id, hash).unwrap_err();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

let _stop = subs.insert_subscription(id.clone(), true).unwrap();
// Cannot insert the same subscription ID twice.
assert!(subs.insert_subscription(id.clone(), true).is_none());

// No block hash.
let err = subs.lock_block(&id, hash).unwrap_err();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);

subs.remove_subscription(&id);

// No subscription.
let err = subs.lock_block(&id, hash).unwrap_err();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
}

@@ -762,15 +855,16 @@ mod tests {
let hash = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();

let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();

let _stop = subs.insert_subscription(id.clone(), true).unwrap();

// First time we are pinning the block.
assert_eq!(subs.pin_block(&id, hash).unwrap(), true);

let block = subs.lock_block(&id, hash).unwrap();
let block = subs.lock_block(&id, hash, 1).unwrap();
// Subscription started with runtime updates
assert_eq!(block.has_runtime(), true);

@@ -780,7 +874,7 @@ mod tests {

// Unpin the block.
subs.unpin_block(&id, hash).unwrap();
let err = subs.lock_block(&id, hash).unwrap_err();
let err = subs.lock_block(&id, hash, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
}

@@ -791,7 +885,8 @@ mod tests {
let hash = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();

let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();

let _stop = subs.insert_subscription(id.clone(), true).unwrap();
@@ -839,7 +934,8 @@ mod tests {
let hash_3 = block.header.hash();
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();

let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();

@@ -884,7 +980,8 @@ mod tests {
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();

// Maximum number of pinned blocks is 2.
let mut subs = SubscriptionsInner::new(2, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();

@@ -908,10 +1005,10 @@ mod tests {
assert_eq!(err, SubscriptionManagementError::ExceededLimits);

// Ensure both subscriptions are removed.
let err = subs.lock_block(&id_1, hash_1).unwrap_err();
let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

let err = subs.lock_block(&id_2, hash_1).unwrap_err();
let err = subs.lock_block(&id_2, hash_1, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

assert!(subs.global_blocks.get(&hash_1).is_none());
@@ -934,7 +1031,8 @@ mod tests {
futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();

// Maximum number of pinned blocks is 2 and maximum pin duration is 5 second.
let mut subs = SubscriptionsInner::new(2, Duration::from_secs(5), backend);
let mut subs =
SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();

@@ -958,10 +1056,10 @@ mod tests {
assert_eq!(err, SubscriptionManagementError::ExceededLimits);

// Ensure both subscriptions are removed.
let err = subs.lock_block(&id_1, hash_1).unwrap_err();
let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);

let _block_guard = subs.lock_block(&id_2, hash_1).unwrap();
let _block_guard = subs.lock_block(&id_2, hash_1, 1).unwrap();

assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
assert!(subs.global_blocks.get(&hash_2).is_none());
@@ -983,7 +1081,8 @@ mod tests {
fn subscription_check_stop_event() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);

let id = "abc".to_string();

@@ -1000,4 +1099,30 @@ mod tests {
let res = sub_data.rx_stop.try_recv().unwrap();
assert!(res.is_some());
}

#[test]
fn ongoing_operations() {
// The object can hold at most 2 operations.
let ops = LimitOperations::new(2);

// One operation is reserved.
let permit_one = ops.reserve_at_most(1).unwrap();
assert_eq!(permit_one.num_reserved(), 1);

// Request 2 operations, however there is capacity only for one.
let permit_two = ops.reserve_at_most(2).unwrap();
// Number of reserved permits is smaller than provided.
assert_eq!(permit_two.num_reserved(), 1);

// Try to reserve operations when there's no space.
let permit = ops.reserve_at_most(1);
assert!(permit.is_none());

// Release capacity.
drop(permit_two);

// Can reserve again
let permit_three = ops.reserve_at_most(1).unwrap();
assert_eq!(permit_three.num_reserved(), 1);
}
}
13 changes: 9 additions & 4 deletions client/rpc-spec-v2/src/chain_head/subscription/mod.rs
Original file line number Diff line number Diff line change
@@ -40,12 +40,14 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
pub fn new(
global_max_pinned_blocks: usize,
local_max_pin_duration: Duration,
max_ongoing_operations: usize,
backend: Arc<BE>,
) -> Self {
SubscriptionManagement {
inner: RwLock::new(SubscriptionsInner::new(
global_max_pinned_blocks,
local_max_pin_duration,
max_ongoing_operations,
backend,
)),
}
@@ -110,15 +112,18 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {

/// Ensure the block remains pinned until the return object is dropped.
///
/// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner.
/// Returns an error if the block hash is not pinned for the subscription or
/// the subscription ID is invalid.
/// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner
/// and reserves capacity for ogoing operations.
///
/// Returns an error if the block hash is not pinned for the subscription,
/// the subscription ID is invalid or the limit of ongoing operations was exceeded.
pub fn lock_block(
&self,
sub_id: &str,
hash: Block::Hash,
to_reserve: usize,
) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
let mut inner = self.inner.write();
inner.lock_block(sub_id, hash)
inner.lock_block(sub_id, hash, to_reserve)
}
}
196 changes: 168 additions & 28 deletions client/rpc-spec-v2/src/chain_head/tests.rs
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@ type Header = substrate_test_runtime_client::runtime::Header;
type Block = substrate_test_runtime_client::runtime::Block;
const MAX_PINNED_BLOCKS: usize = 32;
const MAX_PINNED_SECS: u64 = 60;
const MAX_OPERATIONS: usize = 16;
const CHAIN_GENESIS: [u8; 32] = [0; 32];
const INVALID_HASH: [u8; 32] = [1; 32];
const KEY: &[u8] = b":mock";
@@ -79,8 +80,11 @@ async fn setup_api() -> (
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -119,8 +123,11 @@ async fn follow_subscription_produces_blocks() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -177,8 +184,11 @@ async fn follow_with_runtime() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -285,8 +295,11 @@ async fn get_genesis() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -491,8 +504,11 @@ async fn call_runtime_without_flag() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -1117,8 +1133,11 @@ async fn separate_operation_ids_for_subscriptions() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -1194,8 +1213,11 @@ async fn follow_generates_initial_blocks() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -1322,8 +1344,11 @@ async fn follow_exceeding_pinned_blocks() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
2,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: 2,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -1373,8 +1398,11 @@ async fn follow_with_unpin() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
2,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: 2,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -1454,8 +1482,11 @@ async fn follow_prune_best_block() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -1611,8 +1642,11 @@ async fn follow_forks_pruned_block() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -1725,8 +1759,11 @@ async fn follow_report_multiple_pruned_block() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -1930,8 +1967,11 @@ async fn pin_block_references() {
backend.clone(),
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
3,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: 3,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -2040,8 +2080,11 @@ async fn follow_finalized_before_new_block() {
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECS),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
},
)
.into_rpc();

@@ -2119,3 +2162,100 @@ async fn follow_finalized_before_new_block() {
});
assert_eq!(event, expected);
}

#[tokio::test]
async fn ensure_operation_limits_works() {
let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY);
let builder = TestClientBuilder::new().add_extra_child_storage(
&child_info,
KEY.to_vec(),
CHILD_VALUE.to_vec(),
);
let backend = builder.backend();
let mut client = Arc::new(builder.build());

// Configure the chainHead with maximum 1 ongoing operations.
let api = ChainHead::new(
client.clone(),
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: 1,
},
)
.into_rpc();

let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();

let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block.clone()).await.unwrap();

// Ensure the imported block is propagated and pinned for this subscription.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::Initialized(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::NewBlock(_)
);
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::BestBlockChanged(_)
);

let block_hash = format!("{:?}", block.header.hash());
let key = hex_string(&KEY);

let items = vec![
StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsHashes },
StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsHashes },
StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsValues },
StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsValues },
];

let response: MethodResponse = api
.call("chainHead_unstable_storage", rpc_params![&sub_id, &block_hash, items])
.await
.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => {
// Check discarded items.
assert_eq!(started.discarded_items.unwrap(), 3);
started.operation_id
},
MethodResponse::LimitReached => panic!("Expected started response"),
};
// No value associated with the provided key.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
);

// The storage is finished and capactiy must be released.
let alice_id = AccountKeyring::Alice.to_account_id();
// Hex encoded scale encoded bytes representing the call parameters.
let call_parameters = hex_string(&alice_id.encode());
let response: MethodResponse = api
.call(
"chainHead_unstable_call",
[&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters],
)
.await
.unwrap();
let operation_id = match response {
MethodResponse::Started(started) => started.operation_id,
MethodResponse::LimitReached => panic!("Expected started response"),
};

// Response propagated to `chainHead_follow`.
assert_matches!(
get_next_event::<FollowEvent<String>>(&mut sub).await,
FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000"
);
}
21 changes: 3 additions & 18 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
@@ -74,11 +74,7 @@ use sp_consensus::block_validation::{
use sp_core::traits::{CodeExecutor, SpawnNamed};
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
use std::{
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};
use std::{str::FromStr, sync::Arc, time::SystemTime};

/// Full client type.
pub type TFullClient<TBl, TRtApi, TExec> =
@@ -636,24 +632,13 @@ where
)
.into_rpc();

// Maximum pinned blocks across all connections.
// This number is large enough to consider immediate blocks.
// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
const MAX_PINNED_BLOCKS: usize = 512;

// Any block of any subscription should not be pinned more than
// this constant. When a subscription contains a block older than this,
// the subscription becomes subject to termination.
// Note: This should be enough for immediate blocks.
const MAX_PINNED_SECONDS: u64 = 60;

let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
client.clone(),
backend.clone(),
task_executor.clone(),
client.info().genesis_hash,
MAX_PINNED_BLOCKS,
Duration::from_secs(MAX_PINNED_SECONDS),
// Defaults to sensible limits for the `ChainHead`.
sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
)
.into_rpc();