From 91e508674de699079c7cfcfb7fb9437c4e800934 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 22 Jun 2023 10:08:52 +0000 Subject: [PATCH 01/10] Move expensive computations to blocking thread Signed-off-by: Andrei Sandu --- node/network/availability-recovery/src/lib.rs | 178 +++++++++++++++--- 1 file changed, 151 insertions(+), 27 deletions(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index c771e31a6c40..8cecd6643299 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -26,18 +26,21 @@ use std::{ }; use futures::{ - channel::oneshot, - future::{FutureExt, RemoteHandle}, + channel::oneshot::{self, channel}, + future::{Future, FutureExt, RemoteHandle}, pin_mut, prelude::*, - stream::FuturesUnordered, + sink::SinkExt, + stream::{FuturesUnordered, StreamExt}, task::{Context, Poll}, }; use lru::LruCache; use rand::seq::SliceRandom; use fatality::Nested; -use polkadot_erasure_coding::{branch_hash, branches, obtain_chunks_v1, recovery_threshold}; +use polkadot_erasure_coding::{ + branch_hash, branches, obtain_chunks_v1, recovery_threshold, Error as ErasureEncodingError, +}; #[cfg(not(test))] use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT; use polkadot_node_network_protocol::{ @@ -150,6 +153,8 @@ struct RequestFromBackers { // a random shuffling of the validators from the backing group which indicates the order // in which we connect to them and request the chunk. shuffled_backers: Vec, + // channel to the erasure task handler. + erasure_task_tx: futures::channel::mpsc::Sender, } struct RequestChunksFromValidators { @@ -165,6 +170,8 @@ struct RequestChunksFromValidators { received_chunks: HashMap, /// Pending chunk requests with soft timeout. requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, + // channel to the erasure task handler. 
+ erasure_task_tx: futures::channel::mpsc::Sender, } struct RecoveryParams { @@ -198,6 +205,18 @@ enum Source { RequestChunks(RequestChunksFromValidators), } +/// Expensive erasure coding computations that we want to run on a blocking thread. +enum ErasureTask { + /// Reconstructs `AvailableData` from chunks given `n_validators`. + Reconstruct( + usize, + HashMap, + oneshot::Sender>, + ), + /// Re-encode `AvailableData` into erasure chunks in order to verify the provided root hash of the Merkle tree. + Reencode(usize, Hash, AvailableData, oneshot::Sender), +} + /// A stateful reconstruction of availability data in reference to /// a candidate hash. struct RecoveryTask { @@ -208,13 +227,19 @@ struct RecoveryTask { /// The source to obtain the availability data from. source: Source, + + // channel to the erasure task handler. + erasure_task_tx: futures::channel::mpsc::Sender, } impl RequestFromBackers { - fn new(mut backers: Vec) -> Self { + fn new( + mut backers: Vec, + erasure_task_tx: futures::channel::mpsc::Sender, + ) -> Self { backers.shuffle(&mut rand::thread_rng()); - RequestFromBackers { shuffled_backers: backers } + RequestFromBackers { shuffled_backers: backers, erasure_task_tx } } // Run this phase to completion. @@ -251,12 +276,21 @@ impl RequestFromBackers { match response.await { Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { - if reconstructed_data_matches_root( - params.validators.len(), - ¶ms.erasure_root, - &data, - ¶ms.metrics, - ) { + let (reencode_tx, reencode_rx) = channel(); + // TODO: don't clone data as it is big, instead use `Option` instead of bool to send it back. 
+ let _ = self + .erasure_task_tx + .send(ErasureTask::Reencode( + params.validators.len(), + params.erasure_root, + data.clone(), + reencode_tx, + )) + .await; + let reencode_response = + reencode_rx.await.map_err(|_| RecoveryError::Internal)?; + + if reencode_response { gum::trace!( target: LOG_TARGET, candidate_hash = ?params.candidate_hash, @@ -289,7 +323,10 @@ impl RequestFromBackers { } impl RequestChunksFromValidators { - fn new(n_validators: u32) -> Self { + fn new( + n_validators: u32, + erasure_task_tx: futures::channel::mpsc::Sender, + ) -> Self { let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect(); shuffling.shuffle(&mut rand::thread_rng()); @@ -299,6 +336,7 @@ impl RequestChunksFromValidators { shuffling: shuffling.into(), received_chunks: HashMap::new(), requesting_chunks: FuturesUndead::new(), + erasure_task_tx, } } @@ -578,17 +616,37 @@ impl RequestChunksFromValidators { if self.received_chunks.len() >= params.threshold { let recovery_duration = metrics.time_erasure_recovery(); - return match polkadot_erasure_coding::reconstruct_v1( - params.validators.len(), - self.received_chunks.values().map(|c| (&c.chunk[..], c.index.0 as usize)), - ) { + // Send request to reconstruct available data from chunks. + let (avilable_data_tx, available_data_rx) = channel(); + let _ = self + .erasure_task_tx + .send(ErasureTask::Reconstruct( + params.validators.len(), + //TODO: Avoid clone with `Option`. + self.received_chunks.clone(), + avilable_data_tx, + )) + .await; + let available_data_response = + available_data_rx.await.map_err(|_| RecoveryError::Internal)?; + + return match available_data_response { Ok(data) => { - if reconstructed_data_matches_root( - params.validators.len(), - ¶ms.erasure_root, - &data, - &metrics, - ) { + // Send request to re-encode the chunks and check merkle root. 
+ let (reencode_tx, reencode_rx) = channel(); + let _ = self + .erasure_task_tx + .send(ErasureTask::Reencode( + params.validators.len(), + params.erasure_root, + data.clone(), + reencode_tx, + )) + .await; + let reencode_response = + reencode_rx.await.map_err(|_| RecoveryError::Internal)?; + + if reencode_response { gum::trace!( target: LOG_TARGET, candidate_hash = ?params.candidate_hash, @@ -746,9 +804,11 @@ where match from_backers.run(&self.params, &mut self.sender).await { Ok(data) => break Ok(data), Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid), + Err(RecoveryError::Internal) => break Err(RecoveryError::Internal), Err(RecoveryError::Unavailable) => self.source = Source::RequestChunks(RequestChunksFromValidators::new( self.params.validators.len() as _, + self.erasure_task_tx.clone(), )), } }, @@ -838,6 +898,7 @@ impl TryFrom> for CachedRecovery { // We don't want to cache unavailable state, as that state might change, so if // requested again we want to try again! 
Err(RecoveryError::Unavailable) => Err(()), + Err(RecoveryError::Internal) => Err(()), } } } @@ -904,9 +965,9 @@ async fn launch_recovery_task( response_sender: oneshot::Sender>, metrics: &Metrics, recovery_strategy: &RecoveryStrategy, + erasure_task_tx: futures::channel::mpsc::Sender, ) -> error::Result<()> { let candidate_hash = receipt.hash(); - let params = RecoveryParams { validator_authority_keys: session_info.discovery_keys.clone(), validators: session_info.validators.clone(), @@ -943,12 +1004,21 @@ async fn launch_recovery_task( let phase = backing_group .and_then(|g| session_info.validator_groups.get(g)) - .map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone()))) + .map(|group| { + Source::RequestFromBackers(RequestFromBackers::new( + group.clone(), + erasure_task_tx.clone(), + )) + }) .unwrap_or_else(|| { - Source::RequestChunks(RequestChunksFromValidators::new(params.validators.len() as _)) + Source::RequestChunks(RequestChunksFromValidators::new( + params.validators.len() as _, + erasure_task_tx.clone(), + )) }); - let recovery_task = RecoveryTask { sender: ctx.sender().clone(), params, source: phase }; + let recovery_task = + RecoveryTask { sender: ctx.sender().clone(), params, source: phase, erasure_task_tx }; let (remote, remote_handle) = recovery_task.run().remote_handle(); @@ -980,6 +1050,7 @@ async fn handle_recover( response_sender: oneshot::Sender>, metrics: &Metrics, recovery_strategy: &RecoveryStrategy, + erasure_task_tx: futures::channel::mpsc::Sender, ) -> error::Result<()> { let candidate_hash = receipt.hash(); @@ -1024,6 +1095,7 @@ async fn handle_recover( response_sender, metrics, recovery_strategy, + erasure_task_tx, ) .await, None => { @@ -1106,10 +1178,61 @@ impl AvailabilityRecoverySubsystem { let mut state = State::default(); let Self { recovery_strategy, mut req_receiver, metrics } = self; + let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16); + let mut erasure_task_rx = 
erasure_task_rx.fuse(); + loop { let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse(); pin_mut!(recv_req); futures::select! { + erasure_task = erasure_task_rx.next() => { + match erasure_task { + Some(ErasureTask::Reconstruct(n_validators, chunks, sender )) => { + let result = ctx.spawn_blocking("reconstruct_v1", Box::pin(async move { + let _ = sender.send(polkadot_erasure_coding::reconstruct_v1( + n_validators, + chunks.values().map(|c| (&c.chunk[..], c.index.0 as usize)), + )); + })); + + if let Err(e) = result { + gum::warn!( + target: LOG_TARGET, + err = ?e, + "Failed to spawn a erasure coding task", + ); + } + }, + Some(ErasureTask::Reencode(n_validators, root, available_data, sender)) => { + let metrics = metrics.clone(); + + let result = ctx.spawn_blocking("re-encode", Box::pin(async move { + let _ = sender.send(reconstructed_data_matches_root( + n_validators, + &root, + &available_data, + &metrics, + )); + })); + + if let Err(e) = result { + gum::warn!( + target: LOG_TARGET, + err = ?e, + "Failed to spawn a erasure coding task", + ); + } + }, + None => { + gum::debug!( + target: LOG_TARGET, + "Erasure task channel closed", + ); + + return Err(SubsystemError::with_origin("availability-recovery", RecoveryError::Internal)) + } + } + } v = ctx.recv().fuse() => { match v? 
{ FromOrchestra::Signal(signal) => if handle_signal( @@ -1135,6 +1258,7 @@ impl AvailabilityRecoverySubsystem { response_sender, &metrics, &recovery_strategy, + erasure_task_tx.clone(), ).await { gum::warn!( target: LOG_TARGET, From 93acbcce2f4ac72d37bf9066b6e56e8090b99572 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 22 Jun 2023 10:13:16 +0000 Subject: [PATCH 02/10] fix test Signed-off-by: Andrei Sandu --- node/network/availability-recovery/src/tests.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/node/network/availability-recovery/src/tests.rs b/node/network/availability-recovery/src/tests.rs index b9c5abee191f..026b1b5dcb4e 100644 --- a/node/network/availability-recovery/src/tests.rs +++ b/node/network/availability-recovery/src/tests.rs @@ -1584,7 +1584,9 @@ fn invalid_local_chunk_is_ignored() { fn parallel_request_calculation_works_as_expected() { let num_validators = 100; let threshold = recovery_threshold(num_validators).unwrap(); - let mut phase = RequestChunksFromValidators::new(100); + let (_erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16); + + let mut phase = RequestChunksFromValidators::new(100, erasure_task_tx); assert_eq!(phase.get_desired_request_count(threshold), threshold); phase.error_count = 1; phase.total_received_responses = 1; From 8278a779999e899feae46ed56aeeac59e2e2af4e Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 22 Jun 2023 10:13:49 +0000 Subject: [PATCH 03/10] add internal error and fix dependent subystems Signed-off-by: Andrei Sandu --- node/core/approval-voting/src/lib.rs | 11 +++++++++++ .../core/dispute-coordinator/src/participation/mod.rs | 2 +- node/subsystem-types/src/errors.rs | 4 ++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index f5e888c7c538..e1cd3ed53068 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -2519,6 +2519,17 @@ 
async fn launch_approval( // do nothing. we'll just be a no-show and that'll cause others to rise up. metrics_guard.take().on_approval_unavailable(); }, + &RecoveryError::Internal => { + gum::warn!( + target: LOG_TARGET, + ?para_id, + ?candidate_hash, + "Internal error while recovering data for candidate {:?}", + (candidate_hash, candidate.descriptor.para_id), + ); + // do nothing. we'll just be a no-show and that'll cause others to rise up. + metrics_guard.take().on_approval_unavailable(); + }, &RecoveryError::Invalid => { gum::warn!( target: LOG_TARGET, diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index b6a41bcff9dd..c028af73f83f 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -319,7 +319,7 @@ async fn participate( send_result(&mut result_sender, req, ParticipationOutcome::Invalid).await; return }, - Ok(Err(RecoveryError::Unavailable)) => { + Ok(Err(RecoveryError::Unavailable)) | Ok(Err(RecoveryError::Internal)) => { send_result(&mut result_sender, req, ParticipationOutcome::Unavailable).await; return }, diff --git a/node/subsystem-types/src/errors.rs b/node/subsystem-types/src/errors.rs index d633ac2ef959..22578339ccb3 100644 --- a/node/subsystem-types/src/errors.rs +++ b/node/subsystem-types/src/errors.rs @@ -75,6 +75,9 @@ pub enum RecoveryError { /// A requested chunk is unavailable. Unavailable, + + /// Internal error, channel closed. 
+ Internal, } impl std::fmt::Display for RecoveryError { @@ -82,6 +85,7 @@ impl std::fmt::Display for RecoveryError { let msg = match self { RecoveryError::Invalid => "Invalid", RecoveryError::Unavailable => "Unavailable", + RecoveryError::Internal => "Internal", }; write!(f, "{}", msg) From ecc87014dbe6189e32602ffb1efbf1d308251f47 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 22 Jun 2023 10:14:17 +0000 Subject: [PATCH 04/10] fmt Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/participation/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index c028af73f83f..c4ac31367510 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -319,7 +319,7 @@ async fn participate( send_result(&mut result_sender, req, ParticipationOutcome::Invalid).await; return }, - Ok(Err(RecoveryError::Unavailable)) | Ok(Err(RecoveryError::Internal)) => { + Ok(Err(RecoveryError::Unavailable)) | Ok(Err(RecoveryError::Internal)) => { send_result(&mut result_sender, req, ParticipationOutcome::Unavailable).await; return }, From 4c0f83aab0d50408b833536cfcebef75ce4304f5 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 22 Jun 2023 10:31:50 +0000 Subject: [PATCH 05/10] fix test fix Signed-off-by: Andrei Sandu --- node/network/availability-recovery/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/network/availability-recovery/src/tests.rs b/node/network/availability-recovery/src/tests.rs index 026b1b5dcb4e..f7a321485db0 100644 --- a/node/network/availability-recovery/src/tests.rs +++ b/node/network/availability-recovery/src/tests.rs @@ -1584,7 +1584,7 @@ fn invalid_local_chunk_is_ignored() { fn parallel_request_calculation_works_as_expected() { let num_validators = 100; let threshold = recovery_threshold(num_validators).unwrap(); 
- let (_erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16); + let (erasure_task_tx, _erasure_task_rx) = futures::channel::mpsc::channel(16); let mut phase = RequestChunksFromValidators::new(100, erasure_task_tx); assert_eq!(phase.get_desired_request_count(threshold), threshold); From ce8335ff2c4e3e490727bcf63b9b2482476b426a Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 23 Jun 2023 08:35:32 +0000 Subject: [PATCH 06/10] minor refactor and TODOs Signed-off-by: Andrei Sandu --- node/network/availability-recovery/src/lib.rs | 61 +++++++++++++------ .../availability-recovery/src/tests.rs | 8 +-- 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 8cecd6643299..693d22e7d04b 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -167,7 +167,8 @@ struct RequestChunksFromValidators { /// a random shuffling of the validators which indicates the order in which we connect to the validators and /// request the chunk from them. shuffling: VecDeque, - received_chunks: HashMap, + /// Chunks received so far. + received_chunks: Option>, /// Pending chunk requests with soft timeout. requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, // channel to the erasure task handler. @@ -214,7 +215,7 @@ enum ErasureTask { oneshot::Sender>, ), /// Re-encode `AvailableData` into erasure chunks in order to verify the provided root hash of the Merkle tree. 
- Reencode(usize, Hash, AvailableData, oneshot::Sender), + Reencode(usize, Hash, AvailableData, oneshot::Sender>), } /// A stateful reconstruction of availability data in reference to @@ -277,20 +278,19 @@ impl RequestFromBackers { match response.await { Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { let (reencode_tx, reencode_rx) = channel(); - // TODO: don't clone data as it is big, instead use `Option` instead of bool to send it back. let _ = self .erasure_task_tx .send(ErasureTask::Reencode( params.validators.len(), params.erasure_root, - data.clone(), + data, reencode_tx, )) .await; let reencode_response = reencode_rx.await.map_err(|_| RecoveryError::Internal)?; - if reencode_response { + if let Some(data) = reencode_response { gum::trace!( target: LOG_TARGET, candidate_hash = ?params.candidate_hash, @@ -334,7 +334,7 @@ impl RequestChunksFromValidators { error_count: 0, total_received_responses: 0, shuffling: shuffling.into(), - received_chunks: HashMap::new(), + received_chunks: Some(HashMap::new()), requesting_chunks: FuturesUndead::new(), erasure_task_tx, } @@ -342,15 +342,31 @@ impl RequestChunksFromValidators { fn is_unavailable(&self, params: &RecoveryParams) -> bool { is_unavailable( - self.received_chunks.len(), + self.chunk_count(), self.requesting_chunks.total_len(), self.shuffling.len(), params.threshold, ) } + fn chunk_count(&self) -> usize { + self.received_chunks.as_ref().map_or(0, |chunks| chunks.len()) + } + + fn insert_chunk(&mut self, validator_index: ValidatorIndex, chunk: ErasureChunk) { + // Make sure we have a hashmap. 
+ if self.received_chunks.is_none() { + self.received_chunks = Some(HashMap::new()); + } + + self.received_chunks + .as_mut() + .expect("Just initialized it above; qed") + .insert(validator_index, chunk); + } + fn can_conclude(&self, params: &RecoveryParams) -> bool { - self.received_chunks.len() >= params.threshold || self.is_unavailable(params) + self.chunk_count() >= params.threshold || self.is_unavailable(params) } /// Desired number of parallel requests. @@ -367,7 +383,7 @@ impl RequestChunksFromValidators { // 4. We request more chunks to make up for it ... let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold); // How many chunks are still needed? - let remaining_chunks = threshold.saturating_sub(self.received_chunks.len()); + let remaining_chunks = threshold.saturating_sub(self.chunk_count()); // What is the current error rate, so we can make up for it? let inv_error_rate = self.total_received_responses.checked_div(self.error_count).unwrap_or(0); @@ -468,7 +484,7 @@ impl RequestChunksFromValidators { validator_index = ?chunk.index, "Received valid chunk", ); - self.received_chunks.insert(chunk.index, chunk); + self.insert_chunk(chunk.index, chunk); } else { metrics.on_chunk_request_invalid(); self.error_count += 1; @@ -526,7 +542,7 @@ impl RequestChunksFromValidators { gum::debug!( target: LOG_TARGET, candidate_hash = ?params.candidate_hash, - received_chunks_count = ?self.received_chunks.len(), + received_chunks_count = ?self.chunk_count(), requested_chunks_count = ?self.requesting_chunks.len(), threshold = ?params.threshold, "Can conclude availability for a candidate", @@ -568,7 +584,7 @@ impl RequestChunksFromValidators { validator_index = ?chunk.index, "Found valid chunk on disk" ); - self.received_chunks.insert(chunk.index, chunk); + self.insert_chunk(chunk.index, chunk); } else { gum::error!( target: LOG_TARGET, @@ -595,7 +611,7 @@ impl RequestChunksFromValidators { target: LOG_TARGET, candidate_hash = ?params.candidate_hash, erasure_root = 
?params.erasure_root, - received = %self.received_chunks.len(), + received = %self.chunk_count(), requesting = %self.requesting_chunks.len(), total_requesting = %self.requesting_chunks.total_len(), n_validators = %params.validators.len(), @@ -613,7 +629,7 @@ impl RequestChunksFromValidators { // If received_chunks has more than threshold entries, attempt to recover the data. // If that fails, or a re-encoding of it doesn't match the expected erasure root, // return Err(RecoveryError::Invalid) - if self.received_chunks.len() >= params.threshold { + if self.chunk_count() >= params.threshold && self.received_chunks.is_some() { let recovery_duration = metrics.time_erasure_recovery(); // Send request to reconstruct available data from chunks. @@ -622,8 +638,7 @@ impl RequestChunksFromValidators { .erasure_task_tx .send(ErasureTask::Reconstruct( params.validators.len(), - //TODO: Avoid clone with `Option`. - self.received_chunks.clone(), + self.received_chunks.take().expect("Just checked it's some above; qed"), avilable_data_tx, )) .await; @@ -639,14 +654,14 @@ impl RequestChunksFromValidators { .send(ErasureTask::Reencode( params.validators.len(), params.erasure_root, - data.clone(), + data, reencode_tx, )) .await; let reencode_response = reencode_rx.await.map_err(|_| RecoveryError::Internal)?; - if reencode_response { + if let Some(data) = reencode_response { gum::trace!( target: LOG_TARGET, candidate_hash = ?params.candidate_hash, @@ -1207,12 +1222,18 @@ impl AvailabilityRecoverySubsystem { let metrics = metrics.clone(); let result = ctx.spawn_blocking("re-encode", Box::pin(async move { - let _ = sender.send(reconstructed_data_matches_root( + let maybe_data = if reconstructed_data_matches_root( n_validators, &root, &available_data, &metrics, - )); + ) { + Some(available_data) + } else { + None + }; + + let _ = sender.send(maybe_data); })); if let Err(e) = result { diff --git a/node/network/availability-recovery/src/tests.rs 
b/node/network/availability-recovery/src/tests.rs index f7a321485db0..26a99e91a5e2 100644 --- a/node/network/availability-recovery/src/tests.rs +++ b/node/network/availability-recovery/src/tests.rs @@ -1595,20 +1595,20 @@ fn parallel_request_calculation_works_as_expected() { let dummy_chunk = ErasureChunk { chunk: Vec::new(), index: ValidatorIndex(0), proof: Proof::dummy_proof() }; - phase.received_chunks.insert(ValidatorIndex(0), dummy_chunk.clone()); + phase.insert_chunk(ValidatorIndex(0), dummy_chunk.clone()); phase.total_received_responses = 2; // With given error rate - still saturating: assert_eq!(phase.get_desired_request_count(threshold), threshold); for i in 1..9 { - phase.received_chunks.insert(ValidatorIndex(i), dummy_chunk.clone()); + phase.insert_chunk(ValidatorIndex(i), dummy_chunk.clone()); } phase.total_received_responses += 8; // error rate: 1/10 // remaining chunks needed: threshold (34) - 9 // expected: 24 * (1+ 1/10) = (next greater integer) = 27 assert_eq!(phase.get_desired_request_count(threshold), 27); - phase.received_chunks.insert(ValidatorIndex(9), dummy_chunk.clone()); + phase.insert_chunk(ValidatorIndex(9), dummy_chunk.clone()); phase.error_count = 0; // With error count zero - we should fetch exactly as needed: - assert_eq!(phase.get_desired_request_count(threshold), threshold - phase.received_chunks.len()); + assert_eq!(phase.get_desired_request_count(threshold), threshold - phase.chunk_count()); } From 6f0dcdb39e6b1e6de80968cf3ac088d4ac50b8db Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 3 Jul 2023 12:19:43 +0000 Subject: [PATCH 07/10] Impl Feedback for Review Signed-off-by: Andrei Sandu --- node/core/approval-voting/src/lib.rs | 4 +- .../src/participation/mod.rs | 2 +- node/network/availability-recovery/src/lib.rs | 182 ++++++++++++------ node/subsystem-types/src/errors.rs | 6 +- 4 files changed, 127 insertions(+), 67 deletions(-) diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 
e1cd3ed53068..a6a74da50480 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -2519,12 +2519,12 @@ async fn launch_approval( // do nothing. we'll just be a no-show and that'll cause others to rise up. metrics_guard.take().on_approval_unavailable(); }, - &RecoveryError::Internal => { + &RecoveryError::ChannelClosed => { gum::warn!( target: LOG_TARGET, ?para_id, ?candidate_hash, - "Internal error while recovering data for candidate {:?}", + "Channel closed while recovering data for candidate {:?}", (candidate_hash, candidate.descriptor.para_id), ); // do nothing. we'll just be a no-show and that'll cause others to rise up. diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index c4ac31367510..25b7352807f6 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -319,7 +319,7 @@ async fn participate( send_result(&mut result_sender, req, ParticipationOutcome::Invalid).await; return }, - Ok(Err(RecoveryError::Unavailable)) | Ok(Err(RecoveryError::Internal)) => { + Ok(Err(RecoveryError::Unavailable)) | Ok(Err(RecoveryError::ChannelClosed)) => { send_result(&mut result_sender, req, ParticipationOutcome::Unavailable).await; return }, diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 693d22e7d04b..53ac1fa2f078 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -20,6 +20,7 @@ use std::{ collections::{HashMap, VecDeque}, + iter::Iterator, num::NonZeroUsize, pin::Pin, time::Duration, @@ -278,17 +279,18 @@ impl RequestFromBackers { match response.await { Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => { let (reencode_tx, reencode_rx) = channel(); - let _ = self - .erasure_task_tx + self.erasure_task_tx .send(ErasureTask::Reencode( 
params.validators.len(), params.erasure_root, data, reencode_tx, )) - .await; + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + let reencode_response = - reencode_rx.await.map_err(|_| RecoveryError::Internal)?; + reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; if let Some(data) = reencode_response { gum::trace!( @@ -354,14 +356,8 @@ impl RequestChunksFromValidators { } fn insert_chunk(&mut self, validator_index: ValidatorIndex, chunk: ErasureChunk) { - // Make sure we have a hashmap. - if self.received_chunks.is_none() { - self.received_chunks = Some(HashMap::new()); - } - self.received_chunks - .as_mut() - .expect("Just initialized it above; qed") + .get_or_insert_with(|| HashMap::new()) .insert(validator_index, chunk); } @@ -629,37 +625,39 @@ impl RequestChunksFromValidators { // If received_chunks has more than threshold entries, attempt to recover the data. // If that fails, or a re-encoding of it doesn't match the expected erasure root, // return Err(RecoveryError::Invalid) - if self.chunk_count() >= params.threshold && self.received_chunks.is_some() { + if self.chunk_count() >= params.threshold { let recovery_duration = metrics.time_erasure_recovery(); // Send request to reconstruct available data from chunks. let (avilable_data_tx, available_data_rx) = channel(); - let _ = self - .erasure_task_tx + self.erasure_task_tx .send(ErasureTask::Reconstruct( params.validators.len(), - self.received_chunks.take().expect("Just checked it's some above; qed"), + self.received_chunks.take().expect("threshold is always non-zero; qed"), avilable_data_tx, )) - .await; + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + let available_data_response = - available_data_rx.await.map_err(|_| RecoveryError::Internal)?; + available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; return match available_data_response { Ok(data) => { // Send request to re-encode the chunks and check merkle root. 
let (reencode_tx, reencode_rx) = channel(); - let _ = self - .erasure_task_tx + self.erasure_task_tx .send(ErasureTask::Reencode( params.validators.len(), params.erasure_root, data, reencode_tx, )) - .await; + .await + .map_err(|_| RecoveryError::ChannelClosed)?; + let reencode_response = - reencode_rx.await.map_err(|_| RecoveryError::Internal)?; + reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?; if let Some(data) = reencode_response { gum::trace!( @@ -819,7 +817,8 @@ where match from_backers.run(&self.params, &mut self.sender).await { Ok(data) => break Ok(data), Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid), - Err(RecoveryError::Internal) => break Err(RecoveryError::Internal), + Err(RecoveryError::ChannelClosed) => + break Err(RecoveryError::ChannelClosed), Err(RecoveryError::Unavailable) => self.source = Source::RequestChunks(RequestChunksFromValidators::new( self.params.validators.len() as _, @@ -913,7 +912,7 @@ impl TryFrom> for CachedRecovery { // We don't want to cache unavailable state, as that state might change, so if // requested again we want to try again! Err(RecoveryError::Unavailable) => Err(()), - Err(RecoveryError::Internal) => Err(()), + Err(RecoveryError::ChannelClosed) => Err(()), } } } @@ -1148,6 +1147,7 @@ async fn query_chunk_size( rx.await.map_err(error::Error::CanceledQueryFullData) } + #[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)] impl AvailabilityRecoverySubsystem { /// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the @@ -1196,51 +1196,34 @@ impl AvailabilityRecoverySubsystem { let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16); let mut erasure_task_rx = erasure_task_rx.fuse(); + // Create a thread pool with 2 workers. Pool is guaranteed to have at least 1 worker thread. 
+ let mut to_pool = ThreadPoolBuilder::build( + NonZeroUsize::new(2).expect("There are 2 threads; qed"), + metrics.clone(), + &mut ctx, + ) + .into_iter() + .cycle(); + loop { let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse(); pin_mut!(recv_req); futures::select! { erasure_task = erasure_task_rx.next() => { match erasure_task { - Some(ErasureTask::Reconstruct(n_validators, chunks, sender )) => { - let result = ctx.spawn_blocking("reconstruct_v1", Box::pin(async move { - let _ = sender.send(polkadot_erasure_coding::reconstruct_v1( - n_validators, - chunks.values().map(|c| (&c.chunk[..], c.index.0 as usize)), - )); - })); - - if let Err(e) = result { + Some(task) => { + let send_result = to_pool + .next() + .expect("Pool size is `NonZeroUsize`; qed") + .send(task) + .await + .map_err(|_| RecoveryError::ChannelClosed); + + if let Err(err) = send_result { gum::warn!( target: LOG_TARGET, - err = ?e, - "Failed to spawn a erasure coding task", - ); - } - }, - Some(ErasureTask::Reencode(n_validators, root, available_data, sender)) => { - let metrics = metrics.clone(); - - let result = ctx.spawn_blocking("re-encode", Box::pin(async move { - let maybe_data = if reconstructed_data_matches_root( - n_validators, - &root, - &available_data, - &metrics, - ) { - Some(available_data) - } else { - None - }; - - let _ = sender.send(maybe_data); - })); - - if let Err(e) = result { - gum::warn!( - target: LOG_TARGET, - err = ?e, - "Failed to spawn a erasure coding task", + ?err, + "Failed to send erasure coding task", ); } }, @@ -1250,7 +1233,7 @@ impl AvailabilityRecoverySubsystem { "Erasure task channel closed", ); - return Err(SubsystemError::with_origin("availability-recovery", RecoveryError::Internal)) + return Err(SubsystemError::with_origin("availability-recovery", RecoveryError::ChannelClosed)) } } } @@ -1339,3 +1322,80 @@ impl AvailabilityRecoverySubsystem { } } } + +struct ThreadPoolBuilder; + +const MAX_THREADS: NonZeroUsize = match NonZeroUsize::new(4) 
{ + Some(max_threads) => max_threads, + None => panic!("MAX_THREADS must be non-zero"), +}; + +impl ThreadPoolBuilder { + // Creates a pool of `size` workers, where 1 <= `size` <= `MAX_THREADS`. + #[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)] + pub fn build( + size: NonZeroUsize, + metrics: Metrics, + ctx: &mut Context, + ) -> Vec> { + // At least 1 task, at most `MAX_THREADS`. + let size = std::cmp::min(size, MAX_THREADS); + let mut senders = Vec::new(); + + for index in 0..size.into() { + let (tx, rx) = futures::channel::mpsc::channel(8); + senders.push(tx); + + if let Err(e) = ctx + .spawn_blocking("erasure-task", Box::pin(erasure_task_thread(metrics.clone(), rx))) + { + gum::warn!( + target: LOG_TARGET, + err = ?e, + index, + "Failed to spawn a erasure task", + ); + } + } + senders + } +} + +// Handles CPU-intensive operations on a dedicated blocking thread. +async fn erasure_task_thread( + metrics: Metrics, + mut ingress: futures::channel::mpsc::Receiver, +) { + loop { + match ingress.next().await { + Some(ErasureTask::Reconstruct(n_validators, chunks, sender)) => { + let _ = sender.send(polkadot_erasure_coding::reconstruct_v1( + n_validators, + chunks.values().map(|c| (&c.chunk[..], c.index.0 as usize)), + )); + }, + Some(ErasureTask::Reencode(n_validators, root, available_data, sender)) => { + let metrics = metrics.clone(); + + let maybe_data = if reconstructed_data_matches_root( + n_validators, + &root, + &available_data, + &metrics, + ) { + Some(available_data) + } else { + None + }; + + let _ = sender.send(maybe_data); + }, + None => { + gum::debug!( + target: LOG_TARGET, + "Erasure task channel closed. Node shutting down ?", + ); + }, + } + } +} diff --git a/node/subsystem-types/src/errors.rs b/node/subsystem-types/src/errors.rs index 22578339ccb3..44136362a69e 100644 --- a/node/subsystem-types/src/errors.rs +++ b/node/subsystem-types/src/errors.rs @@ -76,8 +76,8 @@ pub enum RecoveryError { /// A requested chunk is unavailable.
Unavailable, - /// Internal error, channel closed. - Internal, + /// Erasure task channel closed, usually means node is shutting down. + ChannelClosed, } impl std::fmt::Display for RecoveryError { @@ -85,7 +85,7 @@ impl std::fmt::Display for RecoveryError { let msg = match self { RecoveryError::Invalid => "Invalid", RecoveryError::Unavailable => "Unavailable", - RecoveryError::Internal => "Internal", + RecoveryError::ChannelClosed => "ChannelClosed", }; write!(f, "{}", msg) From ab5c932f9108eeda6ab281a073ca6e270c939b14 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 3 Jul 2023 20:48:50 +0000 Subject: [PATCH 08/10] review feedback Signed-off-by: Andrei Sandu --- node/network/availability-recovery/src/lib.rs | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 53ac1fa2f078..4309ed8be29b 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -169,7 +169,7 @@ struct RequestChunksFromValidators { /// request the chunk from them. shuffling: VecDeque, /// Chunks received so far. - received_chunks: Option>, + received_chunks: HashMap, /// Pending chunk requests with soft timeout. requesting_chunks: FuturesUndead, (ValidatorIndex, RequestError)>>, // channel to the erasure task handler. 
@@ -336,7 +336,7 @@ impl RequestChunksFromValidators { error_count: 0, total_received_responses: 0, shuffling: shuffling.into(), - received_chunks: Some(HashMap::new()), + received_chunks: HashMap::new(), requesting_chunks: FuturesUndead::new(), erasure_task_tx, } @@ -352,13 +352,11 @@ impl RequestChunksFromValidators { } fn chunk_count(&self) -> usize { - self.received_chunks.as_ref().map_or(0, |chunks| chunks.len()) + self.received_chunks.len() } fn insert_chunk(&mut self, validator_index: ValidatorIndex, chunk: ErasureChunk) { - self.received_chunks - .get_or_insert_with(|| HashMap::new()) - .insert(validator_index, chunk); + self.received_chunks.insert(validator_index, chunk); } fn can_conclude(&self, params: &RecoveryParams) -> bool { @@ -633,7 +631,7 @@ impl RequestChunksFromValidators { self.erasure_task_tx .send(ErasureTask::Reconstruct( params.validators.len(), - self.received_chunks.take().expect("threshold is always non-zero; qed"), + std::mem::take(&mut self.received_chunks), avilable_data_tx, )) .await @@ -1196,8 +1194,22 @@ impl AvailabilityRecoverySubsystem { let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16); let mut erasure_task_rx = erasure_task_rx.fuse(); - // Create a thread pool with 2 workers. Pool is guaranteed to have at least 1 worker thread. + // `ThreadPoolBuilder` spawns the tasks using `spawn_blocking`. For each worker there will be a `mpsc` channel created. + // Each of these workers take the `Receiver` and poll it in an infinite loop. + // All of the sender ends of the channel are sent as a vec which we then use to create a `Cycle` iterator. + // We use this iterator to assign work in a round-robin fashion to the workers in the pool. + // + // How work is dispatched to the pool from the recovery tasks: + // - Once a recovery tasks finish retrieving the availability data, it needs to do some heavy CPU computation. 
+ // To do so it sends an `ErasureTask` to the main loop via the `erasure_task` channel, and waits for the results + // over a `oneshot` channel. + // - In the subsystem main loop we poll the `erasure_task_rx` receiver. + // - We forward the received `ErasureTask` to the `next()` sender yielded by the `Cycle` iterator. + // - Some worker thread handles it and sends the response over the `oneshot` channel. + + // Create a thread pool with 2 workers. let mut to_pool = ThreadPoolBuilder::build( + // Pool is guaranteed to have at least 1 worker thread. NonZeroUsize::new(2).expect("There are 2 threads; qed"), metrics.clone(), &mut ctx, @@ -1323,6 +1335,7 @@ impl AvailabilityRecoverySubsystem { } } +// A simple thread pool implementation using `spawn_blocking` threads. struct ThreadPoolBuilder; const MAX_THREADS: NonZeroUsize = match NonZeroUsize::new(4) { @@ -1332,6 +1345,11 @@ const MAX_THREADS: NonZeroUsize = match NonZeroUsize::new(4) { impl ThreadPoolBuilder { // Creates a pool of `size` workers, where 1 <= `size` <= `MAX_THREADS`. + // + // Each worker is created by `spawn_blocking` and takes the receiver side of a channel + // while all of the senders are returned to the caller. + // + // The caller is responsible for routing work to the workers. 
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)] pub fn build( size: NonZeroUsize, From 34331745feaccdf6ae1b605ae989fa38de5b93a3 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 3 Jul 2023 20:53:45 +0000 Subject: [PATCH 09/10] More docs Signed-off-by: Andrei Sandu --- node/network/availability-recovery/src/lib.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 4309ed8be29b..c3ba263d868a 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -1347,7 +1347,11 @@ impl ThreadPoolBuilder { // Creates a pool of `size` workers, where 1 <= `size` <= `MAX_THREADS`. // // Each worker is created by `spawn_blocking` and takes the receiver side of a channel - // while all of the senders are returned to the caller. + // while all of the senders are returned to the caller. Each worker runs `erasure_task_thread` that + // polls the `Receiver` for an `ErasureTask` which is expected to be CPU intensive. The larger + // the input (more or larger chunks/availability data), the more CPU cycles will be spent. + // + // After executing such a task, the worker sends the response via a provided `oneshot` sender. // // The caller is responsible for routing work to the workers. 
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)] From dd2b365359aa3ecbe17fe825f6c0bf8b7d21da82 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 3 Jul 2023 21:01:34 +0000 Subject: [PATCH 10/10] add some example timings in comments Signed-off-by: Andrei Sandu --- node/network/availability-recovery/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index c3ba263d868a..e8503ee454a2 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -1200,7 +1200,8 @@ impl AvailabilityRecoverySubsystem { // We use this iterator to assign work in a round-robin fashion to the workers in the pool. // // How work is dispatched to the pool from the recovery tasks: - // - Once a recovery tasks finish retrieving the availability data, it needs to do some heavy CPU computation. + // - Once a recovery task finishes retrieving the availability data, it needs to reconstruct from chunks and/or + // re-encode the data which are heavy CPU computations. // To do so it sends an `ErasureTask` to the main loop via the `erasure_task` channel, and waits for the results // over a `oneshot` channel. // - In the subsystem main loop we poll the `erasure_task_rx` receiver. @@ -1351,6 +1352,8 @@ impl ThreadPoolBuilder { // polls the `Receiver` for an `ErasureTask` which is expected to be CPU intensive. The larger // the input (more or larger chunks/availability data), the more CPU cycles will be spent. // + // For example, for 32KB PoVs, we'd expect re-encode to eat as much as 90ms and 500ms for 2.5MiB. + // // After executing such a task, the worker sends the response via a provided `oneshot` sender. // // The caller is responsible for routing work to the workers.