From 7525ea0e879cc04197d4c235d5b9119944cdb025 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 23 Nov 2023 15:33:48 +0800 Subject: [PATCH] Refactor: rpc error simulation in tests/ --- openraft/src/network/rpc_type.rs | 2 +- .../t11_append_inconsistent_log.rs | 2 +- ...replication_1_voter_to_isolated_learner.rs | 4 +- tests/tests/client_api/t11_client_reads.rs | 4 +- tests/tests/fixtures/mod.rs | 183 ++++++++++-------- tests/tests/membership/t11_add_learner.rs | 4 +- .../membership/t30_commit_joint_config.rs | 8 +- .../membership/t30_elect_with_new_config.rs | 4 +- .../t51_remove_unreachable_follower.rs | 4 +- .../replication/t50_append_entries_backoff.rs | 6 +- .../t30_purge_in_snapshot_logs.rs | 4 +- .../t34_replication_does_not_block_purge.rs | 4 +- .../t50_snapshot_line_rate_to_snapshot.rs | 4 +- ...ot_to_unreachable_node_should_not_block.rs | 2 +- 14 files changed, 132 insertions(+), 103 deletions(-) diff --git a/openraft/src/network/rpc_type.rs b/openraft/src/network/rpc_type.rs index c3ae31455..efd391f72 100644 --- a/openraft/src/network/rpc_type.rs +++ b/openraft/src/network/rpc_type.rs @@ -1,6 +1,6 @@ use std::fmt; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] pub enum RPCTypes { Vote, diff --git a/tests/tests/append_entries/t11_append_inconsistent_log.rs b/tests/tests/append_entries/t11_append_inconsistent_log.rs index f5a5f8542..5efab79c9 100644 --- a/tests/tests/append_entries/t11_append_inconsistent_log.rs +++ b/tests/tests/append_entries/t11_append_inconsistent_log.rs @@ -73,7 +73,7 @@ async fn append_inconsistent_log() -> Result<()> { ); { router.new_raft_node_with_sto(1, sto1.clone(), sm1.clone()).await; - router.set_node_network_failure(1, true); + router.set_network_error(1, true); } tracing::info!(log_index, "--- restart node 0 and 2"); diff --git a/tests/tests/append_entries/t30_replication_1_voter_to_isolated_learner.rs b/tests/tests/append_entries/t30_replication_1_voter_to_isolated_learner.rs index da9d1bbe0..0af1e2ebc 100644 --- a/tests/tests/append_entries/t30_replication_1_voter_to_isolated_learner.rs +++ b/tests/tests/append_entries/t30_replication_1_voter_to_isolated_learner.rs @@ -30,7 +30,7 @@ async fn replication_1_voter_to_isolated_learner() -> Result<()> { tracing::info!(log_index, "--- stop replication to node 1"); { - router.set_node_network_failure(1, true); + router.set_network_error(1, true); router.client_request_many(0, "0", (10 - log_index) as usize).await?; log_index = 10; @@ -47,7 +47,7 @@ async fn replication_1_voter_to_isolated_learner() -> Result<()> { tracing::info!(log_index, "--- restore replication to node 1"); { - router.set_node_network_failure(1, false); + router.set_network_error(1, false); router.client_request_many(0, "0", (10 - log_index) as usize).await?; log_index = 10; diff --git a/tests/tests/client_api/t11_client_reads.rs b/tests/tests/client_api/t11_client_reads.rs index a86b4107a..52a1a0716 100644 --- a/tests/tests/client_api/t11_client_reads.rs +++ b/tests/tests/client_api/t11_client_reads.rs @@ -44,12 +44,12 @@ async fn client_reads() -> Result<()> { tracing::info!(log_index, "--- isolate node 1 then is_leader should work"); - router.set_node_network_failure(1, true); + router.set_network_error(1, true); router.is_leader(leader).await?; tracing::info!(log_index, "--- isolate node 2 then is_leader should fail"); - router.set_node_network_failure(2, true); + 
router.set_network_error(2, true);
 
     let rst = router.is_leader(leader).await;
     tracing::debug!(?rst, "is_leader with majority down");
 
diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs
index c86655efd..5545f3fd2 100644
--- a/tests/tests/fixtures/mod.rs
+++ b/tests/tests/fixtures/mod.rs
@@ -6,8 +6,8 @@
 use std::collections::BTreeMap;
 use std::collections::BTreeSet;
 use std::collections::HashMap;
-use std::collections::HashSet;
 use std::env;
+use std::fmt;
 use std::panic::PanicInfo;
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
@@ -46,6 +46,9 @@ use openraft::Config;
 use openraft::LogId;
 use openraft::LogIdOptionExt;
 use openraft::MessageSummary;
+use openraft::Node;
+use openraft::NodeId;
+use openraft::RPCTypes;
 use openraft::Raft;
 use openraft::RaftLogId;
 use openraft::RaftMetrics;
@@ -134,6 +137,50 @@ pub fn log_panic(panic: &PanicInfo) {
     eprintln!("{}", backtrace);
 }
 
+#[derive(Debug, Clone, Copy)]
+#[derive(PartialEq, Eq)]
+#[derive(Hash)]
+enum Direction {
+    NetSend,
+    NetRecv,
+}
+
+impl fmt::Display for Direction {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            NetSend => write!(f, "sending from"),
+            NetRecv => write!(f, "receiving by"),
+        }
+    }
+}
+
+use Direction::NetRecv;
+use Direction::NetSend;
+
+#[derive(Debug, Clone, Copy)]
+enum RPCErrorType {
+    /// Returns [`Unreachable`](`openraft::error::Unreachable`).
+    Unreachable,
+    /// Returns [`NetworkError`](`openraft::error::NetworkError`).
+    NetworkError,
+}
+
+impl RPCErrorType {
+    fn make_error<NID, N, E>(&self, id: NID, dir: Direction) -> RPCError<NID, N, E>
+    where
+        NID: NodeId,
+        N: Node,
+        E: std::error::Error,
+    {
+        let msg = format!("error {} id={}", dir, id);
+
+        match self {
+            RPCErrorType::Unreachable => Unreachable::new(&AnyError::error(msg)).into(),
+            RPCErrorType::NetworkError => NetworkError::new(&AnyError::error(msg)).into(),
+        }
+    }
+}
+
 /// A type which emulates a network transport and implements the `RaftNetworkFactory` trait.
 #[derive(Clone)]
 pub struct TypedRaftRouter {
@@ -144,11 +191,9 @@ pub struct TypedRaftRouter {
     #[allow(clippy::type_complexity)]
     routing_table: Arc<Mutex<BTreeMap<MemNodeId, (MemRaft, MemLogStore, MemStateMachine)>>>,
 
-    /// Nodes that can neither send nor receive frames, and will return an `NetworkError`.
-    network_failure_nodes: Arc<Mutex<HashSet<MemNodeId>>>,
-
-    /// Nodes to which an RPC is sent return an `Unreachable` error.
-    unreachable_nodes: Arc<Mutex<HashSet<MemNodeId>>>,
+    /// Whether to fail a network RPC that is sent from/to a node,
+    /// and what kind of error to return.
+    fail_rpc: Arc<Mutex<HashMap<(MemNodeId, Direction), RPCErrorType>>>,
 
     /// To emulate network delay for sending, in milliseconds.
     /// 0 means no delay.
@@ -161,14 +206,7 @@ pub struct TypedRaftRouter {
     append_entries_quota: Arc<Mutex<Option<u64>>>,
 
     /// Count of RPCs sent.
-    rpc_count: Arc<Mutex<HashMap<RPCType, u64>>>,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub enum RPCType {
-    AppendEntries,
-    InstallSnapshot,
-    Vote,
+    rpc_count: Arc<Mutex<HashMap<RPCTypes, u64>>>,
 }
 
 /// Default `RaftRouter` for memstore.
@@ -199,8 +237,7 @@ impl Builder {
         TypedRaftRouter {
             config: self.config,
             routing_table: Default::default(),
-            network_failure_nodes: Default::default(),
-            unreachable_nodes: Default::default(),
+            fail_rpc: Default::default(),
             send_delay: Arc::new(AtomicU64::new(send_delay)),
             append_entries_quota: Arc::new(Mutex::new(None)),
             rpc_count: Default::default(),
@@ -239,13 +276,13 @@ impl TypedRaftRouter {
         *append_entries_quota = quota;
     }
 
-    fn count_rpc(&self, rpc_type: RPCType) {
+    fn count_rpc(&self, rpc_type: RPCTypes) {
         let mut rpc_count = self.rpc_count.lock().unwrap();
         let count = rpc_count.entry(rpc_type).or_insert(0);
         *count += 1;
     }
 
-    pub fn get_rpc_count(&self) -> HashMap<RPCType, u64> {
+    pub fn get_rpc_count(&self) -> HashMap<RPCTypes, u64> {
         self.rpc_count.lock().unwrap().clone()
     }
 
@@ -375,10 +412,8 @@ impl TypedRaftRouter {
             rt.remove(&id)
         };
 
-        {
-            let mut isolated = self.network_failure_nodes.lock().unwrap();
-            isolated.remove(&id);
-        }
+        self.set_network_error(id, false);
+        self.set_unreachable(id, false);
 
         opt_handles
     }
@@ -403,23 +438,36 @@ impl TypedRaftRouter {
     /// Isolate the network of the specified node.
     #[tracing::instrument(level = "debug", skip(self))]
-    pub fn set_node_network_failure(&self, id: MemNodeId, emit_failure: bool) {
-        let mut nodes = self.network_failure_nodes.lock().unwrap();
-        if emit_failure {
-            nodes.insert(id);
+    pub fn set_network_error(&self, id: MemNodeId, emit_failure: bool) {
+        let v = if emit_failure {
+            Some(RPCErrorType::NetworkError)
         } else {
-            nodes.remove(&id);
-        }
+            None
+        };
+
+        self.set_rpc_failure(id, NetRecv, v);
+        self.set_rpc_failure(id, NetSend, v);
     }
 
     /// Set to `true` to return [`Unreachable`](`openraft::errors::Unreachable`) when sending RPC to
     /// a node.
     pub fn set_unreachable(&self, id: MemNodeId, unreachable: bool) {
-        let mut u = self.unreachable_nodes.lock().unwrap();
-        if unreachable {
-            u.insert(id);
+        let v = if unreachable {
+            Some(RPCErrorType::Unreachable)
+        } else {
+            None
+        };
+        self.set_rpc_failure(id, NetRecv, v);
+        self.set_rpc_failure(id, NetSend, v);
+    }
+
+    /// Set whether to emit a specified RPC error when sending to/receiving from a node.
+    fn set_rpc_failure(&self, id: MemNodeId, dir: Direction, rpc_error_type: Option<RPCErrorType>) {
+        let mut fails = self.fail_rpc.lock().unwrap();
+        if let Some(rpc_error_type) = rpc_error_type {
+            fails.insert((id, dir), rpc_error_type);
         } else {
-            u.remove(&id);
+            fails.remove(&(id, dir));
         }
     }
 
@@ -551,33 +599,15 @@ impl TypedRaftRouter {
     /// Get the ID of the current leader.
     pub fn leader(&self) -> Option<MemNodeId> {
-        let isolated = {
-            let isolated = self.network_failure_nodes.lock().unwrap();
-            isolated.clone()
-        };
-
-        tracing::debug!("router::leader: isolated: {:?}", isolated);
-
         self.latest_metrics().into_iter().find_map(|node| {
             if node.current_leader == Some(node.id) {
-                if isolated.contains(&node.id) {
-                    None
-                } else {
-                    Some(node.id)
-                }
+                Some(node.id)
             } else {
                 None
             }
         })
     }
 
-    /// Restore the network of the specified node.
-    #[tracing::instrument(level = "debug", skip(self))]
-    pub fn restore_node(&self, id: MemNodeId) {
-        let mut nodes = self.network_failure_nodes.lock().unwrap();
-        nodes.remove(&id);
-    }
-
     /// Bring up a new learner and add it to the leader's membership.
    pub async fn add_learner(
        &self,
@@ -815,24 +845,20 @@ impl TypedRaftRouter {
     }
 
     #[tracing::instrument(level = "debug", skip(self))]
-    pub fn check_network_error(&self, id: MemNodeId, target: MemNodeId) -> Result<(), NetworkError> {
-        let isolated = self.network_failure_nodes.lock().unwrap();
-
-        if isolated.contains(&target) || isolated.contains(&id) {
-            let network_err = NetworkError::new(&AnyError::error(format!("isolated:{} -> {}", id, target)));
-            return Err(network_err);
-        }
-
-        Ok(())
-    }
-
-    #[tracing::instrument(level = "debug", skip(self))]
-    pub fn check_unreachable(&self, id: MemNodeId, target: MemNodeId) -> Result<(), Unreachable> {
-        let unreachable = self.unreachable_nodes.lock().unwrap();
+    pub fn emit_rpc_error<E>(
+        &self,
+        id: MemNodeId,
+        target: MemNodeId,
+    ) -> Result<(), RPCError<MemNodeId, (), E>>
+    where
+        E: std::error::Error,
+    {
+        let fails = self.fail_rpc.lock().unwrap();
 
-        if unreachable.contains(&target) || unreachable.contains(&id) {
-            let err = Unreachable::new(&AnyError::error(format!("unreachable:{} -> {}", id, target)));
-            return Err(err);
+        for key in [(id, NetSend), (target, NetRecv)] {
+            if let Some(err_type) = fails.get(&key) {
+                return Err(err_type.make_error(key.0, key.1));
+            }
         }
 
         Ok(())
     }
@@ -864,10 +890,11 @@ impl RaftNetwork<MemConfig> for RaftRouterNetwork {
         mut rpc: AppendEntriesRequest<MemConfig>,
     ) -> Result<AppendEntriesResponse<MemNodeId>, RPCError<MemNodeId, (), RaftError<MemNodeId>>> {
         tracing::debug!("append_entries to id={} {}", self.target, rpc.summary());
-        self.owner.count_rpc(RPCType::AppendEntries);
+        self.owner.count_rpc(RPCTypes::AppendEntries);
+
+        let from_id = rpc.vote.leader_id().voted_for().unwrap();
+        self.owner.emit_rpc_error(from_id, self.target)?;
 
-        self.owner.check_network_error(rpc.vote.leader_id().voted_for().unwrap(), self.target)?;
-        self.owner.check_unreachable(rpc.vote.leader_id().voted_for().unwrap(), self.target)?;
         self.owner.rand_send_delay().await;
 
         // decrease quota if quota is set
@@ -926,10 +953,11 @@ impl RaftNetwork<MemConfig> for RaftRouterNetwork {
         rpc: InstallSnapshotRequest<MemConfig>,
     ) -> Result<InstallSnapshotResponse<MemNodeId>, RPCError<MemNodeId, (), RaftError<MemNodeId, InstallSnapshotError>>> {
-        self.owner.count_rpc(RPCType::InstallSnapshot);
+        self.owner.count_rpc(RPCTypes::InstallSnapshot);
+
+        let from_id = rpc.vote.leader_id().voted_for().unwrap();
+        self.owner.emit_rpc_error(from_id, self.target)?;
 
-        self.owner.check_network_error(rpc.vote.leader_id().voted_for().unwrap(), self.target)?;
-        self.owner.check_unreachable(rpc.vote.leader_id().voted_for().unwrap(), self.target)?;
         self.owner.rand_send_delay().await;
 
         let node = self.owner.get_raft_handle(&self.target)?;
@@ -945,10 +973,11 @@ impl RaftNetwork<MemConfig> for RaftRouterNetwork {
     async fn send_vote(
         &mut self,
         rpc: VoteRequest<MemNodeId>,
     ) -> Result<VoteResponse<MemNodeId>, RPCError<MemNodeId, (), RaftError<MemNodeId>>> {
-        self.owner.count_rpc(RPCType::Vote);
+        self.owner.count_rpc(RPCTypes::Vote);
+
+        let from_id = rpc.vote.leader_id().voted_for().unwrap();
+        self.owner.emit_rpc_error(from_id, self.target)?;
 
-        self.owner.check_network_error(rpc.vote.leader_id().voted_for().unwrap(), self.target)?;
-        self.owner.check_unreachable(rpc.vote.leader_id().voted_for().unwrap(), self.target)?;
         self.owner.rand_send_delay().await;
 
         let node = self.owner.get_raft_handle(&self.target)?;
diff --git a/tests/tests/membership/t11_add_learner.rs b/tests/tests/membership/t11_add_learner.rs
index d193bef62..c57f19f3d 100644
--- a/tests/tests/membership/t11_add_learner.rs
+++ b/tests/tests/membership/t11_add_learner.rs
@@ -127,7 +127,7 @@ async fn add_learner_non_blocking() -> Result<()> {
     router.new_raft_node(1).await;
 
     // Replication problem should not block adding-learner in non-blocking mode.
- router.set_node_network_failure(1, true); + router.set_network_error(1, true); let raft = router.get_raft_handle(&0)?; raft.add_learner(1, (), false).await?; @@ -208,7 +208,7 @@ async fn add_learner_when_previous_membership_not_committed() -> Result<()> { tracing::info!(log_index, "--- block replication to prevent committing any log"); { - router.set_node_network_failure(1, true); + router.set_network_error(1, true); let node = router.get_raft_handle(&0)?; tokio::spawn(async move { diff --git a/tests/tests/membership/t30_commit_joint_config.rs b/tests/tests/membership/t30_commit_joint_config.rs index c14afa22a..55128fd0c 100644 --- a/tests/tests/membership/t30_commit_joint_config.rs +++ b/tests/tests/membership/t30_commit_joint_config.rs @@ -59,8 +59,8 @@ async fn commit_joint_config_during_0_to_012() -> Result<()> { "--- isolate node 1,2, so that membership [0,1,2] wont commit" ); - router.set_node_network_failure(1, true); - router.set_node_network_failure(2, true); + router.set_network_error(1, true); + router.set_network_error(2, true); tracing::info!(log_index, "--- changing cluster config, should timeout"); @@ -110,8 +110,8 @@ async fn commit_joint_config_during_012_to_234() -> Result<()> { tracing::info!(log_index, "--- isolate 3,4"); - router.set_node_network_failure(3, true); - router.set_node_network_failure(4, true); + router.set_network_error(3, true); + router.set_network_error(4, true); tracing::info!(log_index, "--- changing config to 0,1,2"); let node = router.get_raft_handle(&0)?; diff --git a/tests/tests/membership/t30_elect_with_new_config.rs b/tests/tests/membership/t30_elect_with_new_config.rs index 630d45020..3d3c99d08 100644 --- a/tests/tests/membership/t30_elect_with_new_config.rs +++ b/tests/tests/membership/t30_elect_with_new_config.rs @@ -38,7 +38,7 @@ async fn leader_election_after_changing_0_to_01234() -> Result<()> { // Isolate old leader and assert that a new leader takes over. 
tracing::info!(log_index, "--- isolating leader node 0"); - router.set_node_network_failure(0, true); + router.set_network_error(0, true); // Wait for leader lease to expire sleep(Duration::from_millis(700)).await; @@ -62,7 +62,7 @@ async fn leader_election_after_changing_0_to_01234() -> Result<()> { let leader_id = 1; tracing::info!(log_index, "--- restore node 0, log_index:{}", log_index); - router.set_node_network_failure(0, false); + router.set_network_error(0, false); router .wait(&0, timeout()) .metrics( diff --git a/tests/tests/membership/t51_remove_unreachable_follower.rs b/tests/tests/membership/t51_remove_unreachable_follower.rs index 41b567052..9d43e810b 100644 --- a/tests/tests/membership/t51_remove_unreachable_follower.rs +++ b/tests/tests/membership/t51_remove_unreachable_follower.rs @@ -27,7 +27,7 @@ async fn stop_replication_to_removed_unreachable_follower_network_failure() -> R tracing::info!(log_index, "--- isolate node 4"); { - router.set_node_network_failure(4, true); + router.set_network_error(4, true); } // logs on node 4 will stop here: @@ -74,7 +74,7 @@ async fn stop_replication_to_removed_unreachable_follower_network_failure() -> R "--- restore network isolation, node 4 won't catch up log and will enter candidate state" ); { - router.set_node_network_failure(4, false); + router.set_network_error(4, false); router .wait(&4, timeout()) diff --git a/tests/tests/replication/t50_append_entries_backoff.rs b/tests/tests/replication/t50_append_entries_backoff.rs index e7333d504..9e6da0ee8 100644 --- a/tests/tests/replication/t50_append_entries_backoff.rs +++ b/tests/tests/replication/t50_append_entries_backoff.rs @@ -4,9 +4,9 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; use openraft::Config; +use openraft::RPCTypes; use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::RPCType; use crate::fixtures::RaftRouter; /// Append-entries should backoff when a `Unreachable` error is found. @@ -42,8 +42,8 @@ async fn append_entries_backoff() -> Result<()> { let counts1 = router.get_rpc_count(); - let c0 = *counts0.get(&RPCType::AppendEntries).unwrap_or(&0); - let c1 = *counts1.get(&RPCType::AppendEntries).unwrap_or(&0); + let c0 = *counts0.get(&RPCTypes::AppendEntries).unwrap_or(&0); + let c1 = *counts1.get(&RPCTypes::AppendEntries).unwrap_or(&0); // Without backoff, the leader would send about 40 append-entries RPC. // 20 for append log entries, 20 for updating committed. 
diff --git a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs index 766c53e67..74d4bee49 100644 --- a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs +++ b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs @@ -61,7 +61,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { // Learner: 0..10 tracing::info!(log_index, "--- block replication, build another snapshot"); { - router.set_node_network_failure(1, true); + router.set_network_error(1, true); log_index += router.client_request_many(0, "0", 5).await?; router.wait(&0, timeout()).log(Some(log_index), "write another 5 logs").await?; @@ -85,7 +85,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { "--- restore replication, install the 2nd snapshot on learner" ); { - router.set_node_network_failure(1, false); + router.set_network_error(1, false); learner .wait(timeout()) diff --git a/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs index fffcf9a23..483d03f7f 100644 --- a/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs +++ b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs @@ -39,8 +39,8 @@ async fn replication_does_not_block_purge() -> Result<()> { let leader = router.get_raft_handle(&0)?; - router.set_node_network_failure(1, true); - router.set_node_network_failure(2, true); + router.set_network_error(1, true); + router.set_network_error(2, true); tracing::info!(log_index, "--- build snapshot on leader, check purged log"); { diff --git a/tests/tests/snapshot_streaming/t50_snapshot_line_rate_to_snapshot.rs b/tests/tests/snapshot_streaming/t50_snapshot_line_rate_to_snapshot.rs index b208086aa..d520aa07b 100644 --- a/tests/tests/snapshot_streaming/t50_snapshot_line_rate_to_snapshot.rs +++ b/tests/tests/snapshot_streaming/t50_snapshot_line_rate_to_snapshot.rs @@ -56,7 +56,7 @@ async fn snapshot_line_rate_to_snapshot() -> Result<()> { tracing::info!(log_index, "--- stop replication to node 1"); tracing::info!(log_index, "--- send just enough logs to trigger snapshot"); { - router.set_node_network_failure(1, true); + router.set_network_error(1, true); router.client_request_many(0, "0", (snapshot_threshold - 1 - log_index) as usize).await?; log_index = snapshot_threshold - 1; @@ -81,7 +81,7 @@ async fn snapshot_line_rate_to_snapshot() -> Result<()> { tracing::info!(log_index, "--- restore node 1 and replication"); { - router.set_node_network_failure(1, false); + router.set_network_error(1, false); router.wait_for_log(&btreeset![1], Some(log_index), timeout(), "replicate by snapshot").await?; router diff --git a/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs b/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs index bb38d5bd7..082e394c6 100644 --- a/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs +++ b/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs @@ -27,7 +27,7 @@ async fn snapshot_to_unreachable_node_should_not_block() -> Result<()> { let mut log_index = router.new_cluster(btreeset! {0,1}, btreeset! {2}).await?; tracing::info!(log_index, "--- isolate replication 0 -> 2"); - router.set_node_network_failure(2, true); + router.set_network_error(2, true); let n = 10; tracing::info!(log_index, "--- write {} logs", n);
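Illustrative usage, not part of the patch: a minimal sketch of how a test might drive the refactored failure-injection helpers (`set_network_error`, `set_unreachable`) and the `RPCTypes`-keyed counters, assuming the `RaftRouter` fixture above. The cluster layout, config values, and function name below are invented for the example; only the router methods shown in the hunks above are taken from the patch.

    use std::sync::Arc;

    use anyhow::Result;
    use maplit::btreeset;
    use openraft::Config;
    use openraft::RPCTypes;

    use crate::fixtures::RaftRouter;

    // Hypothetical test: fail RPCs to one voter, write some logs, then restore it.
    async fn isolate_then_restore_example() -> Result<()> {
        // Assumed fixture setup; real tests tune the Config per scenario.
        let config = Arc::new(Config::default().validate()?);
        let mut router = RaftRouter::new(config.clone());

        // 3 voters, no learners; returns the log index after cluster setup.
        let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;

        // Every RPC sent from/to node 2 now fails with a `NetworkError`.
        router.set_network_error(2, true);
        log_index += router.client_request_many(0, "0", 5).await?;

        // Lift the failure; node 2 catches up again.
        router.set_network_error(2, false);
        router.wait_for_log(&btreeset![0, 1, 2], Some(log_index), None, "catch up").await?;

        // After this refactor the per-RPC counters are keyed by `openraft::RPCTypes`.
        let counts = router.get_rpc_count();
        assert!(*counts.get(&RPCTypes::AppendEntries).unwrap_or(&0) > 0);

        Ok(())
    }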