diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 9345a1012..8a17aa944 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -96,6 +96,7 @@ use crate::type_config::TypeConfigExt; use crate::vote::vote_status::VoteStatus; use crate::vote::CommittedVote; use crate::vote::NonCommittedVote; +use crate::vote::RaftLeaderId; use crate::ChangeMembers; use crate::Instant; use crate::LogId; @@ -583,7 +584,7 @@ where id: self.id.clone(), // --- data --- - current_term: st.vote_ref().leader_id().get_term(), + current_term: st.vote_ref().leader_id().term(), vote: st.io_state().io_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(), last_log_index: st.last_log_id().index(), last_applied: st.io_applied().cloned(), @@ -726,7 +727,7 @@ where } // Safe unwrap(): vote that is committed has to already have voted for some node. - let id = vote.leader_id().voted_for().unwrap(); + let id = vote.leader_id().node_id_ref().cloned().unwrap(); // TODO: `is_voter()` is slow, maybe cache `current_leader`, // e.g., only update it when membership or vote changes diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index 828163925..8a809d063 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -180,6 +180,7 @@ mod tests { type NodeId = u64; type Node = (); type Term = u64; + type LeaderId = crate::impls::LeaderId; type Entry = crate::Entry; type SnapshotData = Cursor>; type AsyncRuntime = TokioRuntime; diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index b881d2880..9366e2344 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -48,6 +48,7 @@ use crate::type_config::alias::ResponderOf; use crate::type_config::alias::SnapshotDataOf; use crate::type_config::TypeConfigExt; use crate::vote::raft_term::RaftTerm; +use crate::vote::RaftLeaderId; use crate::LogId; use crate::LogIdOptionExt; use crate::Membership; @@ -209,7 +210,7 @@ where C: RaftTypeConfig /// Start to elect this node as leader #[tracing::instrument(level = "debug", skip(self))] pub(crate) fn elect(&mut self) { - let new_term = self.state.vote.leader_id().term.next(); + let new_term = self.state.vote.leader_id().term().next(); let new_vote = Vote::new(new_term, self.config.id.clone()); let candidate = self.new_candidate(new_vote.clone()); diff --git a/openraft/src/engine/handler/establish_handler/mod.rs b/openraft/src/engine/handler/establish_handler/mod.rs index 7506209ff..3cf3879fb 100644 --- a/openraft/src/engine/handler/establish_handler/mod.rs +++ b/openraft/src/engine/handler/establish_handler/mod.rs @@ -3,6 +3,7 @@ use crate::proposer::Candidate; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::proposer::LeaderState; +use crate::vote::RaftLeaderId; use crate::RaftTypeConfig; /// Establish a leader for the Engine, when Candidate finishes voting stage. @@ -24,8 +25,8 @@ where C: RaftTypeConfig let vote = candidate.vote_ref().clone(); debug_assert_eq!( - vote.leader_id().voted_for(), - Some(self.config.id.clone()), + vote.leader_id().node_id_ref(), + Some(&self.config.id), "it can only commit its own vote" ); diff --git a/openraft/src/engine/handler/leader_handler/append_entries_test.rs b/openraft/src/engine/handler/leader_handler/append_entries_test.rs index 36cf68aeb..18f6a8408 100644 --- a/openraft/src/engine/handler/leader_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/leader_handler/append_entries_test.rs @@ -23,10 +23,8 @@ use crate::testing::blank_ent; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; -use crate::vote::CommittedLeaderId; use crate::EffectiveMembership; use crate::Entry; -use crate::LogId; use crate::Membership; use crate::MembershipState; use crate::Vote; @@ -131,10 +129,7 @@ fn test_leader_append_entries_normal() -> anyhow::Result<()> { ], eng.state.log_ids.key_log_ids() ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), - eng.state.last_log_id() - ); + assert_eq!(Some(&log_id(3, 1, 6)), eng.state.last_log_id()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1, 1)), m01())), @@ -258,10 +253,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> { ], eng.state.log_ids.key_log_ids() ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), - eng.state.last_log_id() - ); + assert_eq!(Some(&log_id(3, 1, 6)), eng.state.last_log_id()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(2, 1, 3)), m1())), diff --git a/openraft/src/engine/handler/log_handler/calc_purge_upto_test.rs b/openraft/src/engine/handler/log_handler/calc_purge_upto_test.rs index fb03e2c44..ca98c730a 100644 --- a/openraft/src/engine/handler/log_handler/calc_purge_upto_test.rs +++ b/openraft/src/engine/handler/log_handler/calc_purge_upto_test.rs @@ -1,12 +1,13 @@ use crate::engine::testing::UTConfig; use crate::engine::Engine; use crate::engine::LogIdList; -use crate::CommittedLeaderId; +use crate::type_config::alias::LeaderIdOf; +use crate::vote::RaftLeaderIdExt; use crate::LogId; fn log_id(term: u64, index: u64) -> LogId { LogId { - leader_id: CommittedLeaderId::new(term, 0), + leader_id: LeaderIdOf::::new_committed(term, 0), index, } } diff --git a/openraft/src/engine/handler/replication_handler/append_membership_test.rs b/openraft/src/engine/handler/replication_handler/append_membership_test.rs index fa4254d4d..a2dd35256 100644 --- a/openraft/src/engine/handler/replication_handler/append_membership_test.rs +++ b/openraft/src/engine/handler/replication_handler/append_membership_test.rs @@ -16,9 +16,7 @@ use crate::progress::Progress; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; -use crate::CommittedLeaderId; use crate::EffectiveMembership; -use crate::LogId; use crate::Membership; use crate::MembershipState; use crate::Vote; @@ -110,11 +108,7 @@ fn test_leader_append_membership_update_learner_process() -> anyhow::Result<()> // learner or vice versa. let mut eng = eng(); - eng.state.log_ids = LogIdList::new([ - LogId::new(CommittedLeaderId::new(0, 0), 0), - log_id(1, 1, 1), - log_id(5, 1, 10), - ]); + eng.state.log_ids = LogIdList::new([log_id(0, 0, 0), log_id(1, 1, 1), log_id(5, 1, 10)]); eng.state .membership_state diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 7da6c0b51..b936075cb 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -18,6 +18,7 @@ use crate::proposer::LeaderState; use crate::raft_state::IOId; use crate::raft_state::LogStateReader; use crate::type_config::TypeConfigExt; +use crate::vote::RaftLeaderId; use crate::LogId; use crate::OptionalSend; use crate::RaftState; @@ -220,7 +221,7 @@ where C: RaftTypeConfig /// This node then becomes raft-follower or raft-learner. pub(crate) fn become_following(&mut self) { debug_assert!( - self.state.vote_ref().leader_id().voted_for().as_ref() != Some(&self.config.id) + self.state.vote_ref().leader_id().node_id_ref() != Some(&self.config.id) || !self.state.membership_state.effective().membership().is_voter(&self.config.id), "It must hold: vote is not mine, or I am not a voter(leader just left the cluster)" ); diff --git a/openraft/src/engine/testing.rs b/openraft/src/engine/testing.rs index 0f816ef54..8de27b978 100644 --- a/openraft/src/engine/testing.rs +++ b/openraft/src/engine/testing.rs @@ -37,6 +37,7 @@ where N: Node + Ord type Node = N; type Entry = crate::Entry; type Term = u64; + type LeaderId = crate::impls::LeaderId; type SnapshotData = Cursor>; type AsyncRuntime = TokioRuntime; type Responder = crate::impls::OneshotResponder; diff --git a/openraft/src/engine/tests/elect_test.rs b/openraft/src/engine/tests/elect_test.rs index 25f8bd2fe..a9fceb30a 100644 --- a/openraft/src/engine/tests/elect_test.rs +++ b/openraft/src/engine/tests/elect_test.rs @@ -14,9 +14,7 @@ use crate::raft::VoteRequest; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; -use crate::CommittedLeaderId; use crate::EffectiveMembership; -use crate::LogId; use crate::Membership; use crate::Vote; @@ -30,7 +28,7 @@ fn m12() -> Membership { fn eng() -> Engine { let mut eng = Engine::testing_default(0); - eng.state.log_ids = LogIdList::new([LogId::new(CommittedLeaderId::new(0, 0), 0)]); + eng.state.log_ids = LogIdList::new([log_id(0, 0, 0)]); eng.state.enable_validation(false); // Disable validation for incomplete state eng } diff --git a/openraft/src/engine/tests/handle_vote_resp_test.rs b/openraft/src/engine/tests/handle_vote_resp_test.rs index e76efcad7..ce22fee8d 100644 --- a/openraft/src/engine/tests/handle_vote_resp_test.rs +++ b/openraft/src/engine/tests/handle_vote_resp_test.rs @@ -20,10 +20,8 @@ use crate::replication::request::Replicate; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; -use crate::CommittedLeaderId; use crate::EffectiveMembership; use crate::Entry; -use crate::LogId; use crate::Membership; use crate::Vote; @@ -39,7 +37,7 @@ fn eng() -> Engine { let mut eng = Engine::testing_default(0); eng.state.enable_validation(false); // Disable validation for incomplete state - eng.state.log_ids = LogIdList::new([LogId::new(CommittedLeaderId::new(0, 0), 0)]); + eng.state.log_ids = LogIdList::new([log_id(0, 0, 0)]); eng } diff --git a/openraft/src/engine/tests/initialize_test.rs b/openraft/src/engine/tests/initialize_test.rs index a80d7bab1..3911d51ed 100644 --- a/openraft/src/engine/tests/initialize_test.rs +++ b/openraft/src/engine/tests/initialize_test.rs @@ -17,7 +17,6 @@ use crate::raft_state::LogStateReader; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; -use crate::vote::CommittedLeaderId; use crate::Entry; use crate::LogId; use crate::Membership; @@ -33,10 +32,7 @@ fn test_initialize_single_node() -> anyhow::Result<()> { eng }; - let log_id0 = LogId { - leader_id: CommittedLeaderId::new(0, 0), - index: 0, - }; + let log_id0 = log_id(0, 0, 0); let m1 = || Membership::::new_with_defaults(vec![btreeset! {1}], []); let entry = Entry::::new_membership(LogId::default(), m1()); @@ -86,10 +82,7 @@ fn test_initialize() -> anyhow::Result<()> { eng }; - let log_id0 = LogId { - leader_id: CommittedLeaderId::new(0, 0), - index: 0, - }; + let log_id0 = log_id(0, 0, 0); let m12 = || Membership::::new_with_defaults(vec![btreeset! {1,2}], []); let entry = || Entry::::new_membership(LogId::default(), m12()); @@ -122,10 +115,7 @@ fn test_initialize() -> anyhow::Result<()> { Command::SendVote { vote_req: VoteRequest { vote: Vote::new(1, 1), - last_log_id: Some(LogId { - leader_id: CommittedLeaderId::new(0, 0), - index: 0, - },), + last_log_id: Some(log_id(0, 0, 0)) }, }, ], diff --git a/openraft/src/engine/tests/trigger_purge_log_test.rs b/openraft/src/engine/tests/trigger_purge_log_test.rs index 070f0a952..db99a98a7 100644 --- a/openraft/src/engine/tests/trigger_purge_log_test.rs +++ b/openraft/src/engine/tests/trigger_purge_log_test.rs @@ -13,9 +13,7 @@ use crate::storage::SnapshotMeta; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; -use crate::CommittedLeaderId; use crate::EffectiveMembership; -use crate::LogId; use crate::Membership; use crate::MembershipState; use crate::StoredMembership; @@ -33,7 +31,7 @@ fn eng() -> Engine { EffectiveMembership::new_arc(Some(log_id(1, 0, 1)), m12()), ); - eng.state.log_ids = LogIdList::new([LogId::new(CommittedLeaderId::new(0, 0), 0)]); + eng.state.log_ids = LogIdList::new([log_id(0, 0, 0)]); eng } diff --git a/openraft/src/impls/mod.rs b/openraft/src/impls/mod.rs index 1ade60387..b446525e2 100644 --- a/openraft/src/impls/mod.rs +++ b/openraft/src/impls/mod.rs @@ -6,3 +6,16 @@ pub use crate::node::EmptyNode; pub use crate::raft::responder::impls::OneshotResponder; #[cfg(feature = "tokio-rt")] pub use crate::type_config::async_runtime::tokio_impls::TokioRuntime; + +#[cfg(not(feature = "single-term-leader"))] +pub mod leader_id { + pub use crate::vote::leader_id::leader_id_adv::CommittedLeaderId; + pub use crate::vote::leader_id::leader_id_adv::LeaderId; +} +#[cfg(feature = "single-term-leader")] +pub mod leader_id { + pub use crate::vote::leader_id::leader_id_std::CommittedLeaderId; + pub use crate::vote::leader_id::leader_id_std::LeaderId; +} + +pub use leader_id::LeaderId; diff --git a/openraft/src/log_id/mod.rs b/openraft/src/log_id/mod.rs index 5220a0a53..42070a028 100644 --- a/openraft/src/log_id/mod.rs +++ b/openraft/src/log_id/mod.rs @@ -12,7 +12,7 @@ pub use log_id_option_ext::LogIdOptionExt; pub use log_index_option_ext::LogIndexOptionExt; pub use raft_log_id::RaftLogId; -use crate::CommittedLeaderId; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::RaftTypeConfig; /// The identity of a raft log. @@ -25,7 +25,7 @@ pub struct LogId where C: RaftTypeConfig { /// The id of the leader that proposed this log - pub leader_id: CommittedLeaderId, + pub leader_id: CommittedLeaderIdOf, /// The index of a log in the storage. /// /// Log index is a consecutive integer. @@ -35,7 +35,7 @@ where C: RaftTypeConfig impl Copy for LogId where C: RaftTypeConfig, - C::NodeId: Copy, + CommittedLeaderIdOf: Copy, { } @@ -63,12 +63,12 @@ impl LogId where C: RaftTypeConfig { /// Creates a log id proposed by a committed leader with `leader_id` at the given index. - pub fn new(leader_id: CommittedLeaderId, index: u64) -> Self { + pub fn new(leader_id: CommittedLeaderIdOf, index: u64) -> Self { LogId { leader_id, index } } /// Returns the leader id that proposed this log. - pub fn committed_leader_id(&self) -> &CommittedLeaderId { + pub fn committed_leader_id(&self) -> &CommittedLeaderIdOf { &self.leader_id } } diff --git a/openraft/src/log_id/raft_log_id.rs b/openraft/src/log_id/raft_log_id.rs index c2d932a60..06346b33c 100644 --- a/openraft/src/log_id/raft_log_id.rs +++ b/openraft/src/log_id/raft_log_id.rs @@ -1,4 +1,4 @@ -use crate::CommittedLeaderId; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::LogId; use crate::RaftTypeConfig; @@ -12,7 +12,7 @@ where C: RaftTypeConfig /// For example, a leader id in standard raft is `(term, node_id)`, but a log id does not have /// to store the `node_id`, because in standard raft there is at most one leader that can be /// established. - fn leader_id(&self) -> &CommittedLeaderId { + fn leader_id(&self) -> &CommittedLeaderIdOf { self.get_log_id().committed_leader_id() } diff --git a/openraft/src/log_id_range.rs b/openraft/src/log_id_range.rs index 5a1f6ed0a..ef7b01ea4 100644 --- a/openraft/src/log_id_range.rs +++ b/openraft/src/log_id_range.rs @@ -5,6 +5,7 @@ use std::fmt::Formatter; use validit::Validate; use crate::display_ext::DisplayOptionExt; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::LogId; use crate::LogIdOptionExt; use crate::RaftTypeConfig; @@ -29,7 +30,7 @@ where C: RaftTypeConfig impl Copy for LogIdRange where C: RaftTypeConfig, - C::NodeId: Copy, + CommittedLeaderIdOf: Copy, { } @@ -69,14 +70,11 @@ mod tests { use crate::engine::testing::UTConfig; use crate::log_id_range::LogIdRange; - use crate::CommittedLeaderId; + use crate::testing; use crate::LogId; fn log_id(index: u64) -> LogId { - LogId { - leader_id: CommittedLeaderId::new(1, 1), - index, - } + testing::log_id(1, 1, index) } #[test] diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index f00fb7d04..83d291954 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -13,8 +13,6 @@ use crate::testing::log_id; use crate::type_config::alias::NodeIdOf; use crate::type_config::alias::WatchSenderOf; use crate::type_config::TypeConfigExt; -use crate::vote::CommittedLeaderId; -use crate::LogId; use crate::Membership; use crate::RaftMetrics; use crate::RaftTypeConfig; @@ -48,7 +46,7 @@ async fn test_wait() -> anyhow::Result<()> { sleep(Duration::from_millis(10)).await; let mut update = init.clone(); update.last_log_index = Some(3); - update.last_applied = Some(LogId::new(CommittedLeaderId::new(1, 0), 3)); + update.last_applied = Some(log_id(1, 0, 3)); let rst = tx.send(update); assert!(rst.is_ok()); }); @@ -115,14 +113,14 @@ async fn test_wait() -> anyhow::Result<()> { let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; let mut update = init.clone(); - update.snapshot = Some(LogId::new(CommittedLeaderId::new(1, 0), 2)); + update.snapshot = Some(log_id(1, 0, 2)); let rst = tx.send(update); assert!(rst.is_ok()); }); - let got = w.snapshot(LogId::new(CommittedLeaderId::new(1, 0), 2), "snapshot").await?; + let got = w.snapshot(log_id(1, 0, 2), "snapshot").await?; h.await?; - assert_eq!(Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), got.snapshot); + assert_eq!(Some(log_id(1, 0, 2)), got.snapshot); } tracing::info!("--- wait for snapshot, only index matches"); @@ -132,13 +130,13 @@ async fn test_wait() -> anyhow::Result<()> { let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; let mut update = init.clone(); - update.snapshot = Some(LogId::new(CommittedLeaderId::new(3, 0), 2)); + update.snapshot = Some(log_id(3, 0, 2)); let rst = tx.send(update); assert!(rst.is_ok()); // delay otherwise the channel will be closed thus the error is shutdown. sleep(Duration::from_millis(200)).await; }); - let got = w.snapshot(LogId::new(CommittedLeaderId::new(1, 0), 2), "snapshot").await; + let got = w.snapshot(log_id(1, 0, 2), "snapshot").await; h.await?; match got.unwrap_err() { WaitError::Timeout(t, _) => { diff --git a/openraft/src/progress/entry/tests.rs b/openraft/src/progress/entry/tests.rs index 9ab6ae960..d7e99384a 100644 --- a/openraft/src/progress/entry/tests.rs +++ b/openraft/src/progress/entry/tests.rs @@ -5,12 +5,13 @@ use crate::engine::EngineConfig; use crate::progress::entry::ProgressEntry; use crate::progress::inflight::Inflight; use crate::raft_state::LogStateReader; -use crate::CommittedLeaderId; +use crate::type_config::alias::LeaderIdOf; +use crate::vote::RaftLeaderIdExt; use crate::LogId; fn log_id(index: u64) -> LogId { LogId { - leader_id: CommittedLeaderId::new(1, 1), + leader_id: LeaderIdOf::::new_committed(1, 1), index, } } diff --git a/openraft/src/progress/inflight/mod.rs b/openraft/src/progress/inflight/mod.rs index 73112e0b0..aca65f7d4 100644 --- a/openraft/src/progress/inflight/mod.rs +++ b/openraft/src/progress/inflight/mod.rs @@ -9,6 +9,7 @@ use validit::Validate; use crate::display_ext::DisplayOptionExt; use crate::log_id_range::LogIdRange; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::LogId; use crate::LogIdOptionExt; use crate::RaftTypeConfig; @@ -41,7 +42,7 @@ where C: RaftTypeConfig impl Copy for Inflight where C: RaftTypeConfig, - C::NodeId: Copy, + CommittedLeaderIdOf: Copy, { } diff --git a/openraft/src/progress/inflight/tests.rs b/openraft/src/progress/inflight/tests.rs index 6bf7bd1fc..04ffe982a 100644 --- a/openraft/src/progress/inflight/tests.rs +++ b/openraft/src/progress/inflight/tests.rs @@ -3,12 +3,13 @@ use validit::Validate; use crate::engine::testing::UTConfig; use crate::log_id_range::LogIdRange; use crate::progress::Inflight; -use crate::CommittedLeaderId; +use crate::type_config::alias::LeaderIdOf; +use crate::vote::RaftLeaderIdExt; use crate::LogId; fn log_id(index: u64) -> LogId { LogId { - leader_id: CommittedLeaderId::new(1, 1), + leader_id: LeaderIdOf::::new_committed(1, 1), index, } } diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 3d206aa87..78bbc618a 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -10,6 +10,7 @@ use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; use crate::type_config::TypeConfigExt; use crate::vote::CommittedVote; +use crate::vote::RaftLeaderId; use crate::LogId; use crate::LogIdOptionExt; use crate::RaftLogId; @@ -202,7 +203,7 @@ where // Thus vote.voted_for() is this node. // Safe unwrap: voted_for() is always non-None in Openraft - let node_id = self.committed_vote.clone().into_vote().leader_id().voted_for().unwrap(); + let node_id = self.committed_vote.clone().into_vote().leader_id().node_id_ref().cloned().unwrap(); let now = C::now(); tracing::debug!( diff --git a/openraft/src/raft/declare_raft_types_test.rs b/openraft/src/raft/declare_raft_types_test.rs index fa13ff6b3..99e9e5bf3 100644 --- a/openraft/src/raft/declare_raft_types_test.rs +++ b/openraft/src/raft/declare_raft_types_test.rs @@ -43,16 +43,6 @@ declare_raft_types!( AsyncRuntime = TokioRuntime, ); -// This raise an compile error: -// > error: Type not in its expected position : NodeId = u64, D = (), types must present -// > in this order : D, R, NodeId, Node, Entry, SnapshotData, AsyncRuntime -// declare_raft_types!( -// Foo: -// Node = (), -// NodeId = u64, -// D = (), -// ); - declare_raft_types!(EmptyWithColon:); declare_raft_types!(Empty); diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 7ded9d116..2cb748a3d 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -115,6 +115,7 @@ use crate::Vote; /// NodeId = u64, /// Node = openraft::BasicNode, /// Term = u64, +/// LeaderId = openraft::impls::LeaderId, /// Entry = openraft::Entry, /// SnapshotData = Cursor>, /// Responder = openraft::impls::OneshotResponder, @@ -174,6 +175,7 @@ macro_rules! declare_raft_types { (NodeId , , u64 ), (Node , , $crate::impls::BasicNode ), (Term , , u64 ), + (LeaderId , , $crate::impls::LeaderId ), (Entry , , $crate::impls::Entry ), (SnapshotData , , std::io::Cursor> ), (Responder , , $crate::impls::OneshotResponder ), diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index c7a3d47a0..b234f04ab 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -41,6 +41,7 @@ use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; +use crate::vote::RaftLeaderId; /// A struct used to represent the raft state which a Raft node needs. #[derive(Clone, Debug)] @@ -367,7 +368,7 @@ where C: RaftTypeConfig /// /// [Determine Server State]: crate::docs::data::vote#vote-and-membership-define-the-server-state pub(crate) fn is_leading(&self, id: &C::NodeId) -> bool { - self.membership_state.contains(id) && self.vote.leader_id().voted_for().as_ref() == Some(id) + self.membership_state.contains(id) && self.vote.leader_id().node_id_ref() == Some(id) } /// The node is leader @@ -407,7 +408,7 @@ where C: RaftTypeConfig if vote.is_committed() { // Safe unwrap(): vote that is committed has to already have voted for some node. - let id = vote.leader_id().voted_for().unwrap(); + let id = vote.leader_id().node_id_ref().cloned().unwrap(); return self.new_forward_to_leader(id); } diff --git a/openraft/src/raft_state/tests/log_state_reader_test.rs b/openraft/src/raft_state/tests/log_state_reader_test.rs index 27ed5c9fd..6fc9675fc 100644 --- a/openraft/src/raft_state/tests/log_state_reader_test.rs +++ b/openraft/src/raft_state/tests/log_state_reader_test.rs @@ -1,15 +1,12 @@ use crate::engine::testing::UTConfig; use crate::engine::LogIdList; use crate::raft_state::LogStateReader; -use crate::CommittedLeaderId; +use crate::testing; use crate::LogId; use crate::RaftState; fn log_id(term: u64, index: u64) -> LogId { - LogId { - leader_id: CommittedLeaderId::new(term, 0), - index, - } + testing::log_id(term, 0, index) } #[test] diff --git a/openraft/src/raft_state/tests/validate_test.rs b/openraft/src/raft_state/tests/validate_test.rs index 7587da25a..c1b83a7c6 100644 --- a/openraft/src/raft_state/tests/validate_test.rs +++ b/openraft/src/raft_state/tests/validate_test.rs @@ -3,15 +3,12 @@ use validit::Validate; use crate::engine::testing::UTConfig; use crate::engine::LogIdList; use crate::storage::SnapshotMeta; -use crate::CommittedLeaderId; +use crate::testing; use crate::LogId; use crate::RaftState; fn log_id(term: u64, index: u64) -> LogId { - LogId { - leader_id: CommittedLeaderId::new(term, 0), - index, - } + testing::log_id(term, 0, index) } #[test] diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index bded915bb..773a2b545 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -56,6 +56,7 @@ use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; use crate::type_config::async_runtime::mutex::Mutex; use crate::type_config::TypeConfigExt; +use crate::vote::RaftLeaderId; use crate::LogId; use crate::RaftLogId; use crate::RaftNetworkFactory; @@ -442,7 +443,7 @@ where let append_res = res.map_err(|_e| { let to = Timeout { action: RPCTypes::AppendEntries, - id: self.session_id.vote().leader_id().voted_for().unwrap(), + id: self.session_id.vote().leader_id().node_id_ref().cloned().unwrap(), target: self.target.clone(), timeout: the_timeout, }; diff --git a/openraft/src/testing/common.rs b/openraft/src/testing/common.rs index 96a0295de..4ea87291e 100644 --- a/openraft/src/testing/common.rs +++ b/openraft/src/testing/common.rs @@ -3,7 +3,7 @@ use std::collections::BTreeSet; use crate::entry::RaftEntry; -use crate::CommittedLeaderId; +use crate::vote::RaftLeaderIdExt; use crate::LogId; use crate::RaftTypeConfig; @@ -14,7 +14,7 @@ where C::Term: From, { LogId:: { - leader_id: CommittedLeaderId::new(term.into(), node_id), + leader_id: C::LeaderId::new_committed(term.into(), node_id), index, } } diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index ab40d9579..cc067a4fb 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -24,7 +24,7 @@ use crate::storage::RaftStateMachine; use crate::storage::StorageHelper; use crate::testing::log::StoreBuilder; use crate::type_config::TypeConfigExt; -use crate::vote::CommittedLeaderId; +use crate::vote::RaftLeaderIdExt; use crate::LogId; use crate::Membership; use crate::OptionalSend; @@ -634,7 +634,7 @@ where pub async fn get_initial_state_log_ids(mut store: LS, mut sm: SM) -> Result<(), StorageError> { let log_id = |t: u64, n: u64, i| LogId:: { - leader_id: CommittedLeaderId::::new(t.into(), n.into()), + leader_id: C::LeaderId::new_committed(t.into(), n.into()), index: i, }; @@ -1379,7 +1379,7 @@ where C::NodeId: From, { LogId { - leader_id: CommittedLeaderId::new(term.into(), NODE_ID.into()), + leader_id: C::LeaderId::new_committed(term.into(), NODE_ID.into()), index, } } @@ -1463,7 +1463,7 @@ where C::NodeId: From, { LogId { - leader_id: CommittedLeaderId::new(term.into(), node_id.into()), + leader_id: C::LeaderId::new_committed(term.into(), node_id.into()), index, } } diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index f51b12f0f..c6152ec73 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -17,6 +17,7 @@ use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::raft::responder::Responder; use crate::vote::raft_term::RaftTerm; +use crate::vote::RaftLeaderId; use crate::AppData; use crate::AppDataResponse; use crate::Node; @@ -45,6 +46,7 @@ use crate::OptionalSync; /// NodeId = u64, /// Node = openraft::BasicNode, /// Term = u64, +/// LeaderId = openraft::impls::LeaderId, /// Entry = openraft::Entry, /// SnapshotData = Cursor>, /// AsyncRuntime = openraft::TokioRuntime, @@ -66,9 +68,6 @@ pub trait RaftTypeConfig: /// Raft application level node data type Node: Node; - /// Raft log entry, which can be built from an AppData. - type Entry: RaftEntry + FromAppData; - /// Type representing a Raft term number. /// /// A term is a logical clock in Raft that is used to detect obsolete information, @@ -79,6 +78,12 @@ pub trait RaftTypeConfig: /// See: [`RaftTerm`] for the required methods. type Term: RaftTerm; + /// A Leader identifier in a cluster. + type LeaderId: RaftLeaderId; + + /// Raft log entry, which can be built from an AppData. + type Entry: RaftEntry + FromAppData; + /// Snapshot data for exposing a snapshot for reading & writing. /// /// See the [storage chapter of the guide][sto] for details on log compaction / snapshotting. @@ -112,6 +117,7 @@ pub mod alias { use crate::async_runtime::Oneshot; use crate::raft::responder::Responder; use crate::type_config::AsyncRuntime; + use crate::vote::RaftLeaderId; use crate::RaftTypeConfig; pub type DOf = ::D; @@ -119,6 +125,7 @@ pub mod alias { pub type NodeIdOf = ::NodeId; pub type NodeOf = ::Node; pub type TermOf = ::Term; + pub type LeaderIdOf = ::LeaderId; pub type EntryOf = ::Entry; pub type SnapshotDataOf = ::SnapshotData; pub type AsyncRuntimeOf = ::AsyncRuntime; @@ -166,7 +173,6 @@ pub mod alias { // Usually used types pub type LogIdOf = crate::LogId; pub type VoteOf = crate::Vote; - pub type LeaderIdOf = crate::LeaderId; - pub type CommittedLeaderIdOf = crate::CommittedLeaderId; + pub type CommittedLeaderIdOf = as RaftLeaderId>::Committed; pub type SerdeInstantOf = crate::metrics::SerdeInstant>; } diff --git a/openraft/src/vote/committed.rs b/openraft/src/vote/committed.rs index 5fd5f7bc6..8d114161c 100644 --- a/openraft/src/vote/committed.rs +++ b/openraft/src/vote/committed.rs @@ -1,8 +1,9 @@ use std::cmp::Ordering; use std::fmt; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::vote::ref_vote::RefVote; -use crate::CommittedLeaderId; +use crate::vote::RaftLeaderId; use crate::RaftTypeConfig; use crate::Vote; @@ -41,7 +42,7 @@ where C: RaftTypeConfig Self { vote } } - pub(crate) fn committed_leader_id(&self) -> CommittedLeaderId { + pub(crate) fn committed_leader_id(&self) -> CommittedLeaderIdOf { self.vote.leader_id().to_committed() } diff --git a/openraft/src/vote/leader_id/impl_into_leader_id.rs b/openraft/src/vote/leader_id/impl_into_leader_id.rs deleted file mode 100644 index d4389c513..000000000 --- a/openraft/src/vote/leader_id/impl_into_leader_id.rs +++ /dev/null @@ -1,11 +0,0 @@ -use crate::vote::leader_id::CommittedLeaderId; -use crate::vote::Vote; -use crate::RaftTypeConfig; - -impl From> for CommittedLeaderId -where C: RaftTypeConfig -{ - fn from(vote: Vote) -> Self { - vote.leader_id.to_committed() - } -} diff --git a/openraft/src/vote/leader_id/leader_id_adv.rs b/openraft/src/vote/leader_id/leader_id_adv.rs index 6b95365de..5b2426b42 100644 --- a/openraft/src/vote/leader_id/leader_id_adv.rs +++ b/openraft/src/vote/leader_id/leader_id_adv.rs @@ -1,5 +1,7 @@ use std::fmt; +use crate::vote::RaftCommittedLeaderId; +use crate::vote::RaftLeaderId; use crate::RaftTypeConfig; /// LeaderId is identifier of a `leader`. @@ -21,34 +23,6 @@ where C: RaftTypeConfig pub node_id: C::NodeId, } -impl LeaderId -where C: RaftTypeConfig -{ - pub fn new(term: C::Term, node_id: C::NodeId) -> Self { - Self { term, node_id } - } - - pub fn get_term(&self) -> C::Term { - self.term - } - - pub fn voted_for(&self) -> Option { - Some(self.node_id.clone()) - } - - #[allow(clippy::wrong_self_convention)] - pub(crate) fn to_committed(&self) -> CommittedLeaderId { - self.clone() - } - - /// Return if it is the same leader as the committed leader id. - /// - /// A committed leader may have less info than a non-committed. - pub(crate) fn is_same_as_committed(&self, other: &CommittedLeaderId) -> bool { - self == other - } -} - impl fmt::Display for LeaderId where C: RaftTypeConfig { @@ -71,22 +45,49 @@ where C: RaftTypeConfig /// standard raft stores just a `term` in log entry. pub type CommittedLeaderId = LeaderId; +impl RaftLeaderId for LeaderId +where C: RaftTypeConfig +{ + type Committed = Self; + + fn new(term: C::Term, node_id: C::NodeId) -> Self { + Self { term, node_id } + } + + fn term(&self) -> C::Term { + self.term + } + + fn node_id_ref(&self) -> Option<&C::NodeId> { + Some(&self.node_id) + } + + fn to_committed(&self) -> Self::Committed { + self.clone() + } +} + +impl RaftCommittedLeaderId for LeaderId where C: RaftTypeConfig {} + #[cfg(test)] mod tests { use crate::engine::testing::UTConfig; + use crate::vote::RaftLeaderId; use crate::LeaderId; #[cfg(feature = "serde")] #[test] fn test_committed_leader_id_serde() -> anyhow::Result<()> { - use crate::CommittedLeaderId; + use crate::type_config::alias::CommittedLeaderIdOf; + use crate::type_config::alias::LeaderIdOf; + use crate::vote::RaftLeaderIdExt; - let c = CommittedLeaderId::::new(5, 10); + let c = LeaderIdOf::::new_committed(5, 10); let s = serde_json::to_string(&c)?; assert_eq!(r#"{"term":5,"node_id":10}"#, s); - let c2: CommittedLeaderId = serde_json::from_str(&s)?; - assert_eq!(CommittedLeaderId::new(5, 10), c2); + let c2: CommittedLeaderIdOf = serde_json::from_str(&s)?; + assert_eq!(LeaderIdOf::::new_committed(5, 10), c2); Ok(()) } diff --git a/openraft/src/vote/leader_id/leader_id_std.rs b/openraft/src/vote/leader_id/leader_id_std.rs index 95d7e7ab2..e3cbfaf4f 100644 --- a/openraft/src/vote/leader_id/leader_id_std.rs +++ b/openraft/src/vote/leader_id/leader_id_std.rs @@ -3,6 +3,8 @@ use std::fmt; use std::marker::PhantomData; use crate::display_ext::DisplayOptionExt; +use crate::vote::RaftCommittedLeaderId; +use crate::vote::RaftLeaderId; use crate::RaftTypeConfig; #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] @@ -41,43 +43,37 @@ where C: RaftTypeConfig } } -impl LeaderId +impl fmt::Display for LeaderId where C: RaftTypeConfig { - pub fn new(term: C::Term, node_id: C::NodeId) -> Self { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "T{}-N{}", self.term, self.voted_for.display()) + } +} + +impl RaftLeaderId for LeaderId +where C: RaftTypeConfig +{ + type Committed = CommittedLeaderId; + + fn new(term: C::Term, node_id: C::NodeId) -> Self { Self { term, voted_for: Some(node_id), } } - pub fn get_term(&self) -> C::Term { + fn term(&self) -> C::Term { self.term } - pub fn voted_for(&self) -> Option { - self.voted_for.clone() + fn node_id_ref(&self) -> Option<&C::NodeId> { + self.voted_for.as_ref() } - #[allow(clippy::wrong_self_convention)] - pub(crate) fn to_committed(&self) -> CommittedLeaderId { + fn to_committed(&self) -> Self::Committed { CommittedLeaderId::new(self.term, C::NodeId::default()) } - - /// Return if it is the same leader as the committed leader id. - /// - /// A committed leader may have less info than a non-committed. - pub(crate) fn is_same_as_committed(&self, other: &CommittedLeaderId) -> bool { - self.term == other.term - } -} - -impl fmt::Display for LeaderId -where C: RaftTypeConfig -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "T{}-N{}", self.term, self.voted_for.display()) - } } #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] @@ -108,6 +104,8 @@ where C: RaftTypeConfig } } +impl RaftCommittedLeaderId for CommittedLeaderId where C: RaftTypeConfig {} + #[cfg(test)] #[allow(clippy::nonminimal_bool)] mod tests { diff --git a/openraft/src/vote/leader_id/mod.rs b/openraft/src/vote/leader_id/mod.rs index 38bc0b056..10b07d698 100644 --- a/openraft/src/vote/leader_id/mod.rs +++ b/openraft/src/vote/leader_id/mod.rs @@ -1,7 +1,7 @@ #[cfg(not(feature = "single-term-leader"))] -mod leader_id_adv; +pub(crate) mod leader_id_adv; #[cfg(feature = "single-term-leader")] -mod leader_id_std; +pub(crate) mod leader_id_std; #[cfg(not(feature = "single-term-leader"))] pub use leader_id_adv::CommittedLeaderId; @@ -12,4 +12,5 @@ pub use leader_id_std::CommittedLeaderId; #[cfg(feature = "single-term-leader")] pub use leader_id_std::LeaderId; -mod impl_into_leader_id; +pub(crate) mod raft_committed_leader_id; +pub(crate) mod raft_leader_id; diff --git a/openraft/src/vote/leader_id/raft_committed_leader_id.rs b/openraft/src/vote/leader_id/raft_committed_leader_id.rs new file mode 100644 index 000000000..d597f861a --- /dev/null +++ b/openraft/src/vote/leader_id/raft_committed_leader_id.rs @@ -0,0 +1,45 @@ +use std::fmt::Debug; +use std::fmt::Display; + +use crate::base::OptionalFeatures; +use crate::RaftTypeConfig; + +/// A Leader identifier that has been granted and committed by a quorum of the cluster. +/// +/// This type is used as part of the Log ID to identify the leader that proposed a log entry. +/// Because log can only be proposed by a Leader committed by a quorum. +/// For example, in standard Raft, committed LeaderId is just the term number, because in each term +/// there is only one established leader. +/// +/// # Implementation +/// +/// A simple non-optimized implementation of this trait is to use the same type as [`RaftLeaderId`]. +/// +/// # Total Ordering +/// +/// Unlike [`RaftLeaderId`], this type implements `Ord` because committed leader IDs +/// have a total ordering as they must be agreed upon by a quorum, and two incomparable +/// [`RaftLeaderId`] can not both be committed by two quorums, because only a **greater** +/// [`RaftLeaderId`] can override an existing value. +/// +/// A [`RaftCommittedLeaderId`] may contain less information than the corresponding +/// [`RaftLeaderId`], because it implies the constraint that **a quorum has granted it**. +/// +/// For a total order [`RaftLeaderId`], the [`RaftCommittedLeaderId`] is the same. +/// +/// For a partial order [`RaftLeaderId`], we know that all the granted leader-id must be a total +/// order set. Therefor once it is granted by a quorum, it only keeps the information that makes +/// leader-ids a correct total order set +/// +/// For example, in standard Raft: +/// - [`RaftLeaderId`] is `(term, voted_for)` - partially ordered +/// - [`RaftCommittedLeaderId`] is just `term` - totally ordered (The `voted_for` field can be +/// dropped since it's no longer needed for ordering) +/// +/// [`RaftLeaderId`]: crate::vote::RaftLeaderId +pub trait RaftCommittedLeaderId +where + C: RaftTypeConfig, + Self: OptionalFeatures + Ord + Clone + Debug + Display + Default + 'static, +{ +} diff --git a/openraft/src/vote/leader_id/raft_leader_id.rs b/openraft/src/vote/leader_id/raft_leader_id.rs new file mode 100644 index 000000000..6bc8c6f93 --- /dev/null +++ b/openraft/src/vote/leader_id/raft_leader_id.rs @@ -0,0 +1,68 @@ +use std::fmt::Debug; +use std::fmt::Display; + +use crate::base::OptionalFeatures; +use crate::vote::leader_id::raft_committed_leader_id::RaftCommittedLeaderId; +use crate::RaftTypeConfig; + +/// A Leader identifier in a OpenRaft cluster. +/// +/// In OpenRaft, a `LeaderId` represents either: +/// - A granted leader that received votes from a quorum (a `Leader` in standard Raft) +/// - A non-granted leader, i.e., candidate (a `Candidate` in standard Raft) +/// +/// The identity of the leader remains the same in both cases. Whether it is granted by a quorum +/// is determined by the `committed` field in [`Vote`]. +/// +/// # Partial Ordering +/// +/// [`RaftLeaderId`] implements `PartialOrd` but not `Ord`. Because to be compatible with standard +/// Raft, in which a `LeaderId` or `CandidateId` is a tuple of `(term, node_id)`: Two such IDs +/// with the same term but different node IDs (e.g., `(1,2)` and `(1,3)`) have no defined ordering - +/// neither can overwrite the other. +/// +/// [`Vote`]: crate::vote::Vote +pub trait RaftLeaderId +where + C: RaftTypeConfig, + Self: OptionalFeatures + PartialOrd + Eq + Clone + Debug + Display + Default + 'static, +{ + /// The committed version of this leader ID. + /// + /// A simple implementation of this trait would return `Self` as the committed version. + type Committed: RaftCommittedLeaderId; + + fn new(term: C::Term, node_id: C::NodeId) -> Self; + + /// Get the term number of this leader + fn term(&self) -> C::Term; + + /// Get the node ID of this leader, if one is set + fn node_id_ref(&self) -> Option<&C::NodeId>; + + /// Convert this leader ID to a committed leader ID. + /// + /// This is used when it has been granted by a quorum. + fn to_committed(&self) -> Self::Committed; +} + +pub trait RaftLeaderIdExt +where + C: RaftTypeConfig, + Self: RaftLeaderId, +{ + fn new_committed(term: C::Term, node_id: C::NodeId) -> Self::Committed { + Self::new(term, node_id).to_committed() + } + + fn node_id(&self) -> Option { + self.node_id_ref().cloned() + } +} + +impl RaftLeaderIdExt for T +where + C: RaftTypeConfig, + T: RaftLeaderId, +{ +} diff --git a/openraft/src/vote/mod.rs b/openraft/src/vote/mod.rs index 93ce1501c..c9210792e 100644 --- a/openraft/src/vote/mod.rs +++ b/openraft/src/vote/mod.rs @@ -1,5 +1,5 @@ pub(crate) mod committed; -mod leader_id; +pub(crate) mod leader_id; pub(crate) mod non_committed; pub(crate) mod ref_vote; #[allow(clippy::module_inception)] @@ -7,6 +7,9 @@ mod vote; pub(crate) mod vote_status; pub(crate) use committed::CommittedVote; +pub use leader_id::raft_committed_leader_id::RaftCommittedLeaderId; +pub use leader_id::raft_leader_id::RaftLeaderId; +pub use leader_id::raft_leader_id::RaftLeaderIdExt; pub use leader_id::CommittedLeaderId; pub use leader_id::LeaderId; pub(crate) use non_committed::NonCommittedVote; diff --git a/openraft/src/vote/ref_vote.rs b/openraft/src/vote/ref_vote.rs index f81743e4a..7d7c2a9af 100644 --- a/openraft/src/vote/ref_vote.rs +++ b/openraft/src/vote/ref_vote.rs @@ -1,24 +1,23 @@ use std::cmp::Ordering; use std::fmt::Formatter; -use crate::LeaderId; use crate::RaftTypeConfig; -/// Same as [`Vote`] but with a reference to the [`LeaderId`]. +/// Same as [`Vote`] but with a reference to the `LeaderId`. /// /// [`Vote`]: crate::vote::Vote #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct RefVote<'a, C> where C: RaftTypeConfig { - pub(crate) leader_id: &'a LeaderId, + pub(crate) leader_id: &'a C::LeaderId, pub(crate) committed: bool, } impl<'a, C> RefVote<'a, C> where C: RaftTypeConfig { - pub(crate) fn new(leader_id: &'a LeaderId, committed: bool) -> Self { + pub(crate) fn new(leader_id: &'a C::LeaderId, committed: bool) -> Self { Self { leader_id, committed } } diff --git a/openraft/src/vote/vote.rs b/openraft/src/vote/vote.rs index 6f975953b..1f5e08e5b 100644 --- a/openraft/src/vote/vote.rs +++ b/openraft/src/vote/vote.rs @@ -1,12 +1,12 @@ use std::cmp::Ordering; use std::fmt::Formatter; +use crate::type_config::alias::CommittedLeaderIdOf; use crate::vote::committed::CommittedVote; -use crate::vote::leader_id::CommittedLeaderId; use crate::vote::ref_vote::RefVote; use crate::vote::vote_status::VoteStatus; use crate::vote::NonCommittedVote; -use crate::LeaderId; +use crate::vote::RaftLeaderId; use crate::RaftTypeConfig; /// `Vote` represent the privilege of a node. @@ -14,7 +14,7 @@ use crate::RaftTypeConfig; #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct Vote { /// The id of the node that tries to become the leader. - pub leader_id: LeaderId, + pub leader_id: C::LeaderId, pub committed: bool, } @@ -22,7 +22,7 @@ pub struct Vote { impl Copy for Vote where C: RaftTypeConfig, - C::NodeId: Copy, + C::LeaderId: Copy, { } @@ -53,14 +53,14 @@ where C: RaftTypeConfig { pub fn new(term: C::Term, node_id: C::NodeId) -> Self { Self { - leader_id: LeaderId::new(term, node_id), + leader_id: C::LeaderId::new(term, node_id), committed: false, } } pub fn new_committed(term: C::Term, node_id: C::NodeId) -> Self { Self { - leader_id: LeaderId::new(term, node_id), + leader_id: C::LeaderId::new(term, node_id), committed: true, } } @@ -95,15 +95,16 @@ where C: RaftTypeConfig self.committed } - /// Return the [`LeaderId`] this vote represents for. + /// Return the `LeaderId` this vote represents for. /// /// The leader may or may not be granted by a quorum. - pub fn leader_id(&self) -> &LeaderId { + pub fn leader_id(&self) -> &C::LeaderId { &self.leader_id } - pub(crate) fn is_same_leader(&self, leader_id: &CommittedLeaderId) -> bool { - self.leader_id().is_same_as_committed(leader_id) + // TODO: remove this method + pub(crate) fn is_same_leader(&self, leader_id: &CommittedLeaderIdOf) -> bool { + self.leader_id().to_committed() == *leader_id } } diff --git a/tests/tests/append_entries/t10_conflict_with_empty_entries.rs b/tests/tests/append_entries/t10_conflict_with_empty_entries.rs index 4fae50ec1..ebe37add3 100644 --- a/tests/tests/append_entries/t10_conflict_with_empty_entries.rs +++ b/tests/tests/append_entries/t10_conflict_with_empty_entries.rs @@ -7,11 +7,10 @@ use openraft::network::RPCOption; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::testing::blank_ent; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; use openraft::Entry; use openraft::EntryPayload; -use openraft::LogId; use openraft::Vote; use openraft_memstore::ClientRequest; @@ -55,9 +54,9 @@ async fn conflict_with_empty_entries() -> Result<()> { let rpc = AppendEntriesRequest:: { vote: Vote::new_committed(1, 1), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 5)), + prev_log_id: Some(log_id(1, 0, 5)), entries: vec![], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 5)), + leader_commit: Some(log_id(1, 0, 5)), }; let option = RPCOption::new(Duration::from_millis(1_000)); @@ -71,14 +70,14 @@ async fn conflict_with_empty_entries() -> Result<()> { vote: Vote::new_committed(1, 1), prev_log_id: None, entries: vec![blank_ent(0, 0, 0), blank_ent(1, 0, 1), Entry { - log_id: LogId::new(CommittedLeaderId::new(1, 0), 2), + log_id: log_id(1, 0, 2), payload: EntryPayload::Normal(ClientRequest { client: "foo".to_string(), serial: 1, status: "bar".to_string(), }), }], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 5)), + leader_commit: Some(log_id(1, 0, 5)), }; let option = RPCOption::new(Duration::from_millis(1_000)); @@ -91,9 +90,9 @@ async fn conflict_with_empty_entries() -> Result<()> { let rpc = AppendEntriesRequest:: { vote: Vote::new_committed(1, 1), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 3)), + prev_log_id: Some(log_id(1, 0, 3)), entries: vec![], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 5)), + leader_commit: Some(log_id(1, 0, 5)), }; let option = RPCOption::new(Duration::from_millis(1_000)); diff --git a/tests/tests/append_entries/t10_see_higher_vote.rs b/tests/tests/append_entries/t10_see_higher_vote.rs index e28670814..ca2235ad1 100644 --- a/tests/tests/append_entries/t10_see_higher_vote.rs +++ b/tests/tests/append_entries/t10_see_higher_vote.rs @@ -7,9 +7,8 @@ use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; use openraft::network::RaftNetworkFactory; use openraft::raft::VoteRequest; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::ServerState; use openraft::Vote; use openraft_memstore::ClientRequest; @@ -50,7 +49,7 @@ async fn append_sees_higher_vote() -> Result<()> { .vote( VoteRequest { vote: Vote::new(10, 1), - last_log_id: Some(LogId::new(CommittedLeaderId::new(10, 1), 5)), + last_log_id: Some(log_id(10, 1, 5)), }, option, ) diff --git a/tests/tests/append_entries/t11_append_conflicts.rs b/tests/tests/append_entries/t11_append_conflicts.rs index 5436f11a1..a03b49c4c 100644 --- a/tests/tests/append_entries/t11_append_conflicts.rs +++ b/tests/tests/append_entries/t11_append_conflicts.rs @@ -6,10 +6,9 @@ use maplit::btreeset; use openraft::raft::AppendEntriesRequest; use openraft::storage::RaftLogStorage; use openraft::testing::blank_ent; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; use openraft::Entry; -use openraft::LogId; use openraft::RaftLogReader; use openraft::RaftTypeConfig; use openraft::ServerState; @@ -49,7 +48,7 @@ async fn append_conflicts() -> Result<()> { vote: Vote::new_committed(1, 2), prev_log_id: None, entries: vec![], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req).await?; @@ -63,7 +62,7 @@ async fn append_conflicts() -> Result<()> { vote: Vote::new_committed(1, 2), prev_log_id: None, entries: vec![blank_ent(0, 0, 0)], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req).await?; @@ -74,9 +73,9 @@ async fn append_conflicts() -> Result<()> { let req = AppendEntriesRequest { vote: Vote::new_committed(1, 2), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(0, 0), 0)), + prev_log_id: Some(log_id(0, 0, 0)), entries: vec![], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req).await?; @@ -89,7 +88,7 @@ async fn append_conflicts() -> Result<()> { let req = || AppendEntriesRequest { vote: Vote::new_committed(1, 2), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(0, 0), 0)), + prev_log_id: Some(log_id(0, 0, 0)), entries: vec![ blank_ent(1, 0, 1), blank_ent(1, 0, 2), @@ -97,7 +96,7 @@ async fn append_conflicts() -> Result<()> { blank_ent(1, 0, 4), ], // this set the last_applied to 2 - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req()).await?; @@ -119,9 +118,9 @@ async fn append_conflicts() -> Result<()> { let req = AppendEntriesRequest { vote: Vote::new_committed(1, 2), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 1)), + prev_log_id: Some(log_id(1, 0, 1)), entries: vec![blank_ent(1, 0, 2)], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req).await?; @@ -134,10 +133,10 @@ async fn append_conflicts() -> Result<()> { let req = AppendEntriesRequest { vote: Vote::new_committed(1, 2), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + prev_log_id: Some(log_id(1, 0, 2)), entries: vec![blank_ent(2, 0, 3)], // this set the last_applied to 2 - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req).await?; @@ -149,9 +148,9 @@ async fn append_conflicts() -> Result<()> { // check last_log_id is updated: let req = AppendEntriesRequest { vote: Vote::new_committed(1, 2), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 2000)), + prev_log_id: Some(log_id(1, 0, 2000)), entries: vec![], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req).await?; @@ -164,9 +163,9 @@ async fn append_conflicts() -> Result<()> { let req = AppendEntriesRequest { vote: Vote::new_committed(1, 2), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(3, 0), 3)), + prev_log_id: Some(log_id(3, 0, 3)), entries: vec![], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req).await?; @@ -179,9 +178,9 @@ async fn append_conflicts() -> Result<()> { // refill logs let req = AppendEntriesRequest { vote: Vote::new_committed(1, 2), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + prev_log_id: Some(log_id(1, 0, 2)), entries: vec![blank_ent(2, 0, 3), blank_ent(2, 0, 4), blank_ent(2, 0, 5)], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req).await?; @@ -194,9 +193,9 @@ async fn append_conflicts() -> Result<()> { // prev_log_id matches let req = AppendEntriesRequest { vote: Vote::new_committed(1, 2), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(2, 0), 3)), + prev_log_id: Some(log_id(2, 0, 3)), entries: vec![blank_ent(3, 0, 4)], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req).await?; @@ -210,9 +209,9 @@ async fn append_conflicts() -> Result<()> { // refill logs let req = AppendEntriesRequest { vote: Vote::new_committed(1, 2), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 200)), + prev_log_id: Some(log_id(1, 0, 200)), entries: vec![], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let resp = r0.append_entries(req).await?; diff --git a/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs b/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs index a7e8fbb56..92fc421b7 100644 --- a/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs +++ b/tests/tests/append_entries/t11_append_entries_with_bigger_term.rs @@ -8,9 +8,7 @@ use openraft::network::RPCOption; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::testing::log_id; -use openraft::CommittedLeaderId; use openraft::Config; -use openraft::LogId; use openraft::Vote; use crate::fixtures::ut_harness; @@ -37,15 +35,7 @@ async fn append_entries_with_bigger_term() -> Result<()> { let log_index = router.new_cluster(btreeset! {0}, btreeset! {1}).await?; // before append entries, check hard state in term 1 and vote for node 0 - router - .assert_storage_state( - 1, - log_index, - Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), - None, - ) - .await?; + router.assert_storage_state(1, log_index, Some(0), log_id(1, 0, log_index), None).await?; // append entries with term 2 and leader_id, this MUST cause hard state changed in node 0 let req = AppendEntriesRequest:: { @@ -71,7 +61,7 @@ async fn append_entries_with_bigger_term() -> Result<()> { 2, log_index, Some(1), - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), &None, ) .await?; diff --git a/tests/tests/append_entries/t11_append_updates_membership.rs b/tests/tests/append_entries/t11_append_updates_membership.rs index a265f96b6..36178be31 100644 --- a/tests/tests/append_entries/t11_append_updates_membership.rs +++ b/tests/tests/append_entries/t11_append_updates_membership.rs @@ -5,11 +5,10 @@ use anyhow::Result; use maplit::btreeset; use openraft::raft::AppendEntriesRequest; use openraft::testing::blank_ent; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; use openraft::Entry; use openraft::EntryPayload; -use openraft::LogId; use openraft::Membership; use openraft::ServerState; use openraft::Vote; @@ -51,17 +50,17 @@ async fn append_updates_membership() -> Result<()> { blank_ent(0, 0, 0), blank_ent(1, 0, 1), Entry { - log_id: LogId::new(CommittedLeaderId::new(1, 0), 2), + log_id: log_id(1, 0, 2), payload: EntryPayload::Membership(Membership::new_with_defaults(vec![btreeset! {1,2}], [])), }, blank_ent(1, 0, 3), Entry { - log_id: LogId::new(CommittedLeaderId::new(1, 0), 4), + log_id: log_id(1, 0, 4), payload: EntryPayload::Membership(Membership::new_with_defaults(vec![btreeset! {1,2,3,4}], [])), }, blank_ent(1, 0, 5), ], - leader_commit: Some(LogId::new(CommittedLeaderId::new(0, 0), 0)), + leader_commit: Some(log_id(0, 0, 0)), }; let resp = r0.append_entries(req).await?; @@ -75,9 +74,9 @@ async fn append_updates_membership() -> Result<()> { { let req = AppendEntriesRequest { vote: Vote::new_committed(2, 2), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + prev_log_id: Some(log_id(1, 0, 2)), entries: vec![blank_ent(2, 0, 3)], - leader_commit: Some(LogId::new(CommittedLeaderId::new(0, 0), 0)), + leader_commit: Some(log_id(0, 0, 0)), }; let resp = r0.append_entries(req).await?; diff --git a/tests/tests/client_api/t10_client_writes.rs b/tests/tests/client_api/t10_client_writes.rs index d363fcc9b..ea1026365 100644 --- a/tests/tests/client_api/t10_client_writes.rs +++ b/tests/tests/client_api/t10_client_writes.rs @@ -4,9 +4,8 @@ use anyhow::Result; use futures::prelude::*; use maplit::btreeset; use openraft::raft::ClientWriteResponse; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::SnapshotPolicy; use openraft_memstore::ClientRequest; use openraft_memstore::IntoMemClientRequest; @@ -57,7 +56,7 @@ async fn client_writes() -> Result<()> { 1, log_index, Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), Some(((499..600).into(), 1)), ) .await?; diff --git a/tests/tests/client_api/t13_trigger_snapshot.rs b/tests/tests/client_api/t13_trigger_snapshot.rs index 6a1f2d003..dd7f1427b 100644 --- a/tests/tests/client_api/t13_trigger_snapshot.rs +++ b/tests/tests/client_api/t13_trigger_snapshot.rs @@ -2,9 +2,8 @@ use std::sync::Arc; use std::time::Duration; use maplit::btreeset; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use crate::fixtures::ut_harness; use crate::fixtures::RaftRouter; @@ -31,10 +30,7 @@ async fn trigger_snapshot() -> anyhow::Result<()> { let n1 = router.get_raft_handle(&1)?; n1.trigger().snapshot().await?; - router - .wait(&1, timeout()) - .snapshot(LogId::new(CommittedLeaderId::new(1, 0), log_index), "node-1 snapshot") - .await?; + router.wait(&1, timeout()).snapshot(log_id(1, 0, log_index), "node-1 snapshot").await?; } tracing::info!(log_index, "--- send some logs"); @@ -51,10 +47,7 @@ async fn trigger_snapshot() -> anyhow::Result<()> { let n0 = router.get_raft_handle(&0)?; n0.trigger().snapshot().await?; - router - .wait(&0, timeout()) - .snapshot(LogId::new(CommittedLeaderId::new(1, 0), log_index), "node-0 snapshot") - .await?; + router.wait(&0, timeout()).snapshot(log_id(1, 0, log_index), "node-0 snapshot").await?; } Ok(()) diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index f7610b89b..a823206d2 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -186,6 +186,8 @@ impl fmt::Display for Direction { } use openraft::network::v2::RaftNetworkV2; +use openraft::vote::RaftLeaderId; +use openraft::vote::RaftLeaderIdExt; use Direction::NetRecv; use Direction::NetSend; @@ -880,7 +882,7 @@ impl TypedRaftRouter { let vote = storage.read_vote().await?.unwrap_or_else(|| panic!("no hard state found for node {}", id)); assert_eq!( - vote.leader_id().get_term(), + vote.leader_id().term(), expect_term, "expected node {} to have term {}, got {:?}", id, @@ -890,8 +892,8 @@ impl TypedRaftRouter { if let Some(voted_for) = &expect_voted_for { assert_eq!( - vote.leader_id().voted_for(), - Some(*voted_for), + vote.leader_id().node_id_ref(), + Some(voted_for), "expected node {} to have voted for {}, got {:?}", id, voted_for, @@ -1020,7 +1022,7 @@ impl RaftNetworkV2 for RaftRouterNetwork { mut rpc: AppendEntriesRequest, _option: RPCOption, ) -> Result, RPCError> { - let from_id = rpc.vote.leader_id().voted_for().unwrap(); + let from_id = rpc.vote.leader_id().node_id_ref().cloned().unwrap(); tracing::debug!("append_entries to id={} {}", self.target, rpc); self.owner.count_rpc(RPCTypes::AppendEntries); @@ -1090,7 +1092,7 @@ impl RaftNetworkV2 for RaftRouterNetwork { _cancel: impl Future + OptionalSend + 'static, _option: RPCOption, ) -> Result, StreamingError> { - let from_id = vote.leader_id().voted_for().unwrap(); + let from_id = vote.leader_id().node_id().unwrap(); self.owner.count_rpc(RPCTypes::InstallSnapshot); self.owner.call_rpc_pre_hook(snapshot.clone(), from_id, self.target)?; @@ -1116,7 +1118,7 @@ impl RaftNetworkV2 for RaftRouterNetwork { rpc: VoteRequest, _option: RPCOption, ) -> Result, RPCError> { - let from_id = rpc.vote.leader_id().voted_for().unwrap(); + let from_id = rpc.vote.leader_id().node_id().unwrap(); self.owner.count_rpc(RPCTypes::Vote); self.owner.call_rpc_pre_hook(rpc.clone(), from_id, self.target)?; @@ -1141,7 +1143,7 @@ impl RaftNetworkV2 for RaftRouterNetwork { rpc: TransferLeaderRequest, _option: RPCOption, ) -> Result<(), RPCError> { - let from_id = rpc.from_leader().leader_id().voted_for().unwrap(); + let from_id = rpc.from_leader().leader_id().node_id().unwrap(); self.owner.count_rpc(RPCTypes::TransferLeader); self.owner.call_rpc_pre_hook(rpc.clone(), from_id, self.target)?; diff --git a/tests/tests/life_cycle/t10_initialization.rs b/tests/tests/life_cycle/t10_initialization.rs index 8733dc63d..8edbb636f 100644 --- a/tests/tests/life_cycle/t10_initialization.rs +++ b/tests/tests/life_cycle/t10_initialization.rs @@ -8,11 +8,10 @@ use openraft::error::InitializeError; use openraft::error::NotAllowed; use openraft::error::NotInMembers; use openraft::storage::RaftStateMachine; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; use openraft::EffectiveMembership; use openraft::EntryPayload; -use openraft::LogId; use openraft::Membership; use openraft::RaftLogReader; use openraft::ServerState; @@ -104,7 +103,7 @@ async fn initialization() -> anyhow::Result<()> { for node_id in [0, 1, 2] { router.external_request(node_id, move |s| { let want = EffectiveMembership::new( - Some(LogId::new(CommittedLeaderId::new(0, 0), 0)), + Some(log_id(0, 0, 0)), Membership::new_with_defaults(vec![btreeset! {0,1,2}], []), ); let want = Arc::new(want); @@ -144,7 +143,7 @@ async fn initialization() -> anyhow::Result<()> { let sm_mem = sm.applied_state().await?.1; assert_eq!( StoredMembership::new( - Some(LogId::new(CommittedLeaderId::new(0, 0), 0)), + Some(log_id(0, 0, 0)), Membership::new_with_defaults(vec![btreeset! {0,1,2}], []) ), sm_mem @@ -247,10 +246,7 @@ async fn initialize_err_not_allowed() -> anyhow::Result<()> { assert_eq!( InitializeError::NotAllowed(NotAllowed { - last_log_id: Some(LogId { - leader_id: CommittedLeaderId::new(1, 0), - index: 1 - }), + last_log_id: Some(log_id(1, 0, 1)), vote: Vote::new_committed(1, 0) }), err.into_api_error().unwrap() diff --git a/tests/tests/life_cycle/t50_single_follower_restart.rs b/tests/tests/life_cycle/t50_single_follower_restart.rs index 80c221245..2e5d40da5 100644 --- a/tests/tests/life_cycle/t50_single_follower_restart.rs +++ b/tests/tests/life_cycle/t50_single_follower_restart.rs @@ -3,6 +3,8 @@ use std::time::Duration; use maplit::btreeset; use openraft::storage::RaftLogStorage; +use openraft::vote::RaftLeaderId; +use openraft::vote::RaftLeaderIdExt; use openraft::Config; use openraft::RaftLogReader; use openraft::ServerState; @@ -46,7 +48,7 @@ async fn single_follower_restart() -> anyhow::Result<()> { let v = sto.read_vote().await?.unwrap_or_default(); // Set a non-committed vote so that the node restarts as a follower. - sto.save_vote(&Vote::new(v.leader_id.get_term() + 1, v.leader_id.voted_for().unwrap())).await?; + sto.save_vote(&Vote::new(v.leader_id.term() + 1, v.leader_id.node_id().unwrap())).await?; tracing::info!(log_index, "--- restart node-0"); diff --git a/tests/tests/membership/t10_single_node.rs b/tests/tests/membership/t10_single_node.rs index 1ff30c317..f9d923c50 100644 --- a/tests/tests/membership/t10_single_node.rs +++ b/tests/tests/membership/t10_single_node.rs @@ -3,9 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use crate::fixtures::ut_harness; use crate::fixtures::RaftRouter; @@ -38,15 +37,7 @@ async fn single_node() -> Result<()> { // Write some data to the single node cluster. log_index += router.client_request_many(0, "0", 1000).await?; router.wait_for_log(&btreeset![0], Some(log_index), timeout(), "client_request_many").await?; - router - .assert_storage_state( - 1, - log_index, - Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), - None, - ) - .await?; + router.assert_storage_state(1, log_index, Some(0), log_id(1, 0, log_index), None).await?; // Read some data from the single node cluster. router.ensure_linearizable(0).await?; diff --git a/tests/tests/membership/t11_add_learner.rs b/tests/tests/membership/t11_add_learner.rs index 2db937379..b7b12710b 100644 --- a/tests/tests/membership/t11_add_learner.rs +++ b/tests/tests/membership/t11_add_learner.rs @@ -7,14 +7,12 @@ use maplit::btreeset; use openraft::error::ChangeMembershipError; use openraft::error::ClientWriteError; use openraft::error::InProgress; +use openraft::testing::log_id; use openraft::ChangeMembers; -use openraft::CommittedLeaderId; use openraft::Config; -use openraft::LogId; use openraft::Membership; use openraft::RaftLogReader; use openraft::StorageHelper; -use openraft_memstore::TypeConfig; use tokio::time::sleep; use crate::fixtures::ut_harness; @@ -345,10 +343,3 @@ async fn check_learner_after_leader_transferred() -> Result<()> { fn timeout() -> Option { Some(Duration::from_millis(3_000)) } - -pub fn log_id(term: u64, node_id: u64, index: u64) -> LogId { - LogId { - leader_id: CommittedLeaderId::new(term, node_id), - index, - } -} diff --git a/tests/tests/membership/t31_remove_leader.rs b/tests/tests/membership/t31_remove_leader.rs index f14a53715..1982601c1 100644 --- a/tests/tests/membership/t31_remove_leader.rs +++ b/tests/tests/membership/t31_remove_leader.rs @@ -4,9 +4,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; use openraft::error::ClientWriteError; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::ServerState; use openraft_memstore::ClientRequest; use openraft_memstore::IntoMemClientRequest; @@ -100,7 +99,7 @@ async fn remove_leader() -> Result<()> { assert_eq!(metrics.current_term, 1); assert_eq!(metrics.last_log_index, Some(8)); - assert_eq!(metrics.last_applied, Some(LogId::new(CommittedLeaderId::new(1, 0), 8))); + assert_eq!(metrics.last_applied, Some(log_id(1, 0, 8))); assert_eq!(metrics.membership_config.membership().get_joint_config().clone(), vec![ btreeset![1, 2, 3] ]); diff --git a/tests/tests/metrics/t30_leader_metrics.rs b/tests/tests/metrics/t30_leader_metrics.rs index 151499298..8567d5b40 100644 --- a/tests/tests/metrics/t30_leader_metrics.rs +++ b/tests/tests/metrics/t30_leader_metrics.rs @@ -5,9 +5,7 @@ use anyhow::Result; use maplit::btreemap; use maplit::btreeset; use openraft::testing::log_id; -use openraft::CommittedLeaderId; use openraft::Config; -use openraft::LogId; use openraft::ServerState; #[allow(unused_imports)] use pretty_assertions::assert_eq; @@ -87,7 +85,7 @@ async fn leader_metrics() -> Result<()> { router.wait_for_log(&c01234, Some(log_index), timeout(), "change members to 0,1,2,3,4").await?; - let ww = Some(LogId::new(CommittedLeaderId::new(1, 0), log_index)); + let ww = Some(log_id(1, 0, log_index)); let want_repl = btreemap! { 0u64=>ww, 1u64=>ww, 2=>ww, 3=>ww, 4=>ww, }; router .wait_for_metrics( @@ -129,7 +127,7 @@ async fn leader_metrics() -> Result<()> { "--- replication metrics should reflect the replication state" ); { - let ww = Some(LogId::new(CommittedLeaderId::new(1, 0), log_index)); + let ww = Some(log_id(1, 0, log_index)); let want_repl = btreemap! { 0=>ww, 1=>ww, 2=>ww, 3=>ww}; router .wait_for_metrics( diff --git a/tests/tests/snapshot_building/t10_build_snapshot.rs b/tests/tests/snapshot_building/t10_build_snapshot.rs index 12cce8994..937c630d5 100644 --- a/tests/tests/snapshot_building/t10_build_snapshot.rs +++ b/tests/tests/snapshot_building/t10_build_snapshot.rs @@ -9,11 +9,10 @@ use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::storage::RaftLogStorageExt; use openraft::testing::blank_ent; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; use openraft::Entry; use openraft::EntryPayload; -use openraft::LogId; use openraft::Membership; use openraft::RaftLogReader; use openraft::SnapshotPolicy; @@ -60,21 +59,14 @@ async fn build_snapshot() -> Result<()> { tracing::info!(log_index, "--- log_index: {}", log_index); router.wait_for_log(&btreeset![0], Some(log_index), timeout(), "write").await?; - router - .wait_for_snapshot( - &btreeset![0], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - None, - "snapshot", - ) - .await?; + router.wait_for_snapshot(&btreeset![0], log_id(1, 0, log_index), None, "snapshot").await?; router .assert_storage_state( 1, log_index, Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), Some((log_index.into(), 1)), ) .await?; @@ -82,7 +74,7 @@ async fn build_snapshot() -> Result<()> { // Add a new node and assert that it received the same snapshot. let (mut sto1, sm1) = router.new_store(); sto1.blocking_append([blank_ent(0, 0, 0), Entry { - log_id: LogId::new(CommittedLeaderId::new(1, 0), 1), + log_id: log_id(1, 0, 1), payload: EntryPayload::Membership(Membership::new_with_defaults(vec![btreeset! {0}], [])), }]) .await?; @@ -109,7 +101,7 @@ async fn build_snapshot() -> Result<()> { let (mut sto, _sm) = router.get_storage_handle(&1)?; let logs = sto.try_get_log_entries(..).await?; assert_eq!(2, logs.len()); - assert_eq!(LogId::new(CommittedLeaderId::new(1, 0), log_index - 1), logs[0].log_id) + assert_eq!(log_id(1, 0, log_index - 1), logs[0].log_id) } // log 0 counts @@ -119,7 +111,7 @@ async fn build_snapshot() -> Result<()> { 1, log_index, None, /* learner does not vote */ - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), expected_snap, ) .await?; @@ -136,9 +128,9 @@ async fn build_snapshot() -> Result<()> { .append_entries( AppendEntriesRequest { vote: Vote::new_committed(1, 0), - prev_log_id: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + prev_log_id: Some(log_id(1, 0, 2)), entries: vec![], - leader_commit: Some(LogId::new(CommittedLeaderId::new(0, 0), 0)), + leader_commit: Some(log_id(0, 0, 0)), }, option, ) diff --git a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs index 104161b03..5ba05b54e 100644 --- a/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs +++ b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs @@ -3,9 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::RaftLogReader; use tokio::time::sleep; @@ -42,13 +41,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { { log_index += router.client_request_many(0, "0", 10).await?; leader.trigger().snapshot().await?; - leader - .wait(timeout()) - .snapshot( - LogId::new(CommittedLeaderId::new(1, 0), log_index), - "building 1st snapshot", - ) - .await?; + leader.wait(timeout()).snapshot(log_id(1, 0, log_index), "building 1st snapshot").await?; let (mut sto0, mut _sm0) = router.get_storage_handle(&0)?; // Wait for purge to complete. @@ -68,13 +61,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { router.wait(&0, timeout()).applied_index(Some(log_index), "write another 5 logs").await?; leader.trigger().snapshot().await?; - leader - .wait(timeout()) - .snapshot( - LogId::new(CommittedLeaderId::new(1, 0), log_index), - "building 2nd snapshot", - ) - .await?; + leader.wait(timeout()).snapshot(log_id(1, 0, log_index), "building 2nd snapshot").await?; } // There may be a cached append-entries request that already loads log 10..15 from the store, @@ -88,13 +75,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { { router.set_network_error(1, false); - learner - .wait(timeout()) - .snapshot( - LogId::new(CommittedLeaderId::new(1, 0), log_index), - "learner install snapshot", - ) - .await?; + learner.wait(timeout()).snapshot(log_id(1, 0, log_index), "learner install snapshot").await?; let (mut sto1, mut _sm) = router.get_storage_handle(&1)?; let logs = sto1.try_get_log_entries(..).await?; diff --git a/tests/tests/snapshot_streaming/t31_snapshot_overrides_membership.rs b/tests/tests/snapshot_streaming/t31_snapshot_overrides_membership.rs index 271cae3d6..e3ed2a5b3 100644 --- a/tests/tests/snapshot_streaming/t31_snapshot_overrides_membership.rs +++ b/tests/tests/snapshot_streaming/t31_snapshot_overrides_membership.rs @@ -9,12 +9,11 @@ use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; use openraft::storage::StorageHelper; use openraft::testing::blank_ent; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; use openraft::EffectiveMembership; use openraft::Entry; use openraft::EntryPayload; -use openraft::LogId; use openraft::Membership; use openraft::SnapshotPolicy; use openraft::Vote; @@ -62,20 +61,13 @@ async fn snapshot_overrides_membership() -> Result<()> { ) .await?; - router - .wait_for_snapshot( - &btreeset![0], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - timeout(), - "snapshot", - ) - .await?; + router.wait_for_snapshot(&btreeset![0], log_id(1, 0, log_index), timeout(), "snapshot").await?; router .assert_storage_state( 1, log_index, Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), Some((log_index.into(), 1)), ) .await?; @@ -93,10 +85,10 @@ async fn snapshot_overrides_membership() -> Result<()> { vote: Vote::new_committed(1, 0), prev_log_id: None, entries: vec![blank_ent(0, 0, 0), Entry { - log_id: LogId::new(CommittedLeaderId::new(1, 0), 1), + log_id: log_id(1, 0, 1), payload: EntryPayload::Membership(Membership::new_with_defaults(vec![btreeset! {2,3}], [])), }], - leader_commit: Some(LogId::new(CommittedLeaderId::new(0, 0), 0)), + leader_commit: Some(log_id(0, 0, 0)), }; let option = RPCOption::new(Duration::from_millis(1_000)); @@ -127,14 +119,7 @@ async fn snapshot_overrides_membership() -> Result<()> { tracing::info!(log_index, "--- DONE add learner"); router.wait_for_log(&btreeset![0, 1], Some(log_index), timeout(), "add learner").await?; - router - .wait_for_snapshot( - &btreeset![1], - LogId::new(CommittedLeaderId::new(1, 0), snapshot_index), - timeout(), - "", - ) - .await?; + router.wait_for_snapshot(&btreeset![1], log_id(1, 0, snapshot_index), timeout(), "").await?; let expected_snap = Some((snapshot_index.into(), 1)); @@ -143,7 +128,7 @@ async fn snapshot_overrides_membership() -> Result<()> { 1, log_index, None, /* learner does not vote */ - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), expected_snap, ) .await?; diff --git a/tests/tests/snapshot_streaming/t32_snapshot_uses_prev_snap_membership.rs b/tests/tests/snapshot_streaming/t32_snapshot_uses_prev_snap_membership.rs index 9031973ad..f1d17e156 100644 --- a/tests/tests/snapshot_streaming/t32_snapshot_uses_prev_snap_membership.rs +++ b/tests/tests/snapshot_streaming/t32_snapshot_uses_prev_snap_membership.rs @@ -3,9 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::Membership; use openraft::RaftLogReader; use openraft::SnapshotPolicy; @@ -63,14 +62,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { "send log to trigger snapshot", ) .await?; - router - .wait_for_snapshot( - &btreeset![0], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - timeout(), - "1st snapshot", - ) - .await?; + router.wait_for_snapshot(&btreeset![0], log_id(1, 0, log_index), timeout(), "1st snapshot").await?; { let logs = sto0.try_get_log_entries(..).await?; @@ -96,14 +88,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { log_index = snapshot_threshold * 2 - 1; router.wait_for_log(&btreeset![0, 1], Some(log_index), None, "send log to trigger snapshot").await?; - router - .wait_for_snapshot( - &btreeset![0], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - None, - "2nd snapshot", - ) - .await?; + router.wait_for_snapshot(&btreeset![0], log_id(1, 0, log_index), None, "2nd snapshot").await?; } tracing::info!(log_index, "--- check membership"); diff --git a/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs index 4b5b2ab40..650bc5ef1 100644 --- a/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs +++ b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs @@ -13,11 +13,9 @@ use openraft::storage::RaftStateMachine; use openraft::testing::blank_ent; use openraft::testing::log_id; use openraft::testing::membership_ent; -use openraft::CommittedLeaderId; use openraft::Config; use openraft::Entry; use openraft::EntryPayload; -use openraft::LogId; use openraft::Membership; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; @@ -80,10 +78,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { log_index = snapshot_threshold - 1; router.wait(&0, timeout()).applied_index(Some(log_index), "trigger snapshot").await?; - router - .wait(&0, timeout()) - .snapshot(LogId::new(CommittedLeaderId::new(5, 0), log_index), "build snapshot") - .await?; + router.wait(&0, timeout()).snapshot(log_id(5, 0, log_index), "build snapshot").await?; } tracing::info!(log_index, "--- create node-1 and add conflicting logs"); @@ -98,7 +93,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { blank_ent(1, 0, 1), // conflict membership will be replaced with membership in snapshot Entry { - log_id: LogId::new(CommittedLeaderId::new(1, 0), 2), + log_id: log_id(1, 0, 2), payload: EntryPayload::Membership(Membership::new_with_defaults(vec![btreeset! {2,3}], [])), }, blank_ent(1, 0, 3), @@ -111,11 +106,11 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { blank_ent(1, 0, 10), // another conflict membership, will be removed Entry { - log_id: LogId::new(CommittedLeaderId::new(1, 0), 11), + log_id: log_id(1, 0, 11), payload: EntryPayload::Membership(Membership::new_with_defaults(vec![btreeset! {4,5}], [])), }, ], - leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 2)), + leader_commit: Some(log_id(1, 0, 2)), }; let option = RPCOption::new(Duration::from_millis(1_000)); @@ -185,12 +180,12 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { let log_st = sto1.get_log_state().await?; assert_eq!( - Some(LogId::new(CommittedLeaderId::new(5, 0), snapshot_threshold - 1)), + Some(log_id(5, 0, snapshot_threshold - 1)), log_st.last_purged_log_id, "purge up to last log id in snapshot" ); assert_eq!( - Some(LogId::new(CommittedLeaderId::new(5, 0), snapshot_threshold - 1)), + Some(log_id(5, 0, snapshot_threshold - 1)), log_st.last_log_id, "reverted to last log id in snapshot" ); diff --git a/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs index df2b1b157..091ccf170 100644 --- a/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs +++ b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs @@ -3,9 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::RaftLogReader; use tokio::time::sleep; @@ -48,10 +47,7 @@ async fn replication_does_not_block_purge() -> Result<()> { log_index += router.client_request_many(0, "0", 10).await?; leader.trigger().snapshot().await?; - leader - .wait(timeout()) - .snapshot(LogId::new(CommittedLeaderId::new(1, 0), log_index), "built snapshot") - .await?; + leader.wait(timeout()).snapshot(log_id(1, 0, log_index), "built snapshot").await?; sleep(Duration::from_millis(500)).await; diff --git a/tests/tests/snapshot_streaming/t50_snapshot_line_rate_to_snapshot.rs b/tests/tests/snapshot_streaming/t50_snapshot_line_rate_to_snapshot.rs index 8c5fd351b..a59f174ce 100644 --- a/tests/tests/snapshot_streaming/t50_snapshot_line_rate_to_snapshot.rs +++ b/tests/tests/snapshot_streaming/t50_snapshot_line_rate_to_snapshot.rs @@ -3,9 +3,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::SnapshotPolicy; use crate::fixtures::ut_harness; @@ -71,12 +70,7 @@ async fn snapshot_line_rate_to_snapshot() -> Result<()> { ) .await?; router - .wait_for_snapshot( - &btreeset![0], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - timeout(), - "snapshot on node 0", - ) + .wait_for_snapshot(&btreeset![0], log_id(1, 0, log_index), timeout(), "snapshot on node 0") .await?; } @@ -86,12 +80,7 @@ async fn snapshot_line_rate_to_snapshot() -> Result<()> { router.wait_for_log(&btreeset![1], Some(log_index), timeout(), "replicate by snapshot").await?; router - .wait_for_snapshot( - &btreeset![1], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - timeout(), - "snapshot on node 1", - ) + .wait_for_snapshot(&btreeset![1], log_id(1, 0, log_index), timeout(), "snapshot on node 1") .await?; } diff --git a/tests/tests/snapshot_streaming/t50_snapshot_when_lacking_log.rs b/tests/tests/snapshot_streaming/t50_snapshot_when_lacking_log.rs index d0bd293c2..2052b0deb 100644 --- a/tests/tests/snapshot_streaming/t50_snapshot_when_lacking_log.rs +++ b/tests/tests/snapshot_streaming/t50_snapshot_when_lacking_log.rs @@ -2,9 +2,8 @@ use std::sync::Arc; use anyhow::Result; use maplit::btreeset; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::SnapshotPolicy; use crate::fixtures::ut_harness; @@ -42,20 +41,13 @@ async fn switch_to_snapshot_replication_when_lacking_log() -> Result<()> { router.wait_for_log(&btreeset![0], Some(log_index), None, "send log to trigger snapshot").await?; - router - .wait_for_snapshot( - &btreeset![0], - LogId::new(CommittedLeaderId::new(1, 0), log_index), - None, - "snapshot", - ) - .await?; + router.wait_for_snapshot(&btreeset![0], log_id(1, 0, log_index), None, "snapshot").await?; router .assert_storage_state( 1, log_index, Some(0), - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), Some((log_index.into(), 1)), ) .await?; @@ -77,21 +69,14 @@ async fn switch_to_snapshot_replication_when_lacking_log() -> Result<()> { log_index += 1; router.wait_for_log(&btreeset![0, 1], Some(log_index), None, "add learner").await?; - router - .wait_for_snapshot( - &btreeset![1], - LogId::new(CommittedLeaderId::new(1, 0), snapshot_threshold - 1), - None, - "", - ) - .await?; + router.wait_for_snapshot(&btreeset![1], log_id(1, 0, snapshot_threshold - 1), None, "").await?; let expected_snap = Some(((snapshot_threshold - 1).into(), 1)); router .assert_storage_state( 1, log_index, None, /* learner does not vote */ - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), expected_snap, ) .await?; diff --git a/tests/tests/snapshot_streaming/t51_after_snapshot_add_learner_and_request_a_log.rs b/tests/tests/snapshot_streaming/t51_after_snapshot_add_learner_and_request_a_log.rs index 9a7c02164..234743c50 100644 --- a/tests/tests/snapshot_streaming/t51_after_snapshot_add_learner_and_request_a_log.rs +++ b/tests/tests/snapshot_streaming/t51_after_snapshot_add_learner_and_request_a_log.rs @@ -4,9 +4,7 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; use openraft::testing::log_id; -use openraft::CommittedLeaderId; use openraft::Config; -use openraft::LogId; use openraft::SnapshotPolicy; use crate::fixtures::ut_harness; @@ -78,10 +76,7 @@ async fn after_snapshot_add_learner_and_request_a_log() -> Result<()> { router .wait(&1, timeout()) - .snapshot( - LogId::new(CommittedLeaderId::new(1, 0), snapshot_index), - "learner-1 receives snapshot", - ) + .snapshot(log_id(1, 0, snapshot_index), "learner-1 receives snapshot") .await?; log_index += router.client_request_many(0, "0", 1).await?; @@ -103,7 +98,7 @@ async fn after_snapshot_add_learner_and_request_a_log() -> Result<()> { 1, log_index, None, /* learner does not vote */ - LogId::new(CommittedLeaderId::new(1, 0), log_index), + log_id(1, 0, log_index), expected_snap, ) .await?; diff --git a/tests/tests/state_machine/t20_state_machine_apply_membership.rs b/tests/tests/state_machine/t20_state_machine_apply_membership.rs index 2b6ddec4b..dba3e3707 100644 --- a/tests/tests/state_machine/t20_state_machine_apply_membership.rs +++ b/tests/tests/state_machine/t20_state_machine_apply_membership.rs @@ -3,9 +3,8 @@ use std::sync::Arc; use anyhow::Result; use maplit::btreeset; use openraft::storage::RaftStateMachine; -use openraft::CommittedLeaderId; +use openraft::testing::log_id; use openraft::Config; -use openraft::LogId; use openraft::LogIdOptionExt; use openraft::Membership; use openraft::StoredMembership; @@ -40,7 +39,7 @@ async fn state_machine_apply_membership() -> Result<()> { let (_sto, mut sm) = router.get_storage_handle(&i)?; assert_eq!( StoredMembership::new( - Some(LogId::new(CommittedLeaderId::new(0, 0), 0)), + Some(log_id(0, 0, 0)), Membership::new_with_defaults(vec![btreeset! {0}], []) ), sm.applied_state().await?.1 @@ -88,7 +87,7 @@ async fn state_machine_apply_membership() -> Result<()> { let (_, last_membership) = sm.applied_state().await?; assert_eq!( StoredMembership::new( - Some(LogId::new(CommittedLeaderId::new(1, 0), log_index)), + Some(log_id(1, 0, log_index)), Membership::new_with_defaults(vec![btreeset! {0, 1, 2}], btreeset! {3,4}) ), last_membership