Skip to content

Commit

Permalink
Feature: Abstract LeaderId and CommittedLeaderId
Browse files Browse the repository at this point in the history
Add `LeaderId: RaftLeaderId` to `RaftTypeConfig` to allow customizing leader ID
implementations.

- Part of #1278
  • Loading branch information
drmingdrmer committed Dec 30, 2024
1 parent 9f2f4d3 commit b230bc2
Show file tree
Hide file tree
Showing 65 changed files with 392 additions and 443 deletions.
5 changes: 3 additions & 2 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ use crate::type_config::TypeConfigExt;
use crate::vote::vote_status::VoteStatus;
use crate::vote::CommittedVote;
use crate::vote::NonCommittedVote;
use crate::vote::RaftLeaderId;
use crate::ChangeMembers;
use crate::Instant;
use crate::LogId;
Expand Down Expand Up @@ -583,7 +584,7 @@ where
id: self.id.clone(),

// --- data ---
current_term: st.vote_ref().leader_id().get_term(),
current_term: st.vote_ref().leader_id().term(),
vote: st.io_state().io_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(),
last_log_index: st.last_log_id().index(),
last_applied: st.io_applied().cloned(),
Expand Down Expand Up @@ -726,7 +727,7 @@ where
}

// Safe unwrap(): vote that is committed has to already have voted for some node.
let id = vote.leader_id().voted_for().unwrap();
let id = vote.leader_id().node_id_ref().cloned().unwrap();

