From 91ac452701758bd4520111a2fdac716ea96b435a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 31 Dec 2024 09:40:44 +0800 Subject: [PATCH] Refactor: update gRPC example to adapt to `LeaderId` changes - Update the `raft-kv-memstore-grpc` example to use the protobuf-defined `LeaderId`. - Automatically implement `CommittedLeaderId` for all types. - Add `openraft::vote::LeaderIdCompare` to provide comparison functions for both single-leader-per-term and multi-leader-per-term implementations. --- examples/raft-kv-memstore-grpc/build.rs | 7 + .../proto/internal_service.proto | 4 +- .../src/grpc/management_service.rs | 6 +- examples/raft-kv-memstore-grpc/src/lib.rs | 79 +++++------ .../raft-kv-memstore-grpc/src/network/mod.rs | 4 +- .../src/pb_impl/impl_leader_id.rs | 51 ++++++++ .../raft-kv-memstore-grpc/src/pb_impl/mod.rs | 3 + examples/utils/declare_types.rs | 2 + openraft/src/vote/leader_id/leader_id_adv.rs | 9 +- openraft/src/vote/leader_id/leader_id_cmp.rs | 123 ++++++++++++++++++ openraft/src/vote/leader_id/leader_id_std.rs | 28 +--- openraft/src/vote/leader_id/mod.rs | 1 + .../leader_id/raft_committed_leader_id.rs | 7 + openraft/src/vote/mod.rs | 1 + 14 files changed, 244 insertions(+), 81 deletions(-) create mode 100644 examples/raft-kv-memstore-grpc/src/pb_impl/impl_leader_id.rs create mode 100644 examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs create mode 100644 openraft/src/vote/leader_id/leader_id_cmp.rs diff --git a/examples/raft-kv-memstore-grpc/build.rs b/examples/raft-kv-memstore-grpc/build.rs index e93e467ed..dcaa873a4 100644 --- a/examples/raft-kv-memstore-grpc/build.rs +++ b/examples/raft-kv-memstore-grpc/build.rs @@ -7,6 +7,9 @@ fn main() -> Result<(), Box> { "proto/management_service.proto", "proto/api_service.proto", ]; + + // TODO: remove serde + tonic_build::configure() .type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]") .type_attribute( @@ -17,6 +20,10 @@ fn main() -> Result<(), Box> { "openraftpb.Response", "#[derive(Eq, serde::Serialize, serde::Deserialize)]", ) + .type_attribute( + "openraftpb.LeaderId", + "#[derive(Eq, serde::Serialize, serde::Deserialize)]", + ) .compile_protos_with_config(config, &proto_files, &["proto"])?; Ok(()) } diff --git a/examples/raft-kv-memstore-grpc/proto/internal_service.proto b/examples/raft-kv-memstore-grpc/proto/internal_service.proto index 181b447de..9414a7c16 100644 --- a/examples/raft-kv-memstore-grpc/proto/internal_service.proto +++ b/examples/raft-kv-memstore-grpc/proto/internal_service.proto @@ -15,8 +15,8 @@ message Vote { // LogId represents the log identifier in Raft message LogId { - uint64 index = 1; - LeaderId leader_id = 2; + uint64 term = 1; + uint64 index = 2; } // VoteRequest represents a request for votes during leader election diff --git a/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs index efb4bc757..fa054d692 100644 --- a/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs +++ b/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs @@ -5,6 +5,7 @@ use tonic::Response; use tonic::Status; use tracing::debug; +use crate::pb; use crate::protobuf::management_service_server::ManagementService; use crate::protobuf::AddLearnerRequest; use crate::protobuf::ChangeMembershipRequest; @@ -12,7 +13,6 @@ use crate::protobuf::InitRequest; use crate::protobuf::RaftReplyString; use crate::protobuf::RaftRequestString; use crate::typ::*; -use crate::Node; /// Management service implementation for Raft cluster administration. /// Handles cluster initialization, membership changes, and metrics collection. @@ -62,11 +62,11 @@ impl ManagementService for ManagementServiceImpl { let req = request.into_inner(); // Convert nodes into required format - let nodes_map: BTreeMap = req + let nodes_map: BTreeMap = req .nodes .into_iter() .map(|node| { - (node.node_id, Node { + (node.node_id, pb::Node { rpc_addr: node.rpc_addr, node_id: node.node_id, }) diff --git a/examples/raft-kv-memstore-grpc/src/lib.rs b/examples/raft-kv-memstore-grpc/src/lib.rs index cbd41599a..0008f1a4f 100644 --- a/examples/raft-kv-memstore-grpc/src/lib.rs +++ b/examples/raft-kv-memstore-grpc/src/lib.rs @@ -1,8 +1,6 @@ #![allow(clippy::uninlined_format_args)] -use crate::protobuf::Node; -use crate::protobuf::Response; -use crate::protobuf::SetRequest; +use crate::protobuf as pb; use crate::store::StateMachineData; use crate::typ::*; @@ -12,14 +10,17 @@ pub mod store; #[cfg(test)] mod test; +mod pb_impl; + pub type NodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. pub TypeConfig: - D = SetRequest, - R = Response, - Node = Node, + D = pb::SetRequest, + R = pb::Response, + LeaderId = pb::LeaderId, + Node = pb::Node, SnapshotData = StateMachineData, ); @@ -33,59 +34,43 @@ pub mod protobuf { #[path = "../../utils/declare_types.rs"] pub mod typ; -impl From for LeaderId { - fn from(proto_leader_id: protobuf::LeaderId) -> Self { - LeaderId::new(proto_leader_id.term, proto_leader_id.node_id) - } -} - -impl From for typ::Vote { - fn from(proto_vote: protobuf::Vote) -> Self { - let leader_id: LeaderId = proto_vote.leader_id.unwrap().into(); +impl From for Vote { + fn from(proto_vote: pb::Vote) -> Self { + let leader_id: LeaderId = proto_vote.leader_id.unwrap(); if proto_vote.committed { - typ::Vote::new_committed(leader_id.term, leader_id.node_id) + Vote::new_committed(leader_id.term, leader_id.node_id) } else { - typ::Vote::new(leader_id.term, leader_id.node_id) + Vote::new(leader_id.term, leader_id.node_id) } } } -impl From for LogId { - fn from(proto_log_id: protobuf::LogId) -> Self { - let leader_id: LeaderId = proto_log_id.leader_id.unwrap().into(); - LogId::new(leader_id, proto_log_id.index) +impl From for LogId { + fn from(proto_log_id: pb::LogId) -> Self { + LogId::new(proto_log_id.term, proto_log_id.index) } } -impl From for VoteRequest { - fn from(proto_vote_req: protobuf::VoteRequest) -> Self { - let vote: typ::Vote = proto_vote_req.vote.unwrap().into(); +impl From for VoteRequest { + fn from(proto_vote_req: pb::VoteRequest) -> Self { + let vote: Vote = proto_vote_req.vote.unwrap().into(); let last_log_id = proto_vote_req.last_log_id.map(|log_id| log_id.into()); VoteRequest::new(vote, last_log_id) } } -impl From for VoteResponse { - fn from(proto_vote_resp: protobuf::VoteResponse) -> Self { - let vote: typ::Vote = proto_vote_resp.vote.unwrap().into(); +impl From for VoteResponse { + fn from(proto_vote_resp: pb::VoteResponse) -> Self { + let vote: Vote = proto_vote_resp.vote.unwrap().into(); let last_log_id = proto_vote_resp.last_log_id.map(|log_id| log_id.into()); VoteResponse::new(vote, last_log_id, proto_vote_resp.vote_granted) } } -impl From for protobuf::LeaderId { - fn from(leader_id: LeaderId) -> Self { - protobuf::LeaderId { - term: leader_id.term, - node_id: leader_id.node_id, - } - } -} - -impl From for protobuf::Vote { - fn from(vote: typ::Vote) -> Self { - protobuf::Vote { - leader_id: Some(protobuf::LeaderId { +impl From for pb::Vote { + fn from(vote: Vote) -> Self { + pb::Vote { + leader_id: Some(pb::LeaderId { term: vote.leader_id().term, node_id: vote.leader_id().node_id, }), @@ -93,27 +78,27 @@ impl From for protobuf::Vote { } } } -impl From for protobuf::LogId { +impl From for pb::LogId { fn from(log_id: LogId) -> Self { - protobuf::LogId { + pb::LogId { + term: log_id.leader_id, index: log_id.index, - leader_id: Some(log_id.leader_id.into()), } } } -impl From for protobuf::VoteRequest { +impl From for pb::VoteRequest { fn from(vote_req: VoteRequest) -> Self { - protobuf::VoteRequest { + pb::VoteRequest { vote: Some(vote_req.vote.into()), last_log_id: vote_req.last_log_id.map(|log_id| log_id.into()), } } } -impl From for protobuf::VoteResponse { +impl From for pb::VoteResponse { fn from(vote_resp: VoteResponse) -> Self { - protobuf::VoteResponse { + pb::VoteResponse { vote: Some(vote_resp.vote.into()), vote_granted: vote_resp.vote_granted, last_log_id: vote_resp.last_log_id.map(|log_id| log_id.into()), diff --git a/examples/raft-kv-memstore-grpc/src/network/mod.rs b/examples/raft-kv-memstore-grpc/src/network/mod.rs index dfee11803..b8798951c 100644 --- a/examples/raft-kv-memstore-grpc/src/network/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/network/mod.rs @@ -7,6 +7,7 @@ use openraft::network::RPCOption; use openraft::RaftNetworkFactory; use tonic::transport::Channel; +use crate::protobuf as pb; use crate::protobuf::internal_service_client::InternalServiceClient; use crate::protobuf::RaftRequestBytes; use crate::protobuf::SnapshotRequest; @@ -14,7 +15,6 @@ use crate::protobuf::VoteRequest as PbVoteRequest; use crate::protobuf::VoteResponse as PbVoteResponse; use crate::typ::RPCError; use crate::typ::*; -use crate::Node; use crate::NodeId; use crate::TypeConfig; @@ -38,7 +38,7 @@ impl RaftNetworkFactory for Network { /// Represents an active network connection to a remote Raft node. /// Handles serialization and deserialization of Raft messages over gRPC. pub struct NetworkConnection { - target_node: Node, + target_node: pb::Node, } impl NetworkConnection { diff --git a/examples/raft-kv-memstore-grpc/src/pb_impl/impl_leader_id.rs b/examples/raft-kv-memstore-grpc/src/pb_impl/impl_leader_id.rs new file mode 100644 index 000000000..635006ae5 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/pb_impl/impl_leader_id.rs @@ -0,0 +1,51 @@ +//! Implement [`RaftLeaderId`] for protobuf defined LeaderId, so that it can be used in OpenRaft + +use std::cmp::Ordering; +use std::fmt; + +use openraft::vote::LeaderIdCompare; +use openraft::vote::RaftLeaderId; + +use crate::protobuf as pb; +use crate::TypeConfig; + +/// Implements PartialOrd for LeaderId to enforce the standard Raft behavior of at most one leader +/// per term. +/// +/// In standard Raft, each term can have at most one leader. This is enforced by making leader IDs +/// with the same term incomparable (returning None), unless they refer to the same node. +/// +/// This differs from the [`PartialOrd`] default implementation which would allow multiple leaders +/// in the same term by comparing node IDs. +impl PartialOrd for pb::LeaderId { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + LeaderIdCompare::std(self, other) + } +} + +impl fmt::Display for pb::LeaderId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "T{}-N{}", self.term, self.node_id) + } +} + +impl RaftLeaderId for pb::LeaderId { + type Committed = u64; + + fn new(term: u64, node_id: u64) -> Self { + Self { term, node_id } + } + + fn term(&self) -> u64 { + self.term + } + + fn node_id_ref(&self) -> Option<&u64> { + Some(&self.node_id) + } + + fn to_committed(&self) -> Self::Committed { + self.term + } +} diff --git a/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs b/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs new file mode 100644 index 000000000..df953a355 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs @@ -0,0 +1,3 @@ +//! Implements traits for protobuf types + +mod impl_leader_id; diff --git a/examples/utils/declare_types.rs b/examples/utils/declare_types.rs index d7c6ec488..12e91b1c5 100644 --- a/examples/utils/declare_types.rs +++ b/examples/utils/declare_types.rs @@ -10,6 +10,8 @@ pub type Entry = openraft::Entry; pub type EntryPayload = openraft::EntryPayload; pub type StoredMembership = openraft::StoredMembership; +pub type Node = ::Node; + pub type LogState = openraft::storage::LogState; pub type SnapshotMeta = openraft::SnapshotMeta; diff --git a/openraft/src/vote/leader_id/leader_id_adv.rs b/openraft/src/vote/leader_id/leader_id_adv.rs index 3ad7c3b5d..4ef45b9a1 100644 --- a/openraft/src/vote/leader_id/leader_id_adv.rs +++ b/openraft/src/vote/leader_id/leader_id_adv.rs @@ -1,6 +1,7 @@ +//! [`RaftLeaderId`] implementation that allows multiple leaders per term. + use std::fmt; -use crate::vote::RaftCommittedLeaderId; use crate::vote::RaftLeaderId; use crate::RaftTypeConfig; @@ -42,7 +43,7 @@ where C: RaftTypeConfig pub type CommittedLeaderId = LeaderId; impl RaftLeaderId for LeaderId -where C: RaftTypeConfig +where C: RaftTypeConfig { type Committed = Self; @@ -63,8 +64,6 @@ where C: RaftTypeConfig } } -impl RaftCommittedLeaderId for LeaderId where C: RaftTypeConfig {} - #[cfg(test)] mod tests { use super::LeaderId; @@ -89,7 +88,7 @@ mod tests { } #[test] - fn test_leader_id_partial_order() -> anyhow::Result<()> { + fn test_adv_leader_id_partial_order() -> anyhow::Result<()> { #[allow(clippy::redundant_closure)] let lid = |term, node_id| LeaderId::::new(term, node_id); diff --git a/openraft/src/vote/leader_id/leader_id_cmp.rs b/openraft/src/vote/leader_id/leader_id_cmp.rs new file mode 100644 index 000000000..0b8579c78 --- /dev/null +++ b/openraft/src/vote/leader_id/leader_id_cmp.rs @@ -0,0 +1,123 @@ +use std::cmp::Ordering; +use std::marker::PhantomData; + +use crate::vote::RaftLeaderId; +use crate::RaftTypeConfig; + +/// Provide comparison functions for [`RaftLeaderId`] implementations. +pub struct LeaderIdCompare(PhantomData); + +impl LeaderIdCompare +where C: RaftTypeConfig +{ + /// Implements [`PartialOrd`] for LeaderId to enforce the standard Raft behavior of at most one + /// leader per term. + /// + /// In standard Raft, each term can have at most one leader. This is enforced by making leader + /// IDs with the same term incomparable (returning None), unless they refer to the same + /// node. + pub fn std(a: &LID, b: &LID) -> Option + where LID: RaftLeaderId { + match a.term().cmp(&b.term()) { + Ordering::Equal => match (a.node_id_ref(), b.node_id_ref()) { + (None, None) => Some(Ordering::Equal), + (Some(_), None) => Some(Ordering::Greater), + (None, Some(_)) => Some(Ordering::Less), + (Some(a), Some(b)) => { + if a == b { + Some(Ordering::Equal) + } else { + None + } + } + }, + cmp => Some(cmp), + } + } + + /// Implements [`PartialOrd`] for LeaderId to allow multiple leaders per term. + pub fn adv(a: &LID, b: &LID) -> Option + where LID: RaftLeaderId { + let res = (a.term(), a.node_id_ref()).cmp(&(b.term(), b.node_id_ref())); + Some(res) + } +} + +#[cfg(test)] +mod tests { + use std::cmp::Ordering; + + use crate::engine::testing::UTConfig; + use crate::vote::RaftLeaderId; + + #[derive(Debug, PartialEq, Eq, Default, Clone, PartialOrd, derive_more::Display)] + #[display("T{}-N{:?}", _0, _1)] + #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] + struct LeaderId(u64, Option); + + impl RaftLeaderId for LeaderId { + type Committed = u64; + + fn new(term: u64, node_id: u64) -> Self { + Self(term, Some(node_id)) + } + + fn term(&self) -> u64 { + self.0 + } + + fn node_id_ref(&self) -> Option<&u64> { + self.1.as_ref() + } + + fn to_committed(&self) -> Self::Committed { + self.0 + } + } + + #[test] + fn test_std_cmp() { + use Ordering::*; + + use super::LeaderIdCompare as Cmp; + + let lid = |term, node_id| LeaderId(term, Some(node_id)); + let lid_none = |term| LeaderId(term, None); + + // Compare term first + assert_eq!(Cmp::std(&lid(2, 2), &lid(1, 2)), Some(Greater)); + assert_eq!(Cmp::std(&lid(1, 2), &lid(2, 2)), Some(Less)); + + // Equal term, Some > None + assert_eq!(Cmp::std(&lid(2, 2), &lid_none(2)), Some(Greater)); + assert_eq!(Cmp::std(&lid_none(2), &lid(2, 2)), Some(Less)); + + // Equal + assert_eq!(Cmp::std(&lid(2, 2), &lid(2, 2)), Some(Equal)); + + // Incomparable + assert_eq!(Cmp::std(&lid(2, 2), &lid(2, 1)), None); + assert_eq!(Cmp::std(&lid(2, 1), &lid(2, 2)), None); + assert_eq!(Cmp::std(&lid(2, 2), &lid(2, 3)), None); + } + + #[test] + fn test_adv_cmp() { + use Ordering::*; + + use super::LeaderIdCompare as Cmp; + + let lid = |term, node_id| LeaderId(term, Some(node_id)); + + // Compare term first + assert_eq!(Cmp::adv(&lid(2, 2), &lid(1, 2)), Some(Greater)); + assert_eq!(Cmp::adv(&lid(1, 2), &lid(2, 2)), Some(Less)); + + // Equal term + assert_eq!(Cmp::adv(&lid(2, 2), &lid(2, 1)), Some(Greater)); + assert_eq!(Cmp::adv(&lid(2, 1), &lid(2, 2)), Some(Less)); + + // Equal term, node_id + assert_eq!(Cmp::adv(&lid(2, 2), &lid(2, 2)), Some(Equal)); + } +} diff --git a/openraft/src/vote/leader_id/leader_id_std.rs b/openraft/src/vote/leader_id/leader_id_std.rs index 9ed779856..d8adea127 100644 --- a/openraft/src/vote/leader_id/leader_id_std.rs +++ b/openraft/src/vote/leader_id/leader_id_std.rs @@ -1,9 +1,12 @@ +//! [`RaftLeaderId`] implementation that enforces standard Raft behavior of at most one leader per +//! term. + use std::cmp::Ordering; use std::fmt; use std::marker::PhantomData; use crate::display_ext::DisplayOptionExt; -use crate::vote::RaftCommittedLeaderId; +use crate::vote::LeaderIdCompare; use crate::vote::RaftLeaderId; use crate::RaftTypeConfig; @@ -28,24 +31,7 @@ where C: RaftTypeConfig { #[inline] fn partial_cmp(&self, other: &Self) -> Option { - match PartialOrd::partial_cmp(&self.term, &other.term) { - Some(Ordering::Equal) => { - // - match (&self.voted_for, &other.voted_for) { - (None, None) => Some(Ordering::Equal), - (Some(_), None) => Some(Ordering::Greater), - (None, Some(_)) => Some(Ordering::Less), - (Some(a), Some(b)) => { - if a == b { - Some(Ordering::Equal) - } else { - None - } - } - } - } - cmp => cmp, - } + LeaderIdCompare::::std(self, other) } } @@ -114,8 +100,6 @@ where C: RaftTypeConfig } } -impl RaftCommittedLeaderId for CommittedLeaderId where C: RaftTypeConfig {} - #[cfg(test)] #[allow(clippy::nonminimal_bool)] mod tests { @@ -140,7 +124,7 @@ mod tests { #[test] #[allow(clippy::neg_cmp_op_on_partial_ord)] - fn test_leader_id_partial_order() -> anyhow::Result<()> { + fn test_std_leader_id_partial_order() -> anyhow::Result<()> { #[allow(clippy::redundant_closure)] let lid = |term, node_id| LeaderId::::new(term, node_id); diff --git a/openraft/src/vote/leader_id/mod.rs b/openraft/src/vote/leader_id/mod.rs index c6e6e3999..3dc606183 100644 --- a/openraft/src/vote/leader_id/mod.rs +++ b/openraft/src/vote/leader_id/mod.rs @@ -1,6 +1,7 @@ pub mod leader_id_adv; pub mod leader_id_std; +pub(crate) mod leader_id_cmp; pub(crate) mod raft_committed_leader_id; pub(crate) mod raft_leader_id; diff --git a/openraft/src/vote/leader_id/raft_committed_leader_id.rs b/openraft/src/vote/leader_id/raft_committed_leader_id.rs index d597f861a..3f84f01c4 100644 --- a/openraft/src/vote/leader_id/raft_committed_leader_id.rs +++ b/openraft/src/vote/leader_id/raft_committed_leader_id.rs @@ -43,3 +43,10 @@ where Self: OptionalFeatures + Ord + Clone + Debug + Display + Default + 'static, { } + +impl RaftCommittedLeaderId for T +where + C: RaftTypeConfig, + T: OptionalFeatures + Ord + Clone + Debug + Display + Default + 'static, +{ +} diff --git a/openraft/src/vote/mod.rs b/openraft/src/vote/mod.rs index da5e1619c..0d094f90d 100644 --- a/openraft/src/vote/mod.rs +++ b/openraft/src/vote/mod.rs @@ -16,5 +16,6 @@ pub use leader_id::raft_leader_id::RaftLeaderIdExt; pub use raft_term::RaftTerm; pub use self::leader_id::leader_id_adv; +pub use self::leader_id::leader_id_cmp::LeaderIdCompare; pub use self::leader_id::leader_id_std; pub use self::vote::Vote;