diff --git a/examples/raft-kv-memstore-grpc/src/bin/main.rs b/examples/raft-kv-memstore-grpc/src/bin/main.rs index 4aeca2df3..00945a1d6 100644 --- a/examples/raft-kv-memstore-grpc/src/bin/main.rs +++ b/examples/raft-kv-memstore-grpc/src/bin/main.rs @@ -9,8 +9,8 @@ use raft_kv_memstore_grpc::network::Network; use raft_kv_memstore_grpc::protobuf::api_service_server::ApiServiceServer; use raft_kv_memstore_grpc::protobuf::internal_service_server::InternalServiceServer; use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer; +use raft_kv_memstore_grpc::typ::Raft; use raft_kv_memstore_grpc::LogStore; -use raft_kv_memstore_grpc::Raft; use raft_kv_memstore_grpc::StateMachineStore; use tonic::transport::Server; use tracing::info; diff --git a/examples/raft-kv-memstore-grpc/src/grpc/api_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/api_service.rs index 1070d231d..1d982eee1 100644 --- a/examples/raft-kv-memstore-grpc/src/grpc/api_service.rs +++ b/examples/raft-kv-memstore-grpc/src/grpc/api_service.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use openraft::Raft; use tonic::Request; use tonic::Response; use tonic::Status; @@ -11,7 +10,7 @@ use crate::protobuf::GetRequest; use crate::protobuf::Response as PbResponse; use crate::protobuf::SetRequest; use crate::store::StateMachineStore; -use crate::TypeConfig; +use crate::typ::*; /// External API service implementation providing key-value store operations. /// This service handles client requests for getting and setting values in the distributed store. @@ -26,7 +25,7 @@ use crate::TypeConfig; /// before processing them through the Raft consensus protocol. pub struct ApiServiceImpl { /// The Raft node instance for consensus operations - raft_node: Raft, + raft_node: Raft, /// The state machine store for direct reads state_machine_store: Arc, } @@ -37,7 +36,7 @@ impl ApiServiceImpl { /// # Arguments /// * `raft_node` - The Raft node instance this service will use /// * `state_machine_store` - The state machine store for reading data - pub fn new(raft_node: Raft, state_machine_store: Arc) -> Self { + pub fn new(raft_node: Raft, state_machine_store: Arc) -> Self { ApiServiceImpl { raft_node, state_machine_store, diff --git a/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs index 2b568ecb1..84c2af345 100644 --- a/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs +++ b/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs @@ -1,7 +1,6 @@ use bincode::deserialize; use bincode::serialize; use futures::StreamExt; -use openraft::Raft; use openraft::Snapshot; use tonic::Request; use tonic::Response; @@ -16,7 +15,7 @@ use crate::protobuf::SnapshotRequest; use crate::protobuf::VoteRequest; use crate::protobuf::VoteResponse; use crate::store::StateMachineData; -use crate::TypeConfig; +use crate::typ::*; /// Internal gRPC service implementation for Raft protocol communications. /// This service handles the core Raft consensus protocol operations between cluster nodes. @@ -31,7 +30,7 @@ use crate::TypeConfig; /// exposed to other trusted Raft cluster nodes, never to external clients. pub struct InternalServiceImpl { /// The local Raft node instance that this service operates on - raft_node: Raft, + raft_node: Raft, } impl InternalServiceImpl { @@ -39,7 +38,7 @@ impl InternalServiceImpl { /// /// # Arguments /// * `raft_node` - The Raft node instance this service will operate on - pub fn new(raft_node: Raft) -> Self { + pub fn new(raft_node: Raft) -> Self { InternalServiceImpl { raft_node } } diff --git a/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs index ae6ffcb0a..efb4bc757 100644 --- a/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs +++ b/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs @@ -1,6 +1,5 @@ use std::collections::BTreeMap; -use openraft::Raft; use tonic::Request; use tonic::Response; use tonic::Status; @@ -12,8 +11,8 @@ use crate::protobuf::ChangeMembershipRequest; use crate::protobuf::InitRequest; use crate::protobuf::RaftReplyString; use crate::protobuf::RaftRequestString; +use crate::typ::*; use crate::Node; -use crate::TypeConfig; /// Management service implementation for Raft cluster administration. /// Handles cluster initialization, membership changes, and metrics collection. @@ -24,7 +23,7 @@ use crate::TypeConfig; /// - Changing cluster membership /// - Collecting metrics pub struct ManagementServiceImpl { - raft_node: Raft, + raft_node: Raft, } impl ManagementServiceImpl { @@ -32,7 +31,7 @@ impl ManagementServiceImpl { /// /// # Arguments /// * `raft_node` - The Raft node instance this service will manage - pub fn new(raft_node: Raft) -> Self { + pub fn new(raft_node: Raft) -> Self { ManagementServiceImpl { raft_node } } diff --git a/examples/raft-kv-memstore-grpc/src/lib.rs b/examples/raft-kv-memstore-grpc/src/lib.rs index 84a4a1d8e..cbd41599a 100644 --- a/examples/raft-kv-memstore-grpc/src/lib.rs +++ b/examples/raft-kv-memstore-grpc/src/lib.rs @@ -1,14 +1,10 @@ #![allow(clippy::uninlined_format_args)] -use openraft::raft::VoteRequest; -use openraft::raft::VoteResponse; -use openraft::LeaderId; -use openraft::LogId; - use crate::protobuf::Node; use crate::protobuf::Response; use crate::protobuf::SetRequest; use crate::store::StateMachineData; +use crate::typ::*; pub mod grpc; pub mod network; @@ -29,34 +25,15 @@ openraft::declare_raft_types!( pub type LogStore = store::LogStore; pub type StateMachineStore = store::StateMachineStore; -pub type Raft = openraft::Raft; pub mod protobuf { tonic::include_proto!("openraftpb"); } -pub mod typ { - - use crate::TypeConfig; - - pub type Vote = openraft::Vote; - pub type SnapshotMeta = openraft::SnapshotMeta; - pub type SnapshotData = ::SnapshotData; - pub type Snapshot = openraft::Snapshot; - - pub type RaftError = openraft::error::RaftError; - pub type RPCError = openraft::error::RPCError; - pub type StreamingError = openraft::error::StreamingError; - - pub type ClientWriteError = openraft::error::ClientWriteError; - pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; - pub type ForwardToLeader = openraft::error::ForwardToLeader; - pub type InitializeError = openraft::error::InitializeError; - - pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; -} +#[path = "../../utils/declare_types.rs"] +pub mod typ; -impl From for LeaderId { +impl From for LeaderId { fn from(proto_leader_id: protobuf::LeaderId) -> Self { LeaderId::new(proto_leader_id.term, proto_leader_id.node_id) } @@ -64,7 +41,7 @@ impl From for LeaderId { impl From for typ::Vote { fn from(proto_vote: protobuf::Vote) -> Self { - let leader_id: LeaderId = proto_vote.leader_id.unwrap().into(); + let leader_id: LeaderId = proto_vote.leader_id.unwrap().into(); if proto_vote.committed { typ::Vote::new_committed(leader_id.term, leader_id.node_id) } else { @@ -73,14 +50,14 @@ impl From for typ::Vote { } } -impl From for LogId { +impl From for LogId { fn from(proto_log_id: protobuf::LogId) -> Self { - let leader_id: LeaderId = proto_log_id.leader_id.unwrap().into(); + let leader_id: LeaderId = proto_log_id.leader_id.unwrap().into(); LogId::new(leader_id, proto_log_id.index) } } -impl From for VoteRequest { +impl From for VoteRequest { fn from(proto_vote_req: protobuf::VoteRequest) -> Self { let vote: typ::Vote = proto_vote_req.vote.unwrap().into(); let last_log_id = proto_vote_req.last_log_id.map(|log_id| log_id.into()); @@ -88,7 +65,7 @@ impl From for VoteRequest { } } -impl From for VoteResponse { +impl From for VoteResponse { fn from(proto_vote_resp: protobuf::VoteResponse) -> Self { let vote: typ::Vote = proto_vote_resp.vote.unwrap().into(); let last_log_id = proto_vote_resp.last_log_id.map(|log_id| log_id.into()); @@ -96,8 +73,8 @@ impl From for VoteResponse { } } -impl From> for protobuf::LeaderId { - fn from(leader_id: LeaderId) -> Self { +impl From for protobuf::LeaderId { + fn from(leader_id: LeaderId) -> Self { protobuf::LeaderId { term: leader_id.term, node_id: leader_id.node_id, @@ -116,8 +93,8 @@ impl From for protobuf::Vote { } } } -impl From> for protobuf::LogId { - fn from(log_id: LogId) -> Self { +impl From for protobuf::LogId { + fn from(log_id: LogId) -> Self { protobuf::LogId { index: log_id.index, leader_id: Some(log_id.leader_id.into()), @@ -125,8 +102,8 @@ impl From> for protobuf::LogId { } } -impl From> for protobuf::VoteRequest { - fn from(vote_req: VoteRequest) -> Self { +impl From for protobuf::VoteRequest { + fn from(vote_req: VoteRequest) -> Self { protobuf::VoteRequest { vote: Some(vote_req.vote.into()), last_log_id: vote_req.last_log_id.map(|log_id| log_id.into()), @@ -134,8 +111,8 @@ impl From> for protobuf::VoteRequest { } } -impl From> for protobuf::VoteResponse { - fn from(vote_resp: VoteResponse) -> Self { +impl From for protobuf::VoteResponse { + fn from(vote_resp: VoteResponse) -> Self { protobuf::VoteResponse { vote: Some(vote_resp.vote.into()), vote_granted: vote_resp.vote_granted, diff --git a/examples/raft-kv-memstore-grpc/src/network/mod.rs b/examples/raft-kv-memstore-grpc/src/network/mod.rs index e9ecf1ae8..dfee11803 100644 --- a/examples/raft-kv-memstore-grpc/src/network/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/network/mod.rs @@ -4,10 +4,6 @@ use openraft::error::NetworkError; use openraft::error::Unreachable; use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::raft::AppendEntriesRequest; -use openraft::raft::AppendEntriesResponse; -use openraft::raft::VoteRequest; -use openraft::raft::VoteResponse; use openraft::RaftNetworkFactory; use tonic::transport::Channel; @@ -17,6 +13,7 @@ use crate::protobuf::SnapshotRequest; use crate::protobuf::VoteRequest as PbVoteRequest; use crate::protobuf::VoteResponse as PbVoteResponse; use crate::typ::RPCError; +use crate::typ::*; use crate::Node; use crate::NodeId; use crate::TypeConfig; @@ -56,9 +53,9 @@ impl NetworkConnection { impl RaftNetworkV2 for NetworkConnection { async fn append_entries( &mut self, - req: AppendEntriesRequest, + req: AppendEntriesRequest, _option: RPCOption, - ) -> Result, RPCError> { + ) -> Result { let server_addr = self.target_node.rpc_addr.clone(); let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await { Ok(channel) => channel, @@ -78,11 +75,11 @@ impl RaftNetworkV2 for NetworkConnection { async fn full_snapshot( &mut self, - vote: openraft::Vote, - snapshot: openraft::Snapshot, + vote: Vote, + snapshot: Snapshot, _cancel: impl std::future::Future + openraft::OptionalSend + 'static, _option: RPCOption, - ) -> Result, crate::typ::StreamingError> { + ) -> Result { let server_addr = self.target_node.rpc_addr.clone(); let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await { Ok(channel) => channel, @@ -128,11 +125,7 @@ impl RaftNetworkV2 for NetworkConnection { Ok(result) } - async fn vote( - &mut self, - req: VoteRequest, - _option: RPCOption, - ) -> Result, RPCError> { + async fn vote(&mut self, req: VoteRequest, _option: RPCOption) -> Result { let server_addr = self.target_node.rpc_addr.clone(); let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await { Ok(channel) => channel, diff --git a/examples/raft-kv-memstore-grpc/src/store/mod.rs b/examples/raft-kv-memstore-grpc/src/store/mod.rs index a6792902d..d45f9ba13 100644 --- a/examples/raft-kv-memstore-grpc/src/store/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/store/mod.rs @@ -4,31 +4,24 @@ use std::sync::Arc; use std::sync::Mutex; use bincode; -use openraft::alias::SnapshotDataOf; use openraft::storage::RaftStateMachine; -use openraft::storage::Snapshot; -use openraft::Entry; use openraft::EntryPayload; -use openraft::LogId; use openraft::RaftSnapshotBuilder; -use openraft::SnapshotMeta; -use openraft::StorageError; -use openraft::StoredMembership; use serde::Deserialize; use serde::Serialize; use crate::protobuf::Response; -use crate::typ; +use crate::typ::*; use crate::TypeConfig; pub type LogStore = memstore::LogStore; #[derive(Debug)] pub struct StoredSnapshot { - pub meta: SnapshotMeta, + pub meta: SnapshotMeta, /// The data of the state machine at the time of this snapshot. - pub data: Box, + pub data: Box, } /// Data contained in the Raft state machine. @@ -38,9 +31,9 @@ pub struct StoredSnapshot { /// and value as String, but you could set any type of value that has the serialization impl. #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct StateMachineData { - pub last_applied: Option>, + pub last_applied: Option, - pub last_membership: StoredMembership, + pub last_membership: StoredMembership, /// Application data. pub data: BTreeMap, @@ -71,7 +64,7 @@ pub struct StateMachineStore { impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot(&mut self) -> Result, StorageError> { + async fn build_snapshot(&mut self) -> Result { let data; let last_applied_log; let last_membership; @@ -123,16 +116,14 @@ impl RaftSnapshotBuilder for Arc { impl RaftStateMachine for Arc { type SnapshotBuilder = Self; - async fn applied_state( - &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + async fn applied_state(&mut self) -> Result<(Option, StoredMembership), StorageError> { let state_machine = self.state_machine.lock().unwrap(); Ok((state_machine.last_applied, state_machine.last_membership.clone())) } #[tracing::instrument(level = "trace", skip(self, entries))] - async fn apply(&mut self, entries: I) -> Result, StorageError> - where I: IntoIterator> { + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator { let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator let mut sm = self.state_machine.lock().unwrap(); @@ -158,16 +149,12 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { Ok(Box::default()) } #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn install_snapshot( - &mut self, - meta: &SnapshotMeta, - snapshot: Box>, - ) -> Result<(), StorageError> { + async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: Box) -> Result<(), StorageError> { tracing::info!("install snapshot"); let new_snapshot = StoredSnapshot { @@ -189,7 +176,7 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result, StorageError> { match &*self.current_snapshot.lock().unwrap() { Some(snapshot) => { let data = snapshot.data.clone(); diff --git a/examples/raft-kv-memstore-grpc/src/test.rs b/examples/raft-kv-memstore-grpc/src/test.rs index dff18a3ec..93b1edf65 100644 --- a/examples/raft-kv-memstore-grpc/src/test.rs +++ b/examples/raft-kv-memstore-grpc/src/test.rs @@ -2,22 +2,22 @@ use std::sync::Arc; use openraft::testing::log::StoreBuilder; use openraft::testing::log::Suite; -use openraft::StorageError; use crate::store::LogStore; use crate::store::StateMachineStore; +use crate::typ::*; use crate::TypeConfig; struct MemKVStoreBuilder {} impl StoreBuilder, ()> for MemKVStoreBuilder { - async fn build(&self) -> Result<((), LogStore, Arc), StorageError> { + async fn build(&self) -> Result<((), LogStore, Arc), StorageError> { Ok(((), LogStore::default(), Arc::default())) } } #[tokio::test] -pub async fn test_mem_store() -> Result<(), StorageError> { +pub async fn test_mem_store() -> Result<(), StorageError> { Suite::test_all(MemKVStoreBuilder {}).await?; Ok(()) } diff --git a/examples/raft-kv-memstore-network-v2/src/api.rs b/examples/raft-kv-memstore-network-v2/src/api.rs index 299a5c0a1..5509b840c 100644 --- a/examples/raft-kv-memstore-network-v2/src/api.rs +++ b/examples/raft-kv-memstore-network-v2/src/api.rs @@ -3,18 +3,13 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; -use openraft::error::CheckIsLeaderError; -use openraft::error::Infallible; -use openraft::error::RaftError; use openraft::BasicNode; -use openraft::RaftMetrics; use crate::app::App; use crate::decode; use crate::encode; -use crate::typ; +use crate::typ::*; use crate::NodeId; -use crate::TypeConfig; pub async fn write(app: &mut App, req: String) -> String { let res = app.raft.client_write(decode(&req)).await; @@ -31,8 +26,7 @@ pub async fn read(app: &mut App, req: String) -> String { let state_machine = app.state_machine.state_machine.lock().unwrap(); let value = state_machine.data.get(&key).cloned(); - let res: Result>> = - Ok(value.unwrap_or_default()); + let res: Result> = Ok(value.unwrap_or_default()); res } Err(e) => Err(e), @@ -54,16 +48,12 @@ pub async fn append(app: &mut App, req: String) -> String { /// Receive a snapshot and install it. pub async fn snapshot(app: &mut App, req: String) -> String { - let (vote, snapshot_meta, snapshot_data): (typ::Vote, typ::SnapshotMeta, typ::SnapshotData) = decode(&req); - let snapshot = typ::Snapshot { + let (vote, snapshot_meta, snapshot_data): (Vote, SnapshotMeta, SnapshotData) = decode(&req); + let snapshot = Snapshot { meta: snapshot_meta, snapshot: Box::new(snapshot_data), }; - let res = app - .raft - .install_full_snapshot(vote, snapshot) - .await - .map_err(typ::RaftError::::Fatal); + let res = app.raft.install_full_snapshot(vote, snapshot).await.map_err(RaftError::::Fatal); encode(res) } @@ -100,6 +90,6 @@ pub async fn init(app: &mut App) -> String { pub async fn metrics(app: &mut App) -> String { let metrics = app.raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); + let res: Result = Ok(metrics); encode(res) } diff --git a/examples/raft-kv-memstore-network-v2/src/lib.rs b/examples/raft-kv-memstore-network-v2/src/lib.rs index 6bbda6497..d29564fca 100644 --- a/examples/raft-kv-memstore-network-v2/src/lib.rs +++ b/examples/raft-kv-memstore-network-v2/src/lib.rs @@ -33,31 +33,8 @@ openraft::declare_raft_types!( pub type LogStore = store::LogStore; pub type StateMachineStore = store::StateMachineStore; -pub mod typ { - use crate::TypeConfig; - - pub type Raft = openraft::Raft; - - pub type Vote = openraft::Vote; - pub type SnapshotMeta = openraft::SnapshotMeta; - pub type SnapshotData = ::SnapshotData; - pub type Snapshot = openraft::Snapshot; - - pub type Infallible = openraft::error::Infallible; - pub type Fatal = openraft::error::Fatal; - pub type RaftError = openraft::error::RaftError; - pub type RPCError = openraft::error::RPCError; - pub type StreamingError = openraft::error::StreamingError; - - pub type RaftMetrics = openraft::RaftMetrics; - - pub type ClientWriteError = openraft::error::ClientWriteError; - pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; - pub type ForwardToLeader = openraft::error::ForwardToLeader; - pub type InitializeError = openraft::error::InitializeError; - - pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; -} +#[path = "../../utils/declare_types.rs"] +pub mod typ; pub fn encode(t: T) -> String { serde_json::to_string(&t).unwrap() diff --git a/examples/raft-kv-memstore-network-v2/src/network.rs b/examples/raft-kv-memstore-network-v2/src/network.rs index b85bee8bb..723aaa64c 100644 --- a/examples/raft-kv-memstore-network-v2/src/network.rs +++ b/examples/raft-kv-memstore-network-v2/src/network.rs @@ -3,19 +3,12 @@ use std::future::Future; use openraft::error::ReplicationClosed; use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::raft::AppendEntriesRequest; -use openraft::raft::AppendEntriesResponse; -use openraft::raft::SnapshotResponse; -use openraft::raft::VoteRequest; -use openraft::raft::VoteResponse; use openraft::BasicNode; use openraft::OptionalSend; use openraft::RaftNetworkFactory; -use openraft::Snapshot; -use openraft::Vote; use crate::router::Router; -use crate::typ; +use crate::typ::*; use crate::NodeId; use crate::TypeConfig; @@ -38,9 +31,9 @@ impl RaftNetworkFactory for Router { impl RaftNetworkV2 for Connection { async fn append_entries( &mut self, - req: AppendEntriesRequest, + req: AppendEntriesRequest, _option: RPCOption, - ) -> Result, typ::RPCError> { + ) -> Result { let resp = self.router.send(self.target, "/raft/append", req).await?; Ok(resp) } @@ -48,20 +41,16 @@ impl RaftNetworkV2 for Connection { /// A real application should replace this method with customized implementation. async fn full_snapshot( &mut self, - vote: Vote, - snapshot: Snapshot, + vote: Vote, + snapshot: Snapshot, _cancel: impl Future + OptionalSend + 'static, _option: RPCOption, - ) -> Result, typ::StreamingError> { + ) -> Result { let resp = self.router.send(self.target, "/raft/snapshot", (vote, snapshot.meta, snapshot.snapshot)).await?; Ok(resp) } - async fn vote( - &mut self, - req: VoteRequest, - _option: RPCOption, - ) -> Result, typ::RPCError> { + async fn vote(&mut self, req: VoteRequest, _option: RPCOption) -> Result { let resp = self.router.send(self.target, "/raft/vote", req).await?; Ok(resp) } diff --git a/examples/raft-kv-memstore-network-v2/src/store.rs b/examples/raft-kv-memstore-network-v2/src/store.rs index 53fe37aad..3570e2101 100644 --- a/examples/raft-kv-memstore-network-v2/src/store.rs +++ b/examples/raft-kv-memstore-network-v2/src/store.rs @@ -3,20 +3,13 @@ use std::fmt::Debug; use std::sync::Arc; use std::sync::Mutex; -use openraft::alias::SnapshotDataOf; use openraft::storage::RaftStateMachine; -use openraft::storage::Snapshot; -use openraft::Entry; use openraft::EntryPayload; -use openraft::LogId; use openraft::RaftSnapshotBuilder; -use openraft::SnapshotMeta; -use openraft::StorageError; -use openraft::StoredMembership; use serde::Deserialize; use serde::Serialize; -use crate::typ; +use crate::typ::*; use crate::TypeConfig; pub type LogStore = memstore::LogStore; @@ -42,10 +35,10 @@ pub struct Response { #[derive(Debug)] pub struct StoredSnapshot { - pub meta: SnapshotMeta, + pub meta: SnapshotMeta, /// The data of the state machine at the time of this snapshot. - pub data: Box, + pub data: Box, } /// Data contained in the Raft state machine. @@ -55,9 +48,9 @@ pub struct StoredSnapshot { /// and value as String, but you could set any type of value that has the serialization impl. #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct StateMachineData { - pub last_applied: Option>, + pub last_applied: Option, - pub last_membership: StoredMembership, + pub last_membership: StoredMembership, /// Application data. pub data: BTreeMap, @@ -78,7 +71,7 @@ pub struct StateMachineStore { impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot(&mut self) -> Result, StorageError> { + async fn build_snapshot(&mut self) -> Result { let data; let last_applied_log; let last_membership; @@ -130,16 +123,14 @@ impl RaftSnapshotBuilder for Arc { impl RaftStateMachine for Arc { type SnapshotBuilder = Self; - async fn applied_state( - &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + async fn applied_state(&mut self) -> Result<(Option, StoredMembership), StorageError> { let state_machine = self.state_machine.lock().unwrap(); Ok((state_machine.last_applied, state_machine.last_membership.clone())) } #[tracing::instrument(level = "trace", skip(self, entries))] - async fn apply(&mut self, entries: I) -> Result, StorageError> - where I: IntoIterator> { + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator { let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator let mut sm = self.state_machine.lock().unwrap(); @@ -169,16 +160,12 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { Ok(Box::default()) } #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn install_snapshot( - &mut self, - meta: &SnapshotMeta, - snapshot: Box>, - ) -> Result<(), StorageError> { + async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: Box) -> Result<(), StorageError> { tracing::info!("install snapshot"); let new_snapshot = StoredSnapshot { @@ -200,7 +187,7 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result, StorageError> { match &*self.current_snapshot.lock().unwrap() { Some(snapshot) => { let data = snapshot.data.clone(); diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs index 299a5c0a1..5509b840c 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs @@ -3,18 +3,13 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; -use openraft::error::CheckIsLeaderError; -use openraft::error::Infallible; -use openraft::error::RaftError; use openraft::BasicNode; -use openraft::RaftMetrics; use crate::app::App; use crate::decode; use crate::encode; -use crate::typ; +use crate::typ::*; use crate::NodeId; -use crate::TypeConfig; pub async fn write(app: &mut App, req: String) -> String { let res = app.raft.client_write(decode(&req)).await; @@ -31,8 +26,7 @@ pub async fn read(app: &mut App, req: String) -> String { let state_machine = app.state_machine.state_machine.lock().unwrap(); let value = state_machine.data.get(&key).cloned(); - let res: Result>> = - Ok(value.unwrap_or_default()); + let res: Result> = Ok(value.unwrap_or_default()); res } Err(e) => Err(e), @@ -54,16 +48,12 @@ pub async fn append(app: &mut App, req: String) -> String { /// Receive a snapshot and install it. pub async fn snapshot(app: &mut App, req: String) -> String { - let (vote, snapshot_meta, snapshot_data): (typ::Vote, typ::SnapshotMeta, typ::SnapshotData) = decode(&req); - let snapshot = typ::Snapshot { + let (vote, snapshot_meta, snapshot_data): (Vote, SnapshotMeta, SnapshotData) = decode(&req); + let snapshot = Snapshot { meta: snapshot_meta, snapshot: Box::new(snapshot_data), }; - let res = app - .raft - .install_full_snapshot(vote, snapshot) - .await - .map_err(typ::RaftError::::Fatal); + let res = app.raft.install_full_snapshot(vote, snapshot).await.map_err(RaftError::::Fatal); encode(res) } @@ -100,6 +90,6 @@ pub async fn init(app: &mut App) -> String { pub async fn metrics(app: &mut App) -> String { let metrics = app.raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); + let res: Result = Ok(metrics); encode(res) } diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs index 6915e9a6e..7f795b5da 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs @@ -34,32 +34,8 @@ openraft::declare_raft_types!( pub type LogStore = store::LogStore; pub type StateMachineStore = store::StateMachineStore; -pub mod typ { - - use crate::TypeConfig; - - pub type Raft = openraft::Raft; - - pub type Vote = openraft::Vote; - pub type SnapshotMeta = openraft::SnapshotMeta; - pub type SnapshotData = ::SnapshotData; - pub type Snapshot = openraft::Snapshot; - - pub type Infallible = openraft::error::Infallible; - pub type Fatal = openraft::error::Fatal; - pub type RaftError = openraft::error::RaftError; - pub type RPCError = openraft::error::RPCError; - pub type StreamingError = openraft::error::StreamingError; - - pub type RaftMetrics = openraft::RaftMetrics; - - pub type ClientWriteError = openraft::error::ClientWriteError; - pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; - pub type ForwardToLeader = openraft::error::ForwardToLeader; - pub type InitializeError = openraft::error::InitializeError; - - pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; -} +#[path = "../../utils/declare_types.rs"] +pub mod typ; pub fn encode(t: T) -> String { serde_json::to_string(&t).unwrap() @@ -87,7 +63,7 @@ pub async fn new_raft(node_id: NodeId, router: Router, op: Operator) -> (typ::Ra let config = Arc::new(config.validate().unwrap()); - // Create a instance of where the Raft logs will be stored. + // Create an instance of where the Raft logs will be stored. let log_store = LogStore::default(); // Create a instance of where the state machine data will be stored. diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs index b85bee8bb..723aaa64c 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs @@ -3,19 +3,12 @@ use std::future::Future; use openraft::error::ReplicationClosed; use openraft::network::v2::RaftNetworkV2; use openraft::network::RPCOption; -use openraft::raft::AppendEntriesRequest; -use openraft::raft::AppendEntriesResponse; -use openraft::raft::SnapshotResponse; -use openraft::raft::VoteRequest; -use openraft::raft::VoteResponse; use openraft::BasicNode; use openraft::OptionalSend; use openraft::RaftNetworkFactory; -use openraft::Snapshot; -use openraft::Vote; use crate::router::Router; -use crate::typ; +use crate::typ::*; use crate::NodeId; use crate::TypeConfig; @@ -38,9 +31,9 @@ impl RaftNetworkFactory for Router { impl RaftNetworkV2 for Connection { async fn append_entries( &mut self, - req: AppendEntriesRequest, + req: AppendEntriesRequest, _option: RPCOption, - ) -> Result, typ::RPCError> { + ) -> Result { let resp = self.router.send(self.target, "/raft/append", req).await?; Ok(resp) } @@ -48,20 +41,16 @@ impl RaftNetworkV2 for Connection { /// A real application should replace this method with customized implementation. async fn full_snapshot( &mut self, - vote: Vote, - snapshot: Snapshot, + vote: Vote, + snapshot: Snapshot, _cancel: impl Future + OptionalSend + 'static, _option: RPCOption, - ) -> Result, typ::StreamingError> { + ) -> Result { let resp = self.router.send(self.target, "/raft/snapshot", (vote, snapshot.meta, snapshot.snapshot)).await?; Ok(resp) } - async fn vote( - &mut self, - req: VoteRequest, - _option: RPCOption, - ) -> Result, typ::RPCError> { + async fn vote(&mut self, req: VoteRequest, _option: RPCOption) -> Result { let resp = self.router.send(self.target, "/raft/vote", req).await?; Ok(resp) } diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs index c9f95c391..9283364f1 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs @@ -4,22 +4,14 @@ use std::sync::Arc; use std::sync::Mutex; use opendal::Operator; -use openraft::alias::SnapshotDataOf; use openraft::storage::RaftStateMachine; -use openraft::storage::Snapshot; -use openraft::Entry; -use openraft::EntryPayload; -use openraft::LogId; use openraft::RaftSnapshotBuilder; -use openraft::SnapshotMeta; -use openraft::StorageError; -use openraft::StoredMembership; use serde::Deserialize; use serde::Serialize; use crate::decode_buffer; use crate::encode; -use crate::typ; +use crate::typ::*; use crate::TypeConfig; pub type LogStore = memstore::LogStore; @@ -45,10 +37,10 @@ pub struct Response { #[derive(Debug)] pub struct StoredSnapshot { - pub meta: SnapshotMeta, + pub meta: SnapshotMeta, /// The data of the state machine at the time of this snapshot. - pub data: Box, + pub data: Box, } /// Data contained in the Raft state machine. @@ -58,9 +50,9 @@ pub struct StoredSnapshot { /// and value as String, but you could set any type of value that has the serialization impl. #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct StateMachineData { - pub last_applied: Option>, + pub last_applied: Option, - pub last_membership: StoredMembership, + pub last_membership: StoredMembership, /// Application data. pub data: BTreeMap, @@ -93,7 +85,7 @@ impl StateMachineStore { impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot(&mut self) -> Result, StorageError> { + async fn build_snapshot(&mut self) -> Result { let data; let last_applied_log; let last_membership; @@ -151,16 +143,14 @@ impl RaftSnapshotBuilder for Arc { impl RaftStateMachine for Arc { type SnapshotBuilder = Self; - async fn applied_state( - &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + async fn applied_state(&mut self) -> Result<(Option, StoredMembership), StorageError> { let state_machine = self.state_machine.lock().unwrap(); Ok((state_machine.last_applied, state_machine.last_membership.clone())) } #[tracing::instrument(level = "trace", skip(self, entries))] - async fn apply(&mut self, entries: I) -> Result, StorageError> - where I: IntoIterator> { + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator { let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator let mut sm = self.state_machine.lock().unwrap(); @@ -190,16 +180,12 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { Ok(Box::default()) } #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn install_snapshot( - &mut self, - meta: &SnapshotMeta, - snapshot: Box>, - ) -> Result<(), StorageError> { + async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: Box) -> Result<(), StorageError> { tracing::info!("install snapshot"); let new_snapshot = StoredSnapshot { @@ -222,7 +208,7 @@ impl RaftStateMachine for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result, StorageError> { match &*self.current_snapshot.lock().unwrap() { Some(snapshot) => { let data = snapshot.data.clone(); diff --git a/examples/raft-kv-memstore-singlethreaded/src/api.rs b/examples/raft-kv-memstore-singlethreaded/src/api.rs index d54a1e755..29e6416aa 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/api.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/api.rs @@ -3,17 +3,14 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; -use openraft::error::CheckIsLeaderError; use openraft::error::Infallible; -use openraft::error::RaftError; use openraft::BasicNode; -use openraft::RaftMetrics; use crate::app::App; use crate::decode; use crate::encode; +use crate::typ::*; use crate::NodeId; -use crate::TypeConfig; pub async fn write(app: &mut App, req: String) -> String { let res = app.raft.client_write(decode(&req)).await; @@ -30,8 +27,7 @@ pub async fn read(app: &mut App, req: String) -> String { let state_machine = app.state_machine.state_machine.borrow(); let value = state_machine.data.get(&key).cloned(); - let res: Result>> = - Ok(value.unwrap_or_default()); + let res: Result> = Ok(value.unwrap_or_default()); res } Err(e) => Err(e), @@ -52,7 +48,8 @@ pub async fn append(app: &mut App, req: String) -> String { } pub async fn snapshot(app: &mut App, req: String) -> String { - let res = app.raft.install_snapshot(decode(&req)).await; + let req = decode(&req); + let res = app.raft.install_snapshot(req).await; encode(res) } @@ -89,6 +86,6 @@ pub async fn init(app: &mut App) -> String { pub async fn metrics(app: &mut App) -> String { let metrics = app.raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); + let res: Result = Ok(metrics); encode(res) } diff --git a/examples/raft-kv-memstore-singlethreaded/src/app.rs b/examples/raft-kv-memstore-singlethreaded/src/app.rs index 24363d0e0..640e680fa 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/app.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/app.rs @@ -5,8 +5,8 @@ use tokio::sync::oneshot; use crate::api; use crate::router::Router; +use crate::typ::Raft; use crate::NodeId; -use crate::Raft; use crate::StateMachineStore; pub type Path = String; diff --git a/examples/raft-kv-memstore-singlethreaded/src/lib.rs b/examples/raft-kv-memstore-singlethreaded/src/lib.rs index ce2b7d349..01a5f84e9 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/lib.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/lib.rs @@ -52,24 +52,9 @@ openraft::declare_raft_types!( pub type LogStore = store::LogStore; pub type StateMachineStore = store::StateMachineStore; -pub type Raft = openraft::Raft; -pub mod typ { - - use crate::TypeConfig; - - pub type RaftError = openraft::error::RaftError; - pub type RPCError = openraft::error::RPCError>; - - pub type RaftMetrics = openraft::RaftMetrics; - - pub type ClientWriteError = openraft::error::ClientWriteError; - pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; - pub type ForwardToLeader = openraft::error::ForwardToLeader; - pub type InitializeError = openraft::error::InitializeError; - - pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; -} +#[path = "../../utils/declare_types.rs"] +pub mod typ; pub fn encode(t: T) -> String { serde_json::to_string(&t).unwrap() diff --git a/examples/raft-kv-memstore-singlethreaded/src/network.rs b/examples/raft-kv-memstore-singlethreaded/src/network.rs index efbb51a9d..d6069c6d8 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/network.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/network.rs @@ -1,18 +1,11 @@ use openraft::error::InstallSnapshotError; -use openraft::error::RemoteError; use openraft::network::RPCOption; -use openraft::raft::AppendEntriesRequest; -use openraft::raft::AppendEntriesResponse; -use openraft::raft::InstallSnapshotRequest; -use openraft::raft::InstallSnapshotResponse; -use openraft::raft::VoteRequest; -use openraft::raft::VoteResponse; use openraft::BasicNode; use openraft::RaftNetwork; use openraft::RaftNetworkFactory; use crate::router::Router; -use crate::typ; +use crate::typ::*; use crate::NodeId; use crate::TypeConfig; @@ -35,40 +28,24 @@ impl RaftNetworkFactory for Router { impl RaftNetwork for Connection { async fn append_entries( &mut self, - req: AppendEntriesRequest, + req: AppendEntriesRequest, _option: RPCOption, - ) -> Result, typ::RPCError> { - let resp = self - .router - .send(self.target, "/raft/append", req) - .await - .map_err(|e| RemoteError::new(self.target, e))?; + ) -> Result> { + let resp = self.router.send(self.target, "/raft/append", req).await?; Ok(resp) } async fn install_snapshot( &mut self, - req: InstallSnapshotRequest, + req: InstallSnapshotRequest, _option: RPCOption, - ) -> Result, typ::RPCError> { - let resp = self - .router - .send(self.target, "/raft/snapshot", req) - .await - .map_err(|e| RemoteError::new(self.target, e))?; + ) -> Result>> { + let resp = self.router.send(self.target, "/raft/snapshot", req).await?; Ok(resp) } - async fn vote( - &mut self, - req: VoteRequest, - _option: RPCOption, - ) -> Result, typ::RPCError> { - let resp = self - .router - .send(self.target, "/raft/vote", req) - .await - .map_err(|e| RemoteError::new(self.target, e))?; + async fn vote(&mut self, req: VoteRequest, _option: RPCOption) -> Result> { + let resp = self.router.send(self.target, "/raft/vote", req).await?; Ok(resp) } } diff --git a/examples/raft-kv-memstore-singlethreaded/src/router.rs b/examples/raft-kv-memstore-singlethreaded/src/router.rs index 9a7c34cba..9d7f5a29c 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/router.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/router.rs @@ -2,11 +2,14 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; +use openraft::error::RemoteError; +use openraft::error::Unreachable; use tokio::sync::oneshot; use crate::app::RequestTx; use crate::decode; use crate::encode; +use crate::typ::RPCError; use crate::typ::RaftError; use crate::NodeId; @@ -18,11 +21,12 @@ pub struct Router { } impl Router { - /// Send request `Req` to target node `to`, and wait for response `Result>`. - pub async fn send(&self, to: NodeId, path: &str, req: Req) -> Result> + /// Send request `Req` to target node `to`, and wait for response `Result`. + pub async fn send(&self, to: NodeId, path: &str, req: Req) -> Result> where Req: serde::Serialize, Result>: serde::de::DeserializeOwned, + E: std::error::Error, { let (resp_tx, resp_rx) = oneshot::channel(); @@ -39,6 +43,13 @@ impl Router { let resp_str = resp_rx.await.unwrap(); tracing::debug!("resp from: {}, {}, {}", to, path, resp_str); - decode::>>(&resp_str) + let res = decode::>>(&resp_str); + match res { + Ok(r) => Ok(r), + Err(e) => match e { + RaftError::APIError(x) => Err(RPCError::RemoteError(RemoteError::new(to, x))), + RaftError::Fatal(f) => Err(RPCError::Unreachable(Unreachable::new(&f))), + }, + } } } diff --git a/examples/raft-kv-memstore-singlethreaded/src/store.rs b/examples/raft-kv-memstore-singlethreaded/src/store.rs index 54f672640..902db0091 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/store.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/store.rs @@ -6,24 +6,14 @@ use std::marker::PhantomData; use std::ops::RangeBounds; use std::rc::Rc; -use openraft::alias::SnapshotDataOf; -use openraft::storage::IOFlushed; -use openraft::storage::LogState; use openraft::storage::RaftLogStorage; use openraft::storage::RaftStateMachine; -use openraft::storage::Snapshot; -use openraft::Entry; -use openraft::EntryPayload; -use openraft::LogId; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; -use openraft::SnapshotMeta; -use openraft::StorageError; -use openraft::StoredMembership; -use openraft::Vote; use serde::Deserialize; use serde::Serialize; +use crate::typ::*; use crate::TypeConfig; #[derive(Serialize, Deserialize, Debug, Clone)] @@ -71,7 +61,7 @@ pub struct Response { #[derive(Debug)] pub struct StoredSnapshot { - pub meta: SnapshotMeta, + pub meta: SnapshotMeta, /// The data of the state machine at the time of this snapshot. pub data: Vec, @@ -84,9 +74,9 @@ pub struct StoredSnapshot { /// and value as String, but you could set any type of value that has the serialization impl. #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct StateMachineData { - pub last_applied: Option>, + pub last_applied: Option, - pub last_membership: StoredMembership, + pub last_membership: StoredMembership, /// Application data. pub data: BTreeMap, @@ -107,35 +97,35 @@ pub struct StateMachineStore { #[derive(Debug, Default)] pub struct LogStore { - last_purged_log_id: RefCell>>, + last_purged_log_id: RefCell>, /// The Raft log. - log: RefCell>>, + log: RefCell>, - committed: RefCell>>, + committed: RefCell>, /// The current granted vote. - vote: RefCell>>, + vote: RefCell>, } impl RaftLogReader for Rc { async fn try_get_log_entries + Clone + Debug>( &mut self, range: RB, - ) -> Result>, StorageError> { + ) -> Result, StorageError> { let log = self.log.borrow(); let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); Ok(response) } - async fn read_vote(&mut self) -> Result>, StorageError> { + async fn read_vote(&mut self) -> Result, StorageError> { Ok(*self.vote.borrow()) } } impl RaftSnapshotBuilder for Rc { #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot(&mut self) -> Result, StorageError> { + async fn build_snapshot(&mut self) -> Result { let data; let last_applied_log; let last_membership; @@ -187,16 +177,14 @@ impl RaftSnapshotBuilder for Rc { impl RaftStateMachine for Rc { type SnapshotBuilder = Self; - async fn applied_state( - &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + async fn applied_state(&mut self) -> Result<(Option, StoredMembership), StorageError> { let state_machine = self.state_machine.borrow(); Ok((state_machine.last_applied, state_machine.last_membership.clone())) } #[tracing::instrument(level = "trace", skip(self, entries))] - async fn apply(&mut self, entries: I) -> Result, StorageError> - where I: IntoIterator> { + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator { let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator let mut sm = self.state_machine.borrow_mut(); @@ -226,16 +214,12 @@ impl RaftStateMachine for Rc { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn install_snapshot( - &mut self, - meta: &SnapshotMeta, - snapshot: Box>, - ) -> Result<(), StorageError> { + async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: Box) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, "decoding snapshot for installation" @@ -261,7 +245,7 @@ impl RaftStateMachine for Rc { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result, StorageError> { match &*self.current_snapshot.borrow() { Some(snapshot) => { let data = snapshot.data.clone(); @@ -282,7 +266,7 @@ impl RaftStateMachine for Rc { impl RaftLogStorage for Rc { type LogReader = Self; - async fn get_log_state(&mut self) -> Result, StorageError> { + async fn get_log_state(&mut self) -> Result { let log = self.log.borrow(); let last = log.iter().next_back().map(|(_, ent)| ent.log_id); @@ -299,27 +283,27 @@ impl RaftLogStorage for Rc { }) } - async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + async fn save_committed(&mut self, committed: Option) -> Result<(), StorageError> { let mut c = self.committed.borrow_mut(); *c = committed; Ok(()) } - async fn read_committed(&mut self) -> Result>, StorageError> { + async fn read_committed(&mut self) -> Result, StorageError> { let committed = self.committed.borrow(); Ok(*committed) } #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { let mut v = self.vote.borrow_mut(); *v = Some(*vote); Ok(()) } #[tracing::instrument(level = "trace", skip(self, entries, callback))] - async fn append(&mut self, entries: I, callback: IOFlushed) -> Result<(), StorageError> - where I: IntoIterator> { + async fn append(&mut self, entries: I, callback: IOFlushed) -> Result<(), StorageError> + where I: IntoIterator { // Simple implementation that calls the flush-before-return `append_to_log`. let mut log = self.log.borrow_mut(); for entry in entries { @@ -331,7 +315,7 @@ impl RaftLogStorage for Rc { } #[tracing::instrument(level = "debug", skip(self))] - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("delete_log: [{:?}, +oo)", log_id); let mut log = self.log.borrow_mut(); @@ -344,7 +328,7 @@ impl RaftLogStorage for Rc { } #[tracing::instrument(level = "debug", skip(self))] - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("delete_log: (-oo, {:?}]", log_id); { diff --git a/examples/raft-kv-memstore/src/client.rs b/examples/raft-kv-memstore/src/client.rs index d0c852ee7..1136857d5 100644 --- a/examples/raft-kv-memstore/src/client.rs +++ b/examples/raft-kv-memstore/src/client.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; -use openraft::error::ForwardToLeader; use openraft::error::NetworkError; use openraft::error::RemoteError; use openraft::RaftMetrics; @@ -13,7 +12,7 @@ use serde::Deserialize; use serde::Serialize; use tokio::time::timeout; -use crate::typ; +use crate::typ::*; use crate::NodeId; use crate::Request; use crate::TypeConfig; @@ -47,21 +46,21 @@ impl ExampleClient { /// will be applied to state machine. /// /// The result of applying the request will be returned. - pub async fn write(&self, req: &Request) -> Result> { + pub async fn write(&self, req: &Request) -> Result>> { self.send_rpc_to_leader("write", Some(req)).await } /// Read value by key, in an inconsistent mode. /// /// This method may return stale value because it does not force to read on a legal leader. - pub async fn read(&self, req: &String) -> Result { + pub async fn read(&self, req: &String) -> Result> { self.do_send_rpc_to_leader("read", Some(req)).await } /// Consistent Read value by key, in an inconsistent mode. /// /// This method MUST return consistent value or CheckIsLeaderError. - pub async fn consistent_read(&self, req: &String) -> Result> { + pub async fn consistent_read(&self, req: &String) -> Result>> { self.do_send_rpc_to_leader("consistent_read", Some(req)).await } @@ -73,7 +72,7 @@ impl ExampleClient { /// With a initialized cluster, new node can be added with [`write`]. /// Then setup replication with [`add_learner`]. /// Then make the new node a member with [`change_membership`]. - pub async fn init(&self) -> Result<(), typ::RPCError> { + pub async fn init(&self) -> Result<(), RPCError>> { self.do_send_rpc_to_leader("init", Some(&Vec::<(NodeId, String)>::new())).await } @@ -83,7 +82,7 @@ impl ExampleClient { pub async fn add_learner( &self, req: (NodeId, String), - ) -> Result> { + ) -> Result>> { self.send_rpc_to_leader("add-learner", Some(&req)).await } @@ -94,7 +93,7 @@ impl ExampleClient { pub async fn change_membership( &self, req: &BTreeSet, - ) -> Result> { + ) -> Result>> { self.send_rpc_to_leader("change-membership", Some(req)).await } @@ -103,7 +102,7 @@ impl ExampleClient { /// Metrics contains various information about the cluster, such as current leader, /// membership config, replication status etc. /// See [`RaftMetrics`]. - pub async fn metrics(&self) -> Result, typ::RPCError> { + pub async fn metrics(&self) -> Result, RPCError> { self.do_send_rpc_to_leader("metrics", None::<&()>).await } @@ -119,7 +118,7 @@ impl ExampleClient { &self, uri: &str, req: Option<&Req>, - ) -> Result> + ) -> Result>> where Req: Serialize + 'static, Resp: Serialize + DeserializeOwned, @@ -146,39 +145,43 @@ impl ExampleClient { let res = timeout(Duration::from_millis(3_000), fu).await; let resp = match res { - Ok(x) => x.map_err(|e| typ::RPCError::Network(NetworkError::new(&e)))?, + Ok(x) => x.map_err(|e| RPCError::Network(NetworkError::new(&e)))?, Err(timeout_err) => { tracing::error!("timeout {} to url: {}", timeout_err, url); - return Err(typ::RPCError::Network(NetworkError::new(&timeout_err))); + return Err(RPCError::Network(NetworkError::new(&timeout_err))); } }; - let res: Result> = - resp.json().await.map_err(|e| typ::RPCError::Network(NetworkError::new(&e)))?; + let res: Result> = + resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; tracing::debug!( "<<< client recv reply from {}: {}", url, serde_json::to_string_pretty(&res).unwrap() ); - res.map_err(|e| typ::RPCError::RemoteError(RemoteError::new(leader_id, e))) + res.map_err(|e| RPCError::RemoteError(RemoteError::new(leader_id, e))) } /// Try the best to send a request to the leader. /// /// If the target node is not a leader, a `ForwardToLeader` error will be /// returned and this client will retry at most 3 times to contact the updated leader. - async fn send_rpc_to_leader(&self, uri: &str, req: Option<&Req>) -> Result> + async fn send_rpc_to_leader( + &self, + uri: &str, + req: Option<&Req>, + ) -> Result>> where Req: Serialize + 'static, Resp: Serialize + DeserializeOwned, - Err: std::error::Error + Serialize + DeserializeOwned + TryAsRef + Clone, + Err: std::error::Error + Serialize + DeserializeOwned + TryAsRef + Clone, { // Retry at most 3 times to find a valid leader. let mut n_retry = 3; loop { - let res: Result> = self.do_send_rpc_to_leader(uri, req).await; + let res: Result>> = self.do_send_rpc_to_leader(uri, req).await; let rpc_err = match res { Ok(x) => return Ok(x), diff --git a/examples/raft-kv-memstore/src/lib.rs b/examples/raft-kv-memstore/src/lib.rs index c10d09c2c..e0aa1aeb7 100644 --- a/examples/raft-kv-memstore/src/lib.rs +++ b/examples/raft-kv-memstore/src/lib.rs @@ -37,20 +37,8 @@ pub type LogStore = store::LogStore; pub type StateMachineStore = store::StateMachineStore; pub type Raft = openraft::Raft; -pub mod typ { - - use crate::TypeConfig; - - pub type RaftError = openraft::error::RaftError; - pub type RPCError = openraft::error::RPCError>; - - pub type ClientWriteError = openraft::error::ClientWriteError; - pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; - pub type ForwardToLeader = openraft::error::ForwardToLeader; - pub type InitializeError = openraft::error::InitializeError; - - pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; -} +#[path = "../../utils/declare_types.rs"] +pub mod typ; pub async fn start_example_raft_node(node_id: NodeId, http_addr: String) -> std::io::Result<()> { // Create a configuration for the raft instance. diff --git a/examples/raft-kv-memstore/src/network/raft_network_impl.rs b/examples/raft-kv-memstore/src/network/raft_network_impl.rs index 561b38f5f..29f6ae8ee 100644 --- a/examples/raft-kv-memstore/src/network/raft_network_impl.rs +++ b/examples/raft-kv-memstore/src/network/raft_network_impl.rs @@ -15,7 +15,7 @@ use openraft::BasicNode; use serde::de::DeserializeOwned; use serde::Serialize; -use crate::typ; +use crate::typ::*; use crate::NodeId; use crate::TypeConfig; @@ -85,7 +85,7 @@ impl RaftNetwork for NetworkConnection { &mut self, req: AppendEntriesRequest, _option: RPCOption, - ) -> Result, typ::RPCError> { + ) -> Result, RPCError> { self.owner.send_rpc(self.target, &self.target_node, "raft-append", req).await } @@ -93,7 +93,7 @@ impl RaftNetwork for NetworkConnection { &mut self, req: InstallSnapshotRequest, _option: RPCOption, - ) -> Result, typ::RPCError> { + ) -> Result, RPCError>> { self.owner.send_rpc(self.target, &self.target_node, "raft-snapshot", req).await } @@ -101,7 +101,7 @@ impl RaftNetwork for NetworkConnection { &mut self, req: VoteRequest, _option: RPCOption, - ) -> Result, typ::RPCError> { + ) -> Result, RPCError> { self.owner.send_rpc(self.target, &self.target_node, "raft-vote", req).await } } diff --git a/examples/raft-kv-rocksdb/src/app.rs b/examples/raft-kv-rocksdb/src/app.rs index 227ac0f7f..408fac812 100644 --- a/examples/raft-kv-rocksdb/src/app.rs +++ b/examples/raft-kv-rocksdb/src/app.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use openraft::Config; use tokio::sync::RwLock; -use crate::ExampleRaft; +use crate::typ::Raft; use crate::NodeId; // Representation of an application state. This struct can be shared around to share @@ -13,7 +13,7 @@ pub struct App { pub id: NodeId, pub api_addr: String, pub rpc_addr: String, - pub raft: ExampleRaft, + pub raft: Raft, pub key_values: Arc>>, pub config: Arc, } diff --git a/examples/raft-kv-rocksdb/src/client.rs b/examples/raft-kv-rocksdb/src/client.rs index fd81d6fc8..0f3732a07 100644 --- a/examples/raft-kv-rocksdb/src/client.rs +++ b/examples/raft-kv-rocksdb/src/client.rs @@ -3,20 +3,17 @@ use std::sync::Arc; use std::sync::Mutex; use openraft::error::NetworkError; -use openraft::error::RPCError; use openraft::error::RemoteError; use openraft::error::Unreachable; -use openraft::RaftMetrics; use openraft::TryAsRef; use reqwest::Client; use serde::de::DeserializeOwned; use serde::Deserialize; use serde::Serialize; -use crate::typ; +use crate::typ::*; use crate::NodeId; use crate::Request; -use crate::TypeConfig; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Empty {} @@ -47,21 +44,21 @@ impl ExampleClient { /// will be applied to state machine. /// /// The result of applying the request will be returned. - pub async fn write(&self, req: &Request) -> Result> { + pub async fn write(&self, req: &Request) -> Result>> { self.send_rpc_to_leader("api/write", Some(req)).await } /// Read value by key, in an inconsistent mode. /// /// This method may return stale value because it does not force to read on a legal leader. - pub async fn read(&self, req: &String) -> Result { + pub async fn read(&self, req: &String) -> Result> { self.do_send_rpc_to_leader("api/read", Some(req)).await } /// Consistent Read value by key, in an inconsistent mode. /// /// This method MUST return consistent value or CheckIsLeaderError. - pub async fn consistent_read(&self, req: &String) -> Result> { + pub async fn consistent_read(&self, req: &String) -> Result>> { self.do_send_rpc_to_leader("api/consistent_read", Some(req)).await } @@ -73,7 +70,7 @@ impl ExampleClient { /// With a initialized cluster, new node can be added with [`write`]. /// Then setup replication with [`add_learner`]. /// Then make the new node a member with [`change_membership`]. - pub async fn init(&self) -> Result<(), typ::RPCError> { + pub async fn init(&self) -> Result<(), RPCError>> { self.do_send_rpc_to_leader("cluster/init", Some(&Empty {})).await } @@ -83,7 +80,7 @@ impl ExampleClient { pub async fn add_learner( &self, req: (NodeId, String, String), - ) -> Result> { + ) -> Result>> { self.send_rpc_to_leader("cluster/add-learner", Some(&req)).await } @@ -94,7 +91,7 @@ impl ExampleClient { pub async fn change_membership( &self, req: &BTreeSet, - ) -> Result> { + ) -> Result>> { self.send_rpc_to_leader("cluster/change-membership", Some(req)).await } @@ -103,7 +100,7 @@ impl ExampleClient { /// Metrics contains various information about the cluster, such as current leader, /// membership config, replication status etc. /// See [`RaftMetrics`]. - pub async fn metrics(&self) -> Result, typ::RPCError> { + pub async fn metrics(&self) -> Result> { self.do_send_rpc_to_leader("cluster/metrics", None::<&()>).await } @@ -118,7 +115,7 @@ impl ExampleClient { &self, uri: &str, req: Option<&Req>, - ) -> Result> + ) -> Result>> where Req: Serialize + 'static, Resp: Serialize + DeserializeOwned, @@ -151,7 +148,8 @@ impl ExampleClient { RPCError::Network(NetworkError::new(&e)) })?; - let res: Result = resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let res: Result> = + resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; println!( "<<< client recv reply from {}: {}", url, @@ -165,17 +163,21 @@ impl ExampleClient { /// /// If the target node is not a leader, a `ForwardToLeader` error will be /// returned and this client will retry at most 3 times to contact the updated leader. - async fn send_rpc_to_leader(&self, uri: &str, req: Option<&Req>) -> Result> + async fn send_rpc_to_leader( + &self, + uri: &str, + req: Option<&Req>, + ) -> Result>> where Req: Serialize + 'static, Resp: Serialize + DeserializeOwned, - Err: std::error::Error + Serialize + DeserializeOwned + TryAsRef + Clone, + Err: std::error::Error + Serialize + DeserializeOwned + TryAsRef + Clone, { // Retry at most 3 times to find a valid leader. let mut n_retry = 3; loop { - let res: Result> = self.do_send_rpc_to_leader(uri, req).await; + let res: Result>> = self.do_send_rpc_to_leader(uri, req).await; let rpc_err = match res { Ok(x) => return Ok(x), @@ -183,9 +185,9 @@ impl ExampleClient { }; if let RPCError::RemoteError(remote_err) = &rpc_err { - let raft_err: &typ::RaftError<_> = &remote_err.source; + let raft_err: &RaftError<_> = &remote_err.source; - if let Some(typ::ForwardToLeader { + if let Some(ForwardToLeader { leader_id: Some(leader_id), leader_node: Some(leader_node), .. diff --git a/examples/raft-kv-rocksdb/src/lib.rs b/examples/raft-kv-rocksdb/src/lib.rs index bb127ab6a..6db755968 100644 --- a/examples/raft-kv-rocksdb/src/lib.rs +++ b/examples/raft-kv-rocksdb/src/lib.rs @@ -2,7 +2,6 @@ #![deny(unused_qualifications)] use std::fmt::Display; -use std::io::Cursor; use std::path::Path; use std::sync::Arc; @@ -37,8 +36,6 @@ impl Display for Node { } } -pub type SnapshotData = Cursor>; - openraft::declare_raft_types!( pub TypeConfig: D = Request, @@ -46,25 +43,8 @@ openraft::declare_raft_types!( Node = Node, ); -pub mod typ { - use openraft::error::Infallible; - - use crate::TypeConfig; - - pub type Entry = openraft::Entry; - - pub type RaftError = openraft::error::RaftError; - pub type RPCError = openraft::error::RPCError>; - - pub type ClientWriteError = openraft::error::ClientWriteError; - pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; - pub type ForwardToLeader = openraft::error::ForwardToLeader; - pub type InitializeError = openraft::error::InitializeError; - - pub type ClientWriteResponse = openraft::raft::ClientWriteResponse; -} - -pub type ExampleRaft = openraft::Raft; +#[path = "../../utils/declare_types.rs"] +pub mod typ; type Server = tide::Server>; diff --git a/examples/raft-kv-rocksdb/src/network/api.rs b/examples/raft-kv-rocksdb/src/network/api.rs index b8208e7e1..21202a1af 100644 --- a/examples/raft-kv-rocksdb/src/network/api.rs +++ b/examples/raft-kv-rocksdb/src/network/api.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use openraft::error::CheckIsLeaderError; use openraft::error::Infallible; use tide::Body; use tide::Request; @@ -8,8 +7,8 @@ use tide::Response; use tide::StatusCode; use crate::app::App; +use crate::typ::*; use crate::Server; -use crate::TypeConfig; pub fn rest(app: &mut Server) { let mut api = app.at("/api"); @@ -51,7 +50,7 @@ async fn consistent_read(mut req: Request>) -> tide::Result { let value = kvs.get(&key); - let res: Result> = Ok(value.cloned().unwrap_or_default()); + let res: Result = Ok(value.cloned().unwrap_or_default()); Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build()) } e => Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&e)?).build()), diff --git a/examples/raft-kv-rocksdb/src/network/management.rs b/examples/raft-kv-rocksdb/src/network/management.rs index ab1fc10b0..943ee6dbe 100644 --- a/examples/raft-kv-rocksdb/src/network/management.rs +++ b/examples/raft-kv-rocksdb/src/network/management.rs @@ -3,17 +3,16 @@ use std::collections::BTreeSet; use std::sync::Arc; use openraft::error::Infallible; -use openraft::RaftMetrics; use tide::Body; use tide::Request; use tide::Response; use tide::StatusCode; use crate::app::App; +use crate::typ::*; use crate::Node; use crate::NodeId; use crate::Server; -use crate::TypeConfig; // --- Cluster management @@ -61,6 +60,6 @@ async fn init(req: Request>) -> tide::Result { async fn metrics(req: Request>) -> tide::Result { let metrics = req.state().raft.metrics().borrow().clone(); - let res: Result, Infallible> = Ok(metrics); + let res: Result = Ok(metrics); Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build()) } diff --git a/examples/raft-kv-rocksdb/src/network/raft.rs b/examples/raft-kv-rocksdb/src/network/raft.rs index 71ec68eef..d6f34dd8f 100644 --- a/examples/raft-kv-rocksdb/src/network/raft.rs +++ b/examples/raft-kv-rocksdb/src/network/raft.rs @@ -1,15 +1,9 @@ use std::sync::Arc; -use openraft::raft::AppendEntriesRequest; -use openraft::raft::AppendEntriesResponse; -use openraft::raft::InstallSnapshotRequest; -use openraft::raft::InstallSnapshotResponse; -use openraft::raft::VoteRequest; -use openraft::raft::VoteResponse; use toy_rpc::macros::export_impl; use crate::app::App; -use crate::TypeConfig; +use crate::typ::*; /// Raft protocol service. pub struct Raft { @@ -23,24 +17,18 @@ impl Raft { } #[export_method] - pub async fn vote(&self, vote: VoteRequest) -> Result, toy_rpc::Error> { + pub async fn vote(&self, vote: VoteRequest) -> Result { self.app.raft.vote(vote).await.map_err(|e| toy_rpc::Error::Internal(Box::new(e))) } #[export_method] - pub async fn append( - &self, - req: AppendEntriesRequest, - ) -> Result, toy_rpc::Error> { + pub async fn append(&self, req: AppendEntriesRequest) -> Result { tracing::debug!("handle append"); self.app.raft.append_entries(req).await.map_err(|e| toy_rpc::Error::Internal(Box::new(e))) } #[export_method] - pub async fn snapshot( - &self, - req: InstallSnapshotRequest, - ) -> Result, toy_rpc::Error> { + pub async fn snapshot(&self, req: InstallSnapshotRequest) -> Result { self.app.raft.install_snapshot(req).await.map_err(|e| toy_rpc::Error::Internal(Box::new(e))) } } diff --git a/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs b/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs index bb9dafda6..6aec351a1 100644 --- a/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs +++ b/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs @@ -3,24 +3,17 @@ use std::fmt::Display; use openraft::error::InstallSnapshotError; use openraft::error::NetworkError; -use openraft::error::RPCError; -use openraft::error::RaftError; use openraft::error::RemoteError; use openraft::network::RPCOption; use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; -use openraft::raft::AppendEntriesRequest; -use openraft::raft::AppendEntriesResponse; -use openraft::raft::InstallSnapshotRequest; -use openraft::raft::InstallSnapshotResponse; -use openraft::raft::VoteRequest; -use openraft::raft::VoteResponse; use openraft::AnyError; use serde::de::DeserializeOwned; use toy_rpc::pubsub::AckModeNone; use toy_rpc::Client; use super::raft::RaftClientStub; +use crate::typ::*; use crate::Node; use crate::NodeId; use crate::TypeConfig; @@ -49,9 +42,7 @@ pub struct NetworkConnection { target: NodeId, } impl NetworkConnection { - async fn c( - &mut self, - ) -> Result<&Client, RPCError> { + async fn c(&mut self) -> Result<&Client, RPCError> { if self.client.is_none() { self.client = Client::dial_websocket(&self.addr).await.ok(); } @@ -70,7 +61,7 @@ impl Display for ErrWrap { impl std::error::Error for ErrWrap {} -fn to_error(e: toy_rpc::Error, target: NodeId) -> RPCError { +fn to_error(e: toy_rpc::Error, target: NodeId) -> RPCError { match e { toy_rpc::Error::IoError(e) => RPCError::Network(NetworkError::new(&e)), toy_rpc::Error::ParseError(e) => RPCError::Network(NetworkError::new(&ErrWrap(e))), @@ -115,9 +106,9 @@ impl RaftNetwork for NetworkConnection { #[tracing::instrument(level = "debug", skip_all, err(Debug))] async fn append_entries( &mut self, - req: AppendEntriesRequest, + req: AppendEntriesRequest, _option: RPCOption, - ) -> Result, RPCError>> { + ) -> Result> { tracing::debug!(req = debug(&req), "append_entries"); let c = self.c().await?; @@ -132,20 +123,15 @@ impl RaftNetwork for NetworkConnection { #[tracing::instrument(level = "debug", skip_all, err(Debug))] async fn install_snapshot( &mut self, - req: InstallSnapshotRequest, + req: InstallSnapshotRequest, _option: RPCOption, - ) -> Result, RPCError>> - { + ) -> Result>> { tracing::debug!(req = debug(&req), "install_snapshot"); self.c().await?.raft().snapshot(req).await.map_err(|e| to_error(e, self.target)) } #[tracing::instrument(level = "debug", skip_all, err(Debug))] - async fn vote( - &mut self, - req: VoteRequest, - _option: RPCOption, - ) -> Result, RPCError>> { + async fn vote(&mut self, req: VoteRequest, _option: RPCOption) -> Result> { tracing::debug!(req = debug(&req), "vote"); self.c().await?.raft().vote(req).await.map_err(|e| to_error(e, self.target)) } diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index e2b3ca931..a335eb7d0 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -8,24 +8,14 @@ use std::sync::Arc; use byteorder::BigEndian; use byteorder::ReadBytesExt; use byteorder::WriteBytesExt; -use openraft::storage::IOFlushed; -use openraft::storage::LogState; use openraft::storage::RaftLogStorage; use openraft::storage::RaftStateMachine; -use openraft::storage::Snapshot; use openraft::AnyError; -use openraft::Entry; use openraft::EntryPayload; -use openraft::ErrorSubject; use openraft::ErrorVerb; -use openraft::LogId; use openraft::OptionalSend; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; -use openraft::SnapshotMeta; -use openraft::StorageError; -use openraft::StoredMembership; -use openraft::Vote; use rocksdb::ColumnFamily; use rocksdb::ColumnFamilyDescriptor; use rocksdb::Direction; @@ -35,8 +25,7 @@ use serde::Deserialize; use serde::Serialize; use tokio::sync::RwLock; -use crate::typ; -use crate::SnapshotData; +use crate::typ::*; use crate::TypeConfig; /** @@ -65,7 +54,7 @@ pub struct Response { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct StoredSnapshot { - pub meta: SnapshotMeta, + pub meta: SnapshotMeta, /// The data of the state machine at the time of this snapshot. pub data: Vec, @@ -87,16 +76,16 @@ pub struct StateMachineStore { #[derive(Debug, Clone)] pub struct StateMachineData { - pub last_applied_log_id: Option>, + pub last_applied_log_id: Option, - pub last_membership: StoredMembership, + pub last_membership: StoredMembership, /// State built from applying the raft logs pub kvs: Arc>>, } impl RaftSnapshotBuilder for StateMachineStore { - async fn build_snapshot(&mut self) -> Result, StorageError> { + async fn build_snapshot(&mut self) -> Result { let last_applied_log = self.data.last_applied_log_id; let last_membership = self.data.last_membership.clone(); @@ -132,7 +121,7 @@ impl RaftSnapshotBuilder for StateMachineStore { } impl StateMachineStore { - async fn new(db: Arc) -> Result> { + async fn new(db: Arc) -> Result { let mut sm = Self { data: StateMachineData { last_applied_log_id: None, @@ -151,7 +140,7 @@ impl StateMachineStore { Ok(sm) } - async fn update_state_machine_(&mut self, snapshot: StoredSnapshot) -> Result<(), StorageError> { + async fn update_state_machine_(&mut self, snapshot: StoredSnapshot) -> Result<(), StorageError> { let kvs: BTreeMap = serde_json::from_slice(&snapshot.data) .map_err(|e| StorageError::read_snapshot(Some(snapshot.meta.signature()), &e))?; @@ -179,7 +168,7 @@ impl StateMachineStore { Ok(()) } - fn flush(&self, subject: ErrorSubject, verb: ErrorVerb) -> Result<(), StorageError> { + fn flush(&self, subject: ErrorSubject, verb: ErrorVerb) -> Result<(), StorageError> { self.db.flush_wal(true).map_err(|e| StorageError::new(subject, verb, AnyError::new(&e)))?; Ok(()) } @@ -192,15 +181,13 @@ impl StateMachineStore { impl RaftStateMachine for StateMachineStore { type SnapshotBuilder = Self; - async fn applied_state( - &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + async fn applied_state(&mut self) -> Result<(Option, StoredMembership), StorageError> { Ok((self.data.last_applied_log_id, self.data.last_membership.clone())) } - async fn apply(&mut self, entries: I) -> Result, StorageError> + async fn apply(&mut self, entries: I) -> Result, StorageError> where - I: IntoIterator + OptionalSend, + I: IntoIterator + OptionalSend, I::IntoIter: OptionalSend, { let entries = entries.into_iter(); @@ -236,15 +223,11 @@ impl RaftStateMachine for StateMachineStore { self.clone() } - async fn begin_receiving_snapshot(&mut self) -> Result>>, StorageError> { + async fn begin_receiving_snapshot(&mut self) -> Result>>, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } - async fn install_snapshot( - &mut self, - meta: &SnapshotMeta, - snapshot: Box, - ) -> Result<(), StorageError> { + async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: Box) -> Result<(), StorageError> { let new_snapshot = StoredSnapshot { meta: meta.clone(), data: snapshot.into_inner(), @@ -257,7 +240,7 @@ impl RaftStateMachine for StateMachineStore { Ok(()) } - async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + async fn get_current_snapshot(&mut self) -> Result, StorageError> { let x = self.get_current_snapshot_()?; Ok(x.map(|s| Snapshot { meta: s.meta.clone(), @@ -270,7 +253,7 @@ impl RaftStateMachine for StateMachineStore { pub struct LogStore { db: Arc, } -type StorageResult = Result>; +type StorageResult = Result; /// converts an id to a byte vector for storing in the database. /// Note that we're using big endian encoding to ensure correct sorting of keys @@ -293,12 +276,12 @@ impl LogStore { self.db.cf_handle("logs").unwrap() } - fn flush(&self, subject: ErrorSubject, verb: ErrorVerb) -> Result<(), StorageError> { + fn flush(&self, subject: ErrorSubject, verb: ErrorVerb) -> Result<(), StorageError> { self.db.flush_wal(true).map_err(|e| StorageError::new(subject, verb, AnyError::new(&e)))?; Ok(()) } - fn get_last_purged_(&self) -> StorageResult>> { + fn get_last_purged_(&self) -> StorageResult> { Ok(self .db .get_cf(self.store(), b"last_purged_log_id") @@ -306,7 +289,7 @@ impl LogStore { .and_then(|v| serde_json::from_slice(&v).ok())) } - fn set_last_purged_(&self, log_id: LogId) -> StorageResult<()> { + fn set_last_purged_(&self, log_id: LogId) -> StorageResult<()> { self.db .put_cf( self.store(), @@ -319,7 +302,7 @@ impl LogStore { Ok(()) } - fn set_committed_(&self, committed: &Option>) -> Result<(), StorageError> { + fn set_committed_(&self, committed: &Option) -> Result<(), StorageError> { let json = serde_json::to_vec(committed).unwrap(); self.db.put_cf(self.store(), b"committed", json).map_err(|e| StorageError::write(&e))?; @@ -328,7 +311,7 @@ impl LogStore { Ok(()) } - fn get_committed_(&self) -> StorageResult>> { + fn get_committed_(&self) -> StorageResult> { Ok(self .db .get_cf(self.store(), b"committed") @@ -336,7 +319,7 @@ impl LogStore { .and_then(|v| serde_json::from_slice(&v).ok())) } - fn set_vote_(&self, vote: &Vote) -> StorageResult<()> { + fn set_vote_(&self, vote: &Vote) -> StorageResult<()> { self.db .put_cf(self.store(), b"vote", serde_json::to_vec(vote).unwrap()) .map_err(|e| StorageError::write_vote(&e))?; @@ -345,7 +328,7 @@ impl LogStore { Ok(()) } - fn get_vote_(&self) -> StorageResult>> { + fn get_vote_(&self) -> StorageResult> { Ok(self .db .get_cf(self.store(), b"vote") @@ -358,7 +341,7 @@ impl RaftLogReader for LogStore { async fn try_get_log_entries + Clone + Debug + OptionalSend>( &mut self, range: RB, - ) -> StorageResult>> { + ) -> StorageResult> { let start = match range.start_bound() { std::ops::Bound::Included(x) => id_to_bin(*x), std::ops::Bound::Excluded(x) => id_to_bin(*x + 1), @@ -368,8 +351,7 @@ impl RaftLogReader for LogStore { .iterator_cf(self.logs(), rocksdb::IteratorMode::From(&start, Direction::Forward)) .map(|res| { let (id, val) = res.unwrap(); - let entry: StorageResult> = - serde_json::from_slice(&val).map_err(|e| StorageError::read_logs(&e)); + let entry: StorageResult = serde_json::from_slice(&val).map_err(|e| StorageError::read_logs(&e)); let id = bin_to_id(&id); assert_eq!(Ok(id), entry.as_ref().map(|e| e.log_id.index)); @@ -380,7 +362,7 @@ impl RaftLogReader for LogStore { .collect() } - async fn read_vote(&mut self) -> Result>, StorageError> { + async fn read_vote(&mut self) -> Result, StorageError> { self.get_vote_() } } @@ -388,10 +370,10 @@ impl RaftLogReader for LogStore { impl RaftLogStorage for LogStore { type LogReader = Self; - async fn get_log_state(&mut self) -> StorageResult> { + async fn get_log_state(&mut self) -> StorageResult { let last = self.db.iterator_cf(self.logs(), rocksdb::IteratorMode::End).next().and_then(|res| { let (_, ent) = res.unwrap(); - Some(serde_json::from_slice::>(&ent).ok()?.log_id) + Some(serde_json::from_slice::(&ent).ok()?.log_id) }); let last_purged_log_id = self.get_last_purged_()?; @@ -406,25 +388,25 @@ impl RaftLogStorage for LogStore { }) } - async fn save_committed(&mut self, _committed: Option>) -> Result<(), StorageError> { + async fn save_committed(&mut self, _committed: Option) -> Result<(), StorageError> { self.set_committed_(&_committed)?; Ok(()) } - async fn read_committed(&mut self) -> Result>, StorageError> { + async fn read_committed(&mut self) -> Result, StorageError> { let c = self.get_committed_()?; Ok(c) } #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { self.set_vote_(vote) } #[tracing::instrument(level = "trace", skip_all)] - async fn append(&mut self, entries: I, callback: IOFlushed) -> StorageResult<()> + async fn append(&mut self, entries: I, callback: IOFlushed) -> StorageResult<()> where - I: IntoIterator> + Send, + I: IntoIterator + Send, I::IntoIter: Send, { for entry in entries { @@ -445,7 +427,7 @@ impl RaftLogStorage for LogStore { } #[tracing::instrument(level = "debug", skip(self))] - async fn truncate(&mut self, log_id: LogId) -> StorageResult<()> { + async fn truncate(&mut self, log_id: LogId) -> StorageResult<()> { tracing::debug!("delete_log: [{:?}, +oo)", log_id); let from = id_to_bin(log_id.index); @@ -454,7 +436,7 @@ impl RaftLogStorage for LogStore { } #[tracing::instrument(level = "debug", skip(self))] - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("delete_log: [0, {:?}]", log_id); self.set_last_purged_(log_id)?; diff --git a/examples/utils/README.md b/examples/utils/README.md new file mode 100644 index 000000000..0e81f377f --- /dev/null +++ b/examples/utils/README.md @@ -0,0 +1,3 @@ +# Utils for examples + +`declare_types.rs` declares types for a specific `RaftTypeConfig` implementation. \ No newline at end of file diff --git a/examples/utils/declare_types.rs b/examples/utils/declare_types.rs new file mode 100644 index 000000000..69491f66a --- /dev/null +++ b/examples/utils/declare_types.rs @@ -0,0 +1,44 @@ +// Reference the containing module's type config. +use super::TypeConfig; + +pub type Raft = openraft::Raft; + +pub type Vote = openraft::Vote; +pub type LeaderId = openraft::LeaderId; +pub type LogId = openraft::LogId; +pub type Entry = openraft::Entry; +pub type EntryPayload = openraft::EntryPayload; +pub type StoredMembership = openraft::StoredMembership; + +pub type LogState = openraft::storage::LogState; + +pub type SnapshotMeta = openraft::SnapshotMeta; +pub type Snapshot = openraft::Snapshot; +pub type SnapshotData = ::SnapshotData; + +pub type IOFlushed = openraft::storage::IOFlushed; + +pub type Infallible = openraft::error::Infallible; +pub type Fatal = openraft::error::Fatal; +pub type RaftError = openraft::error::RaftError; +pub type RPCError = openraft::error::RPCError; + +pub type ErrorSubject = openraft::ErrorSubject; +pub type StorageError = openraft::StorageError; +pub type StreamingError = openraft::error::StreamingError; + +pub type RaftMetrics = openraft::RaftMetrics; + +pub type ClientWriteError = openraft::error::ClientWriteError; +pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError; +pub type ForwardToLeader = openraft::error::ForwardToLeader; +pub type InitializeError = openraft::error::InitializeError; + +pub type VoteRequest = openraft::raft::VoteRequest; +pub type VoteResponse = openraft::raft::VoteResponse; +pub type AppendEntriesRequest = openraft::raft::AppendEntriesRequest; +pub type AppendEntriesResponse = openraft::raft::AppendEntriesResponse; +pub type InstallSnapshotRequest = openraft::raft::InstallSnapshotRequest; +pub type InstallSnapshotResponse = openraft::raft::InstallSnapshotResponse; +pub type SnapshotResponse = openraft::raft::SnapshotResponse; +pub type ClientWriteResponse = openraft::raft::ClientWriteResponse;