// TODO: `is_voter()` is slow, maybe cache `current_leader`,
// e.g., only update it when membership or vote changes
Expand Down
1 change: 1 addition & 0 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ mod tests {
type NodeId = u64;
type Node = ();
type Term = u64;
type LeaderId = crate::impls::LeaderId<Self>;
type Entry = crate::Entry<TickUTConfig>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::TypeConfigExt;
use crate::vote::raft_term::RaftTerm;
use crate::vote::RaftLeaderId;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
Expand Down Expand Up @@ -209,7 +210,7 @@ where C: RaftTypeConfig
/// Start to elect this node as leader
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) fn elect(&mut self) {
let new_term = self.state.vote.leader_id().term.next();
let new_term = self.state.vote.leader_id().term().next();
let new_vote = Vote::new(new_term, self.config.id.clone());

let candidate = self.new_candidate(new_vote.clone());
Expand Down
5 changes: 3 additions & 2 deletions openraft/src/engine/handler/establish_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::proposer::Candidate;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::proposer::LeaderState;
use crate::vote::RaftLeaderId;
use crate::RaftTypeConfig;

/// Establish a leader for the Engine, when Candidate finishes voting stage.
Expand All @@ -24,8 +25,8 @@ where C: RaftTypeConfig
let vote = candidate.vote_ref().clone();

debug_assert_eq!(
vote.leader_id().voted_for(),
Some(self.config.id.clone()),
vote.leader_id().node_id_ref(),
Some(&self.config.id),
"it can only commit its own vote"
);

Expand Down
12 changes: 2 additions & 10 deletions openraft/src/engine/handler/leader_handler/append_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::CommittedLeaderId;
use crate::EffectiveMembership;
use crate::Entry;
use crate::LogId;
use crate::Membership;
use crate::MembershipState;
use crate::Vote;
Expand Down Expand Up @@ -131,10 +129,7 @@ fn test_leader_append_entries_normal() -> anyhow::Result<()> {
],
eng.state.log_ids.key_log_ids()
);
assert_eq!(
Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)),
eng.state.last_log_id()
);
assert_eq!(Some(&log_id(3, 1, 6)), eng.state.last_log_id());
assert_eq!(
MembershipState::new(
Arc::new(EffectiveMembership::new(Some(log_id(1, 1, 1)), m01())),
Expand Down Expand Up @@ -258,10 +253,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> {
],
eng.state.log_ids.key_log_ids()
);
assert_eq!(
Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)),
eng.state.last_log_id()
);
assert_eq!(Some(&log_id(3, 1, 6)), eng.state.last_log_id());
assert_eq!(
MembershipState::new(
Arc::new(EffectiveMembership::new(Some(log_id(2, 1, 3)), m1())),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::engine::testing::UTConfig;
use crate::engine::Engine;
use crate::engine::LogIdList;
use crate::CommittedLeaderId;
use crate::type_config::alias::LeaderIdOf;
use crate::vote::RaftLeaderIdExt;
use crate::LogId;

fn log_id(term: u64, index: u64) -> LogId<UTConfig> {
LogId {
leader_id: CommittedLeaderId::new(term, 0),
leader_id: LeaderIdOf::<UTConfig>::new_committed(term, 0),
index,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ use crate::progress::Progress;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::CommittedLeaderId;
use crate::EffectiveMembership;
use crate::LogId;
use crate::Membership;
use crate::MembershipState;
use crate::Vote;
Expand Down Expand Up @@ -110,11 +108,7 @@ fn test_leader_append_membership_update_learner_process() -> anyhow::Result<()>
// learner or vice versa.

let mut eng = eng();
eng.state.log_ids = LogIdList::new([
LogId::new(CommittedLeaderId::new(0, 0), 0),
log_id(1, 1, 1),
log_id(5, 1, 10),
]);
eng.state.log_ids = LogIdList::new([log_id(0, 0, 0), log_id(1, 1, 1), log_id(5, 1, 10)]);

eng.state
.membership_state
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::proposer::LeaderState;
use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::type_config::TypeConfigExt;
use crate::vote::RaftLeaderId;
use crate::LogId;
use crate::OptionalSend;
use crate::RaftState;
Expand Down Expand Up @@ -220,7 +221,7 @@ where C: RaftTypeConfig
/// This node then becomes raft-follower or raft-learner.
pub(crate) fn become_following(&mut self) {
debug_assert!(
self.state.vote_ref().leader_id().voted_for().as_ref() != Some(&self.config.id)
self.state.vote_ref().leader_id().node_id_ref() != Some(&self.config.id)
|| !self.state.membership_state.effective().membership().is_voter(&self.config.id),
"It must hold: vote is not mine, or I am not a voter(leader just left the cluster)"
);
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ where N: Node + Ord
type Node = N;
type Entry = crate::Entry<Self>;
type Term = u64;
type LeaderId = crate::impls::LeaderId<Self>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
type Responder = crate::impls::OneshotResponder<Self>;
Expand Down
4 changes: 1 addition & 3 deletions openraft/src/engine/tests/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use crate::raft::VoteRequest;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::CommittedLeaderId;
use crate::EffectiveMembership;
use crate::LogId;
use crate::Membership;
use crate::Vote;

Expand All @@ -30,7 +28,7 @@ fn m12() -> Membership<UTConfig> {

fn eng() -> Engine<UTConfig> {
let mut eng = Engine::testing_default(0);
eng.state.log_ids = LogIdList::new([LogId::new(CommittedLeaderId::new(0, 0), 0)]);
eng.state.log_ids = LogIdList::new([log_id(0, 0, 0)]);
eng.state.enable_validation(false); // Disable validation for incomplete state
eng
}
Expand Down
4 changes: 1 addition & 3 deletions openraft/src/engine/tests/handle_vote_resp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ use crate::replication::request::Replicate;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::CommittedLeaderId;
use crate::EffectiveMembership;
use crate::Entry;
use crate::LogId;
use crate::Membership;
use crate::Vote;

Expand All @@ -39,7 +37,7 @@ fn eng() -> Engine<UTConfig> {
let mut eng = Engine::testing_default(0);
eng.state.enable_validation(false); // Disable validation for incomplete state

eng.state.log_ids = LogIdList::new([LogId::new(CommittedLeaderId::new(0, 0), 0)]);
eng.state.log_ids = LogIdList::new([log_id(0, 0, 0)]);
eng
}

Expand Down
16 changes: 3 additions & 13 deletions openraft/src/engine/tests/initialize_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use crate::raft_state::LogStateReader;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::CommittedLeaderId;
use crate::Entry;
use crate::LogId;
use crate::Membership;
Expand All @@ -33,10 +32,7 @@ fn test_initialize_single_node() -> anyhow::Result<()> {
eng
};

let log_id0 = LogId {
leader_id: CommittedLeaderId::new(0, 0),
index: 0,
};
let log_id0 = log_id(0, 0, 0);

let m1 = || Membership::<UTConfig>::new_with_defaults(vec![btreeset! {1}], []);
let entry = Entry::<UTConfig>::new_membership(LogId::default(), m1());
Expand Down Expand Up @@ -86,10 +82,7 @@ fn test_initialize() -> anyhow::Result<()> {
eng
};

let log_id0 = LogId {
leader_id: CommittedLeaderId::new(0, 0),
index: 0,
};
let log_id0 = log_id(0, 0, 0);

let m12 = || Membership::<UTConfig>::new_with_defaults(vec![btreeset! {1,2}], []);
let entry = || Entry::<UTConfig>::new_membership(LogId::default(), m12());
Expand Down Expand Up @@ -122,10 +115,7 @@ fn test_initialize() -> anyhow::Result<()> {
Command::SendVote {
vote_req: VoteRequest {
vote: Vote::new(1, 1),
last_log_id: Some(LogId {
leader_id: CommittedLeaderId::new(0, 0),
index: 0,
},),
last_log_id: Some(log_id(0, 0, 0))
},
},
],
Expand Down
4 changes: 1 addition & 3 deletions openraft/src/engine/tests/trigger_purge_log_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use crate::storage::SnapshotMeta;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::CommittedLeaderId;
use crate::EffectiveMembership;
use crate::LogId;
use crate::Membership;
use crate::MembershipState;
use crate::StoredMembership;
Expand All @@ -33,7 +31,7 @@ fn eng() -> Engine<UTConfig> {
EffectiveMembership::new_arc(Some(log_id(1, 0, 1)), m12()),
);

eng.state.log_ids = LogIdList::new([LogId::new(CommittedLeaderId::new(0, 0), 0)]);
eng.state.log_ids = LogIdList::new([log_id(0, 0, 0)]);
eng
}

Expand Down
13 changes: 13 additions & 0 deletions openraft/src/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,16 @@ pub use crate::node::EmptyNode;
pub use crate::raft::responder::impls::OneshotResponder;
#[cfg(feature = "tokio-rt")]
pub use crate::type_config::async_runtime::tokio_impls::TokioRuntime;

#[cfg(not(feature = "single-term-leader"))]
pub mod leader_id {
pub use crate::vote::leader_id::leader_id_adv::CommittedLeaderId;
pub use crate::vote::leader_id::leader_id_adv::LeaderId;
}
#[cfg(feature = "single-term-leader")]
pub mod leader_id {
pub use crate::vote::leader_id::leader_id_std::CommittedLeaderId;
pub use crate::vote::leader_id::leader_id_std::LeaderId;
}

pub use leader_id::LeaderId;
10 changes: 5 additions & 5 deletions openraft/src/log_id/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use log_id_option_ext::LogIdOptionExt;
pub use log_index_option_ext::LogIndexOptionExt;
pub use raft_log_id::RaftLogId;

use crate::CommittedLeaderId;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::RaftTypeConfig;

/// The identity of a raft log.
Expand All @@ -25,7 +25,7 @@ pub struct LogId<C>
where C: RaftTypeConfig
{
/// The id of the leader that proposed this log
pub leader_id: CommittedLeaderId<C>,
pub leader_id: CommittedLeaderIdOf<C>,
/// The index of a log in the storage.
///
/// Log index is a consecutive integer.
Expand All @@ -35,7 +35,7 @@ where C: RaftTypeConfig
impl<C> Copy for LogId<C>
where
C: RaftTypeConfig,
C::NodeId: Copy,
CommittedLeaderIdOf<C>: Copy,
{
}

Expand Down Expand Up @@ -63,12 +63,12 @@ impl<C> LogId<C>
where C: RaftTypeConfig
{
/// Creates a log id proposed by a committed leader with `leader_id` at the given index.
pub fn new(leader_id: CommittedLeaderId<C>, index: u64) -> Self {
pub fn new(leader_id: CommittedLeaderIdOf<C>, index: u64) -> Self {
LogId { leader_id, index }
}

/// Returns the leader id that proposed this log.
pub fn committed_leader_id(&self) -> &CommittedLeaderId<C> {
pub fn committed_leader_id(&self) -> &CommittedLeaderIdOf<C> {
&self.leader_id
}
}
4 changes: 2 additions & 2 deletions openraft/src/log_id/raft_log_id.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::CommittedLeaderId;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::LogId;
use crate::RaftTypeConfig;

Expand All @@ -12,7 +12,7 @@ where C: RaftTypeConfig
/// For example, a leader id in standard raft is `(term, node_id)`, but a log id does not have
/// to store the `node_id`, because in standard raft there is at most one leader that can be
/// established.
fn leader_id(&self) -> &CommittedLeaderId<C> {
fn leader_id(&self) -> &CommittedLeaderIdOf<C> {
self.get_log_id().committed_leader_id()
}

Expand Down
10 changes: 4 additions & 6 deletions openraft/src/log_id_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::fmt::Formatter;
use validit::Validate;

use crate::display_ext::DisplayOptionExt;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::RaftTypeConfig;
Expand All @@ -29,7 +30,7 @@ where C: RaftTypeConfig
impl<C> Copy for LogIdRange<C>
where
C: RaftTypeConfig,
C::NodeId: Copy,
CommittedLeaderIdOf<C>: Copy,
{
}

Expand Down Expand Up @@ -69,14 +70,11 @@ mod tests {

use crate::engine::testing::UTConfig;
use crate::log_id_range::LogIdRange;
use crate::CommittedLeaderId;
use crate::testing;
use crate::LogId;

fn log_id(index: u64) -> LogId<UTConfig> {
LogId {
leader_id: CommittedLeaderId::new(1, 1),
index,
}
testing::log_id(1, 1, index)
}

#[test]
Expand Down
Loading

0 comments on commit b230bc2

Please sign in to comment.