Skip to content

Commit

Permalink
BACKPORT-CONFLICT
Browse files Browse the repository at this point in the history
  • Loading branch information
pkhry authored and github-actions[bot] committed Oct 18, 2024
1 parent fd68193 commit fb07d92
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 69 deletions.
18 changes: 18 additions & 0 deletions prdoc/pr_6058.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: backpressure `chainHead_v1_follow`

doc:
- audience: Node Operator
description: |
The RPC endpoint `chainHead_v1_follow` now relies on backpressure
to determine whether or not the subscription should be closed instead of continuing to send more events
to a consumer which can't keep up.
This should significantly improve memory consumption as Substrate will keep fewer messages in memory.

crates:
- name: sc-rpc-spec-v2
bump: major
- name: sc-rpc
bump: major
8 changes: 8 additions & 0 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub struct ChainHeadConfig {
pub operation_max_storage_items: usize,
/// The maximum number of `chainHead_follow` subscriptions per connection.
pub max_follow_subscriptions_per_connection: usize,
/// The maximum number of pending messages per subscription.
pub subscription_buffer_cap: usize,
}

/// Maximum pinned blocks across all connections.
Expand Down Expand Up @@ -107,6 +109,7 @@ impl Default for ChainHeadConfig {
max_lagging_distance: MAX_LAGGING_DISTANCE,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
subscription_buffer_cap: MAX_PINNED_BLOCKS,
}
}
}
Expand All @@ -129,6 +132,8 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
max_lagging_distance: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
/// The maximum number of pending messages per subscription.
subscription_buffer_cap: usize,
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
Expand All @@ -152,6 +157,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
),
operation_max_storage_items: config.operation_max_storage_items,
max_lagging_distance: config.max_lagging_distance,
subscription_buffer_cap: config.subscription_buffer_cap,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -200,6 +206,7 @@ where
let backend = self.backend.clone();
let client = self.client.clone();
let max_lagging_distance = self.max_lagging_distance;
let subscription_buffer_cap = self.subscription_buffer_cap;

let fut = async move {
// Ensure the current connection ID has enough space to accept a new subscription.
Expand Down Expand Up @@ -235,6 +242,7 @@ where
with_runtime,
sub_id.clone(),
max_lagging_distance,
subscription_buffer_cap,
);
let result = chain_head_follow.generate_events(sink, sub_data).await;
if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
Expand Down
92 changes: 37 additions & 55 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ use crate::chain_head::{
};
use futures::{
channel::oneshot,
stream::{self, Stream, StreamExt},
stream::{self, Stream, StreamExt, TryStreamExt},
};
use futures_util::future::Either;
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
Expand Down Expand Up @@ -74,6 +73,8 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
/// The maximum number of pending messages per subscription.
pub subscription_buffer_cap: usize,
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
Expand All @@ -85,6 +86,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
with_runtime: bool,
sub_id: String,
max_lagging_distance: usize,
subscription_buffer_cap: usize,
) -> Self {
Self {
client,
Expand All @@ -97,6 +99,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
)),
max_lagging_distance,
subscription_buffer_cap,
}
}
}
Expand Down Expand Up @@ -590,71 +593,50 @@ where
async fn submit_events<EventStream>(
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
stream: EventStream,
sink: Subscription,
rx_stop: oneshot::Receiver<()>,
) -> Result<(), SubscriptionManagementError>
where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
EventStream: Stream<Item = NotificationType<Block>> + Unpin + Send,
{
let mut stream_item = stream.next();

// The stop event can be triggered by the chainHead logic when the pinned
// block guarantee cannot be hold. Or when the client is disconnected.
let connection_closed = sink.closed();
tokio::pin!(connection_closed);
let mut stop_event = futures_util::future::select(rx_stop, connection_closed);

while let Either::Left((Some(event), next_stop_event)) =
futures_util::future::select(stream_item, stop_event).await
{
let events = match event {
NotificationType::InitialEvents(events) => Ok(events),
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};
let buffer_cap = self.subscription_buffer_cap;
// create a channel to propagate error messages
let mut handle_events = |event| match event {
NotificationType::InitialEvents(events) => Ok(events),
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};

let events = match events {
Ok(events) => events,
Err(err) => {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to handle stream notification {:?}",
self.sub_id,
err
);
_ = sink.send(&FollowEvent::<String>::Stop).await;
return Err(err)
},
};
let stream = stream
.map(|event| handle_events(event))
.map_ok(|items| stream::iter(items).map(Ok))
.try_flatten();

tokio::pin!(stream);

let sink_future =
sink.pipe_from_try_stream(stream, sc_rpc::utils::BoundedVecDeque::new(buffer_cap));

for event in events {
if let Err(err) = sink.send(&event).await {
// Failed to submit event.
let result = tokio::select! {
_ = rx_stop => Ok(()),
result = sink_future => {
if let Err(ref e) = result {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
"[follow][id={:?}] Failed to handle stream notification {:?}",
&self.sub_id,
e
);

let _ = sink.send(&FollowEvent::<String>::Stop).await;
// No need to propagate this error further, the client disconnected.
return Ok(())
}
};
result
}

stream_item = stream.next();
stop_event = next_stop_event;
}

// If we got here either:
// - the substrate streams have closed
// - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee)
// - the client disconnected.
};
let _ = sink.send(&FollowEvent::<String>::Stop).await;
Ok(())
result
}

/// Generate the block events for the `chainHead_follow` method.
Expand Down
Loading

0 comments on commit fb07d92

Please sign in to comment.