From ba69264441a762e910834c07b10bf8f958b9981f Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Thu, 22 Feb 2024 16:10:38 +0100 Subject: [PATCH 01/11] Feature: Have oneshot as a Runtime implementation Signed-off-by: Anthony Griffon --- cluster_benchmark/tests/benchmark/store.rs | 11 ++- examples/memstore/src/log_store.rs | 12 ++- .../src/store.rs | 10 ++- examples/raft-kv-rocksdb/src/store.rs | 7 +- openraft/src/async_runtime.rs | 58 ++++++++++++ openraft/src/core/raft_core.rs | 41 ++++++--- .../src/core/raft_msg/external_command.rs | 5 +- openraft/src/core/raft_msg/mod.rs | 34 +++---- openraft/src/core/sm/command.rs | 10 ++- openraft/src/core/sm/mod.rs | 7 +- openraft/src/engine/command.rs | 88 ++++++++++++++----- openraft/src/engine/engine_impl.rs | 15 ++-- .../handler/vote_handler/accept_vote_test.rs | 9 +- .../src/engine/handler/vote_handler/mod.rs | 12 +-- openraft/src/raft/external_request.rs | 14 ++- openraft/src/raft/mod.rs | 40 +++++---- openraft/src/raft/raft_inner.rs | 9 +- openraft/src/replication/mod.rs | 2 +- openraft/src/replication/request.rs | 13 ++- openraft/src/storage/adapter.rs | 11 ++- openraft/src/storage/callback.rs | 18 ++-- openraft/src/storage/v2.rs | 7 +- openraft/src/testing/mod.rs | 4 +- openraft/src/testing/suite.rs | 3 +- openraft/src/timer/timeout_test.rs | 11 +-- stores/rocksstore-v2/src/lib.rs | 2 +- 26 files changed, 327 insertions(+), 126 deletions(-) diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index f6d5360cc..ea83a5772 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -19,7 +19,6 @@ use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; use openraft::OptionalSend; -use openraft::OptionalSync; use openraft::RaftLogId; use openraft::RaftTypeConfig; use openraft::SnapshotMeta; @@ -225,8 +224,14 @@ impl RaftLogStorage for Arc { } #[tracing::instrument(level = "trace", skip_all)] - async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> - where I: IntoIterator> + Send { + async fn append( + &mut self, + entries: I, + callback: LogFlushed<::AsyncRuntime, NodeId>, + ) -> Result<(), StorageError> + where + I: IntoIterator> + Send, + { { let mut log = self.log.write().await; log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index, entry))); diff --git a/examples/memstore/src/log_store.rs b/examples/memstore/src/log_store.rs index 25715e781..deeb8ef5a 100644 --- a/examples/memstore/src/log_store.rs +++ b/examples/memstore/src/log_store.rs @@ -93,8 +93,14 @@ impl LogStoreInner { Ok(self.vote) } - async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> - where I: IntoIterator { + async fn append( + &mut self, + entries: I, + callback: LogFlushed, + ) -> Result<(), StorageError> + where + I: IntoIterator, + { // Simple implementation that calls the flush-before-return `append_to_log`. for entry in entries { self.log.insert(entry.get_log_id().index, entry); @@ -191,7 +197,7 @@ mod impl_log_store { async fn append( &mut self, entries: I, - callback: LogFlushed, + callback: LogFlushed, ) -> Result<(), StorageError> where I: IntoIterator, diff --git a/examples/raft-kv-memstore-singlethreaded/src/store.rs b/examples/raft-kv-memstore-singlethreaded/src/store.rs index 425130e19..f0dd1a054 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/store.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/store.rs @@ -321,8 +321,14 @@ impl RaftLogStorage for Rc { } #[tracing::instrument(level = "trace", skip(self, entries, callback))] - async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> - where I: IntoIterator> { + async fn append( + &mut self, + entries: I, + callback: LogFlushed<::AsyncRuntime, NodeId>, + ) -> Result<(), StorageError> + where + I: IntoIterator>, + { // Simple implementation that calls the flush-before-return `append_to_log`. let mut log = self.log.borrow_mut(); for entry in entries { diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index bd678d04a..1d09d7eeb 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -22,6 +22,7 @@ use openraft::LogId; use openraft::OptionalSend; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; +use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -436,7 +437,11 @@ impl RaftLogStorage for LogStore { } #[tracing::instrument(level = "trace", skip_all)] - async fn append(&mut self, entries: I, callback: LogFlushed) -> StorageResult<()> + async fn append( + &mut self, + entries: I, + callback: LogFlushed<::AsyncRuntime, NodeId>, + ) -> StorageResult<()> where I: IntoIterator> + Send, I::IntoIter: Send, diff --git a/openraft/src/async_runtime.rs b/openraft/src/async_runtime.rs index 5e9c73e2a..a84a4eac2 100644 --- a/openraft/src/async_runtime.rs +++ b/openraft/src/async_runtime.rs @@ -44,6 +44,17 @@ pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static /// Type of a thread-local random number generator. type ThreadLocalRng: rand::Rng; + /// Type of a `oneshot` sender. + type OneshotSender: AsyncOneshotSendExt + OptionalSend + OptionalSync + Debug + Sized; + + type OneshotReceiverError: std::error::Error + OptionalSend; + + /// Type of a `oneshot` receiver. + type OneshotReceiver: OptionalSend + + OptionalSync + + Future> + + Unpin; + /// Spawn a new task. fn spawn(future: T) -> Self::JoinHandle where @@ -72,12 +83,24 @@ pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static /// This is a per-thread instance, which cannot be shared across threads or /// sent to another thread. fn thread_rng() -> Self::ThreadLocalRng; + + /// Creates a new one-shot channel for sending single values. + /// + /// The function returns separate "send" and "receive" handles. The `Sender` + /// handle is used by the producer to send the value. The `Receiver` handle is + /// used by the consumer to receive the value. + /// + /// Each handle can be used on separate tasks. + fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) + where T: OptionalSend; } /// `Tokio` is the default asynchronous executor. #[derive(Debug, Default)] pub struct TokioRuntime; +pub struct TokioSendWrapper(pub tokio::sync::oneshot::Sender); + impl AsyncRuntime for TokioRuntime { type JoinError = tokio::task::JoinError; type JoinHandle = tokio::task::JoinHandle; @@ -86,6 +109,9 @@ impl AsyncRuntime for TokioRuntime { type TimeoutError = tokio::time::error::Elapsed; type Timeout + OptionalSend> = tokio::time::Timeout; type ThreadLocalRng = rand::rngs::ThreadRng; + type OneshotSender = TokioSendWrapper; + type OneshotReceiver = tokio::sync::oneshot::Receiver; + type OneshotReceiverError = tokio::sync::oneshot::error::RecvError; #[inline] fn spawn(future: T) -> Self::JoinHandle @@ -132,4 +158,36 @@ impl AsyncRuntime for TokioRuntime { fn thread_rng() -> Self::ThreadLocalRng { rand::thread_rng() } + + #[inline] + fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) + where T: OptionalSend { + let (tx, rx) = tokio::sync::oneshot::channel(); + (TokioSendWrapper(tx), rx) + } +} + +pub trait AsyncOneshotSendExt: Unpin { + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. + /// + /// This method consumes `self` as only one value may ever be sent on a `oneshot` + /// channel. It is not marked async because sending a message to an `oneshot` + /// channel never requires any form of waiting. Because of this, the `send` + /// method can be used in both synchronous and asynchronous code without + /// problems. + fn send(self, t: T) -> Result<(), T>; +} + +impl AsyncOneshotSendExt for TokioSendWrapper { + #[inline] + fn send(self, t: T) -> Result<(), T> { + self.0.send(t) + } +} + +impl Debug for TokioSendWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("TokioSendWrapper").finish() + } } diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 9ce86f542..629126cac 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -15,12 +15,12 @@ use futures::TryFutureExt; use maplit::btreeset; use tokio::select; use tokio::sync::mpsc; -use tokio::sync::oneshot; use tokio::sync::watch; use tracing::Instrument; use tracing::Level; use tracing::Span; +use crate::async_runtime::AsyncOneshotSendExt; use crate::config::Config; use crate::config::RuntimeConfig; use crate::core::balancer::Balancer; @@ -215,7 +215,10 @@ where SM: RaftStateMachine, { /// The main loop of the Raft protocol. - pub(crate) async fn main(mut self, rx_shutdown: oneshot::Receiver<()>) -> Result<(), Fatal> { + pub(crate) async fn main( + mut self, + rx_shutdown: ::OneshotReceiver<()>, + ) -> Result<(), Fatal> { let span = tracing::span!(parent: &self.span, Level::DEBUG, "main"); let res = self.do_main(rx_shutdown).instrument(span).await; @@ -239,7 +242,10 @@ where } #[tracing::instrument(level="trace", skip_all, fields(id=display(self.id), cluster=%self.config.cluster_name))] - async fn do_main(&mut self, rx_shutdown: oneshot::Receiver<()>) -> Result<(), Fatal> { + async fn do_main( + &mut self, + rx_shutdown: ::OneshotReceiver<()>, + ) -> Result<(), Fatal> { tracing::debug!("raft node is initializing"); self.engine.startup(); @@ -432,7 +438,7 @@ where &mut self, changes: ChangeMembers, retain: bool, - tx: ResultSender, ClientWriteError>, + tx: ResultSender, ClientWriteResponse, ClientWriteError>, ) { let res = self.engine.state.membership_state.change_handler().apply(changes, retain); let new_membership = match res { @@ -593,7 +599,7 @@ where pub(crate) fn handle_initialize( &mut self, member_nodes: BTreeMap, - tx: ResultSender<(), InitializeError>, + tx: ResultSender, (), InitializeError>, ) { tracing::debug!(member_nodes = debug(&member_nodes), "{}", func_name!()); @@ -616,8 +622,12 @@ where /// Reject a request due to the Raft node being in a state which prohibits the request. #[tracing::instrument(level = "trace", skip(self, tx))] - pub(crate) fn reject_with_forward_to_leader(&self, tx: ResultSender) - where E: From> { + pub(crate) fn reject_with_forward_to_leader( + &self, + tx: ResultSender, T, E>, + ) where + E: From>, + { let mut leader_id = self.current_leader(); let leader_node = self.get_leader_node(leader_id); @@ -680,7 +690,7 @@ where { tracing::debug!("append_to_log"); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); let callback = LogFlushed::new(Some(last_log_id), tx); self.log_store.append(entries, callback).await?; rx.await @@ -865,7 +875,10 @@ where /// Run an event handling loop #[tracing::instrument(level="debug", skip_all, fields(id=display(self.id)))] - async fn runtime_loop(&mut self, mut rx_shutdown: oneshot::Receiver<()>) -> Result<(), Fatal> { + async fn runtime_loop( + &mut self, + mut rx_shutdown: ::OneshotReceiver<()>, + ) -> Result<(), Fatal> { // Ratio control the ratio of number of RaftMsg to process to number of Notify to process. let mut balancer = Balancer::new(10_000); @@ -1067,7 +1080,11 @@ where } #[tracing::instrument(level = "debug", skip_all)] - pub(super) fn handle_vote_request(&mut self, req: VoteRequest, tx: VoteTx) { + pub(super) fn handle_vote_request( + &mut self, + req: VoteRequest, + tx: VoteTx, C::NodeId>, + ) { tracing::info!(req = display(req.summary()), func = func_name!()); let resp = self.engine.handle_vote_req(req); @@ -1081,7 +1098,7 @@ where pub(super) fn handle_append_entries_request( &mut self, req: AppendEntriesRequest, - tx: AppendEntriesTx, + tx: AppendEntriesTx, C::NodeId>, ) { tracing::debug!(req = display(req.summary()), func = func_name!()); @@ -1657,7 +1674,7 @@ where // Create a channel to let state machine worker to send the snapshot and the replication // worker to receive it. - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); let cmd = sm::Command::get_snapshot(tx); self.sm_handle diff --git a/openraft/src/core/raft_msg/external_command.rs b/openraft/src/core/raft_msg/external_command.rs index 17c983b3c..666d5bfd2 100644 --- a/openraft/src/core/raft_msg/external_command.rs +++ b/openraft/src/core/raft_msg/external_command.rs @@ -3,6 +3,7 @@ use std::fmt; use crate::core::raft_msg::ResultSender; +use crate::type_config::alias::AsyncRuntimeOf; use crate::RaftTypeConfig; use crate::Snapshot; @@ -23,7 +24,9 @@ pub(crate) enum ExternalCommand { Snapshot, /// Get a snapshot from the state machine, send back via a oneshot::Sender. - GetSnapshot { tx: ResultSender>> }, + GetSnapshot { + tx: ResultSender, Option>>, + }, /// Purge logs covered by a snapshot up to a specified index. /// diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index b6679d43a..39871d134 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -1,7 +1,5 @@ use std::collections::BTreeMap; -use tokio::sync::oneshot; - use crate::core::raft_msg::external_command::ExternalCommand; use crate::error::CheckIsLeaderError; use crate::error::ClientWriteError; @@ -15,10 +13,12 @@ use crate::raft::ClientWriteResponse; use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; +use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::LogIdOf; use crate::type_config::alias::NodeIdOf; use crate::type_config::alias::NodeOf; use crate::type_config::alias::SnapshotDataOf; +use crate::AsyncRuntime; use crate::ChangeMembers; use crate::MessageSummary; use crate::RaftTypeConfig; @@ -28,22 +28,26 @@ use crate::Vote; pub(crate) mod external_command; /// A oneshot TX to send result from `RaftCore` to external caller, e.g. `Raft::append_entries`. -pub(crate) type ResultSender = oneshot::Sender>; +pub(crate) type ResultSender = ::OneshotSender>; -pub(crate) type ResultReceiver = oneshot::Receiver>; +pub(crate) type ResultReceiver = ::OneshotReceiver>; /// TX for Vote Response -pub(crate) type VoteTx = ResultSender>; +pub(crate) type VoteTx = ResultSender>; /// TX for Append Entries Response -pub(crate) type AppendEntriesTx = ResultSender>; +pub(crate) type AppendEntriesTx = ResultSender>; /// TX for Client Write Response -pub(crate) type ClientWriteTx = ResultSender, ClientWriteError, NodeOf>>; +pub(crate) type ClientWriteTx = + ResultSender, ClientWriteResponse, ClientWriteError, NodeOf>>; /// TX for Linearizable Read Response -pub(crate) type ClientReadTx = - ResultSender<(Option>, Option>), CheckIsLeaderError, NodeOf>>; +pub(crate) type ClientReadTx = ResultSender< + AsyncRuntimeOf, + (Option>, Option>), + CheckIsLeaderError, NodeOf>, +>; /// A message sent by application to the [`RaftCore`]. /// @@ -53,18 +57,18 @@ where C: RaftTypeConfig { AppendEntries { rpc: AppendEntriesRequest, - tx: AppendEntriesTx, + tx: AppendEntriesTx, C::NodeId>, }, RequestVote { rpc: VoteRequest, - tx: VoteTx, + tx: VoteTx, C::NodeId>, }, InstallFullSnapshot { vote: Vote, snapshot: Snapshot, - tx: ResultSender>, + tx: ResultSender, SnapshotResponse>, }, /// Begin receiving a snapshot from the leader. @@ -74,7 +78,7 @@ where C: RaftTypeConfig /// will be returned in a Err BeginReceivingSnapshot { vote: Vote, - tx: ResultSender>, HigherVote>, + tx: ResultSender, Box>, HigherVote>, }, ClientWriteRequest { @@ -88,7 +92,7 @@ where C: RaftTypeConfig Initialize { members: BTreeMap, - tx: ResultSender<(), InitializeError>, + tx: ResultSender, (), InitializeError>, }, ChangeMembership { @@ -98,7 +102,7 @@ where C: RaftTypeConfig /// config will be converted into learners, otherwise they will be removed. retain: bool, - tx: ResultSender, ClientWriteError>, + tx: ResultSender, ClientWriteResponse, ClientWriteError>, }, ExternalCoreRequest { diff --git a/openraft/src/core/sm/command.rs b/openraft/src/core/sm/command.rs index 2c49c2597..5930322cf 100644 --- a/openraft/src/core/sm/command.rs +++ b/openraft/src/core/sm/command.rs @@ -54,12 +54,14 @@ where C: RaftTypeConfig Command::new(payload) } - pub(crate) fn get_snapshot(tx: ResultSender>>) -> Self { + pub(crate) fn get_snapshot(tx: ResultSender>>) -> Self { let payload = CommandPayload::GetSnapshot { tx }; Command::new(payload) } - pub(crate) fn begin_receiving_snapshot(tx: ResultSender>, HigherVote>) -> Self { + pub(crate) fn begin_receiving_snapshot( + tx: ResultSender>, HigherVote>, + ) -> Self { let payload = CommandPayload::BeginReceivingSnapshot { tx }; Command::new(payload) } @@ -91,11 +93,11 @@ where C: RaftTypeConfig /// Get the latest built snapshot. GetSnapshot { - tx: ResultSender>>, + tx: ResultSender>>, }, BeginReceivingSnapshot { - tx: ResultSender>, HigherVote>, + tx: ResultSender>, HigherVote>, }, InstallFullSnapshot { diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index 05a9d6987..78692202b 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -6,11 +6,13 @@ use tokio::sync::mpsc; +use crate::async_runtime::AsyncOneshotSendExt; use crate::core::ApplyResult; use crate::core::ApplyingEntry; use crate::entry::RaftPayload; use crate::storage::RaftStateMachine; use crate::summary::MessageSummary; +use crate::type_config::alias::AsyncRuntimeOf; use crate::AsyncRuntime; use crate::RaftLogId; use crate::RaftSnapshotBuilder; @@ -219,7 +221,10 @@ where } #[tracing::instrument(level = "info", skip_all)] - async fn get_snapshot(&mut self, tx: ResultSender>>) -> Result<(), StorageError> { + async fn get_snapshot( + &mut self, + tx: ResultSender, Option>>, + ) -> Result<(), StorageError> { tracing::info!("{}", func_name!()); let snapshot = self.state_machine.get_current_snapshot().await?; diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index a61ff97f4..60663916b 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -1,7 +1,6 @@ use std::fmt::Debug; -use tokio::sync::oneshot; - +use crate::async_runtime::AsyncOneshotSendExt; use crate::core::sm; use crate::engine::CommandKind; use crate::error::Infallible; @@ -14,10 +13,12 @@ use crate::raft::InstallSnapshotResponse; use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; +use crate::AsyncRuntime; use crate::LeaderId; use crate::LogId; use crate::Node; use crate::NodeId; +use crate::OptionalSend; use crate::RaftTypeConfig; use crate::Vote; @@ -98,7 +99,7 @@ where C: RaftTypeConfig /// Send result to caller Respond { when: Option>, - resp: Respond, + resp: Respond, }, } @@ -219,30 +220,58 @@ where NID: NodeId /// A command to send return value to the caller via a `oneshot::Sender`. #[derive(Debug)] -#[derive(PartialEq, Eq)] #[derive(derive_more::From)] -pub(crate) enum Respond +pub(crate) enum Respond where NID: NodeId, N: Node, + R: AsyncRuntime, +{ + Vote(ValueSender, Infallible>>), + AppendEntries(ValueSender, Infallible>>), + ReceiveSnapshotChunk(ValueSender>), + InstallSnapshot(ValueSender, InstallSnapshotError>>), + InstallFullSnapshot(ValueSender, Infallible>>), + Initialize(ValueSender>>), +} + +impl PartialEq for Respond +where + NID: NodeId + PartialEq, + N: Node + PartialEq, + R: AsyncRuntime, +{ + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Respond::Vote(f0_self), Respond::Vote(f0_other)) => f0_self.eq(f0_other), + (Respond::AppendEntries(f0_self), Respond::AppendEntries(f0_other)) => f0_self.eq(f0_other), + (Respond::ReceiveSnapshotChunk(f0_self), Respond::ReceiveSnapshotChunk(f0_other)) => f0_self.eq(f0_other), + (Respond::InstallSnapshot(f0_self), Respond::InstallSnapshot(f0_other)) => f0_self.eq(f0_other), + (Respond::InstallFullSnapshot(f0_self), Respond::InstallFullSnapshot(f0_other)) => f0_self.eq(f0_other), + (Respond::Initialize(f0_self), Respond::Initialize(f0_other)) => f0_self.eq(f0_other), + _unused => false, + } + } +} + +impl Eq for Respond +where + NID: NodeId + Eq, + N: Node + Eq, + R: AsyncRuntime, { - Vote(ValueSender, Infallible>>), - AppendEntries(ValueSender, Infallible>>), - ReceiveSnapshotChunk(ValueSender>), - InstallSnapshot(ValueSender, InstallSnapshotError>>), - InstallFullSnapshot(ValueSender, Infallible>>), - Initialize(ValueSender>>), } -impl Respond +impl Respond where NID: NodeId, N: Node, + R: AsyncRuntime, { - pub(crate) fn new(res: T, tx: oneshot::Sender) -> Self + pub(crate) fn new(res: T, tx: R::OneshotSender) -> Self where - T: Debug + PartialEq + Eq, - Self: From>, + T: Debug + PartialEq + Eq + OptionalSend, + Self: From>, { Respond::from(ValueSender::new(res, tx)) } @@ -260,27 +289,38 @@ where } #[derive(Debug)] -pub(crate) struct ValueSender -where T: Debug + PartialEq + Eq +pub(crate) struct ValueSender +where + T: Debug + PartialEq + Eq + OptionalSend, + R: AsyncRuntime, { value: T, - tx: oneshot::Sender, + tx: R::OneshotSender, } -impl PartialEq for ValueSender -where T: Debug + PartialEq + Eq +impl PartialEq for ValueSender +where + T: Debug + PartialEq + Eq + OptionalSend, + R: AsyncRuntime, { fn eq(&self, other: &Self) -> bool { self.value == other.value } } -impl Eq for ValueSender where T: Debug + PartialEq + Eq {} +impl Eq for ValueSender +where + T: Debug + PartialEq + Eq + OptionalSend, + R: AsyncRuntime, +{ +} -impl ValueSender -where T: Debug + PartialEq + Eq +impl ValueSender +where + T: Debug + PartialEq + Eq + OptionalSend, + R: AsyncRuntime, { - pub(crate) fn new(res: T, tx: oneshot::Sender) -> Self { + pub(crate) fn new(res: T, tx: R::OneshotSender) -> Self { Self { value: res, tx } } diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 8fbe07c0a..e79f26993 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -2,6 +2,7 @@ use std::time::Duration; use validit::Valid; +use crate::async_runtime::AsyncOneshotSendExt; use crate::core::raft_msg::AppendEntriesTx; use crate::core::raft_msg::ResultSender; use crate::core::sm; @@ -38,12 +39,14 @@ use crate::raft::VoteResponse; use crate::raft_state::LogStateReader; use crate::raft_state::RaftState; use crate::summary::MessageSummary; +use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::SnapshotDataOf; use crate::AsyncRuntime; use crate::Instant; use crate::LogId; use crate::LogIdOptionExt; use crate::Membership; +use crate::OptionalSend; use crate::RaftLogId; use crate::RaftTypeConfig; use crate::Snapshot; @@ -222,9 +225,11 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn get_leader_handler_or_reject( &mut self, - tx: Option>, - ) -> Option<(LeaderHandler, Option>)> + tx: Option>, + ) -> Option<(LeaderHandler, Option>)> where + T: OptionalSend, + E: OptionalSend, E: From>, { let res = self.leader_handler(); @@ -391,7 +396,7 @@ where C: RaftTypeConfig vote: &Vote, prev_log_id: Option>, entries: Vec, - tx: Option>, + tx: Option, C::NodeId>>, ) -> bool { tracing::debug!( vote = display(vote), @@ -454,7 +459,7 @@ where C: RaftTypeConfig &mut self, vote: Vote, snapshot: Snapshot, - tx: ResultSender>, + tx: ResultSender>, ) { tracing::info!(vote = display(vote), snapshot = display(&snapshot), "{}", func_name!()); @@ -487,7 +492,7 @@ where C: RaftTypeConfig pub(crate) fn handle_begin_receiving_snapshot( &mut self, vote: Vote, - tx: ResultSender>, HigherVote>, + tx: ResultSender>, HigherVote>, ) { tracing::info!(vote = display(vote), "{}", func_name!()); diff --git a/openraft/src/engine/handler/vote_handler/accept_vote_test.rs b/openraft/src/engine/handler/vote_handler/accept_vote_test.rs index 6e71a591d..f7bf9c5d2 100644 --- a/openraft/src/engine/handler/vote_handler/accept_vote_test.rs +++ b/openraft/src/engine/handler/vote_handler/accept_vote_test.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use maplit::btreeset; use pretty_assertions::assert_eq; -use tokio::sync::oneshot; use crate::core::ServerState; use crate::engine::testing::UTConfig; @@ -13,8 +12,10 @@ use crate::error::Infallible; use crate::raft::VoteResponse; use crate::testing::log_id; use crate::utime::UTime; +use crate::AsyncRuntime; use crate::EffectiveMembership; use crate::Membership; +use crate::RaftTypeConfig; use crate::TokioInstant; use crate::Vote; @@ -51,12 +52,12 @@ fn test_accept_vote_reject_smaller_vote() -> anyhow::Result<()> { // When a vote is reject, it generate SendResultCommand and return an error. let mut eng = eng(); - let (tx, _rx) = oneshot::channel(); + let (tx, _rx) = <::AsyncRuntime as AsyncRuntime>::oneshot(); let resp = eng.vote_handler().accept_vote(&Vote::new(1, 2), tx, |_state, _err| mk_res()); assert!(resp.is_none()); - let (tx, _rx) = oneshot::channel(); + let (tx, _rx) = <::AsyncRuntime as AsyncRuntime>::oneshot(); assert_eq!( vec![ // @@ -76,7 +77,7 @@ fn test_accept_vote_granted_greater_vote() -> anyhow::Result<()> { // When a vote is accepted, it generate SaveVote command and return an Ok. let mut eng = eng(); - let (tx, _rx) = oneshot::channel(); + let (tx, _rx) = <::AsyncRuntime as AsyncRuntime>::oneshot(); let resp = eng.vote_handler().accept_vote(&Vote::new(3, 3), tx, |_state, _err| mk_res()); assert!(resp.is_some()); diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index c6b6bdd1b..8a3ffe87d 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -11,9 +11,11 @@ use crate::error::RejectVoteRequest; use crate::internal_server_state::InternalServerState; use crate::leader::Leading; use crate::raft_state::LogStateReader; +use crate::type_config::alias::AsyncRuntimeOf; use crate::utime::UTime; use crate::AsyncRuntime; use crate::Instant; +use crate::OptionalSend; use crate::RaftState; use crate::RaftTypeConfig; use crate::Vote; @@ -50,13 +52,13 @@ where C: RaftTypeConfig pub(crate) fn accept_vote( &mut self, vote: &Vote, - tx: ResultSender, + tx: ResultSender, T, E>, f: F, - ) -> Option> + ) -> Option, T, E>> where - T: Debug + Eq, - E: Debug + Eq, - Respond: From>>, + T: Debug + Eq + OptionalSend, + E: Debug + Eq + OptionalSend, + Respond: From>>, F: Fn( &RaftState::Instant>, RejectVoteRequest, diff --git a/openraft/src/raft/external_request.rs b/openraft/src/raft/external_request.rs index c9b9b425e..a7309c38d 100644 --- a/openraft/src/raft/external_request.rs +++ b/openraft/src/raft/external_request.rs @@ -3,7 +3,19 @@ use crate::type_config::alias::InstantOf; use crate::type_config::alias::NodeIdOf; use crate::type_config::alias::NodeOf; +use crate::OptionalSend; use crate::RaftState; +use crate::RaftTypeConfig; + +pub trait BoxCoreFnInternal: FnOnce(&RaftState, NodeOf, InstantOf>) + OptionalSend +where C: RaftTypeConfig +{ +} + +impl, NodeOf, InstantOf>) + OptionalSend> BoxCoreFnInternal + for T +{ +} /// Boxed trait object for external request function run in `RaftCore` task. -pub(crate) type BoxCoreFn = Box, NodeOf, InstantOf>) + Send + 'static>; +pub(crate) type BoxCoreFn = Box + 'static>; diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 5ae484b05..3fece00cf 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -28,13 +28,13 @@ pub use message::SnapshotResponse; pub use message::VoteRequest; pub use message::VoteResponse; use tokio::sync::mpsc; -use tokio::sync::oneshot; use tokio::sync::watch; use tokio::sync::Mutex; use tracing::trace_span; use tracing::Instrument; use tracing::Level; +use crate::async_runtime::AsyncOneshotSendExt; use crate::config::Config; use crate::config::RuntimeConfig; use crate::core::command_state::CommandState; @@ -70,6 +70,7 @@ use crate::ChangeMembers; use crate::LogId; use crate::LogIdOptionExt; use crate::MessageSummary; +use crate::OptionalSend; use crate::RaftState; pub use crate::RaftTypeConfig; use crate::Snapshot; @@ -180,7 +181,7 @@ where C: RaftTypeConfig let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id)); let (tx_data_metrics, rx_data_metrics) = watch::channel(RaftDataMetrics::default()); let (tx_server_metrics, rx_server_metrics) = watch::channel(RaftServerMetrics::default()); - let (tx_shutdown, rx_shutdown) = oneshot::channel(); + let (tx_shutdown, rx_shutdown) = C::AsyncRuntime::oneshot(); let tick_handle = Tick::spawn( Duration::from_millis(config.heartbeat_interval * 3 / 2), @@ -335,7 +336,7 @@ where C: RaftTypeConfig ) -> Result, RaftError> { tracing::debug!(rpc = display(rpc.summary()), "Raft::append_entries"); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); self.inner.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await } @@ -347,7 +348,7 @@ where C: RaftTypeConfig pub async fn vote(&self, rpc: VoteRequest) -> Result, RaftError> { tracing::info!(rpc = display(rpc.summary()), "Raft::vote()"); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); self.inner.call_core(RaftMsg::RequestVote { rpc, tx }, rx).await } @@ -359,7 +360,7 @@ where C: RaftTypeConfig pub async fn get_snapshot(&self) -> Result>, RaftError> { tracing::debug!("Raft::get_snapshot()"); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); let cmd = ExternalCommand::GetSnapshot { tx }; self.inner.call_core(RaftMsg::ExternalCommand { cmd }, rx).await } @@ -372,7 +373,7 @@ where C: RaftTypeConfig ) -> Result>, RaftError>> { tracing::info!("Raft::begin_receiving_snapshot()"); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); let resp = self.inner.call_core(RaftMsg::BeginReceivingSnapshot { vote, tx }, rx).await?; Ok(resp) } @@ -390,7 +391,7 @@ where C: RaftTypeConfig ) -> Result, Fatal> { tracing::info!("Raft::install_full_snapshot()"); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); let res = self.inner.call_core(RaftMsg::InstallFullSnapshot { vote, snapshot, tx }, rx).await; match res { Ok(x) => Ok(x), @@ -491,7 +492,7 @@ where C: RaftTypeConfig #[deprecated(since = "0.9.0", note = "use `Raft::ensure_linearizable()` instead")] #[tracing::instrument(level = "debug", skip(self))] pub async fn is_leader(&self) -> Result<(), RaftError>> { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); let _ = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?; Ok(()) } @@ -575,7 +576,7 @@ where C: RaftTypeConfig (Option>, Option>), RaftError>, > { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); let (read_log_id, applied) = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?; Ok((read_log_id, applied)) } @@ -603,7 +604,7 @@ where C: RaftTypeConfig &self, app_data: C::D, ) -> Result, RaftError>> { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); self.inner.call_core(RaftMsg::ClientWriteRequest { app_data, tx }, rx).await } @@ -636,7 +637,7 @@ where C: RaftTypeConfig where T: IntoNodes + Debug, { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); self.inner .call_core( RaftMsg::Initialize { @@ -671,7 +672,7 @@ where C: RaftTypeConfig node: C::Node, blocking: bool, ) -> Result, RaftError>> { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); let resp = self .inner .call_core( @@ -801,7 +802,7 @@ where C: RaftTypeConfig "change_membership: start to commit joint config" ); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); // res is error if membership can not be changed. // If no error, it will enter a joint state let res = self @@ -832,7 +833,7 @@ where C: RaftTypeConfig tracing::debug!("committed a joint config: {} {:?}", log_id, joint); tracing::debug!("the second step is to change to uniform config: {:?}", changes); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); let res = self.inner.call_core(RaftMsg::ChangeMembership { changes, retain, tx }, rx).await; if let Err(e) = &res { @@ -862,10 +863,12 @@ where C: RaftTypeConfig /// ``` pub async fn with_raft_state(&self, func: F) -> Result> where - F: FnOnce(&RaftState::Instant>) -> V + Send + 'static, - V: Send + 'static, + F: FnOnce(&RaftState::Instant>) -> V + + OptionalSend + + 'static, + V: OptionalSend + 'static, { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = C::AsyncRuntime::oneshot(); self.external_request(|st| { let result = func(st); @@ -899,7 +902,8 @@ where C: RaftTypeConfig /// If the API channel is already closed (Raft is in shutdown), then the request functor is /// destroyed right away and not called at all. pub fn external_request(&self, req: F) - where F: FnOnce(&RaftState::Instant>) + Send + 'static { + where F: FnOnce(&RaftState::Instant>) + OptionalSend + 'static + { let req: BoxCoreFn = Box::new(req); let _ignore_error = self.inner.tx_api.send(RaftMsg::ExternalCoreRequest { req }); } diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index ceecf7f4c..bbc3d3a27 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -3,7 +3,6 @@ use std::fmt::Debug; use std::sync::Arc; use tokio::sync::mpsc; -use tokio::sync::oneshot; use tokio::sync::watch; use tokio::sync::Mutex; use tracing::Level; @@ -20,6 +19,7 @@ use crate::raft::core_state::CoreState; use crate::AsyncRuntime; use crate::Config; use crate::MessageSummary; +use crate::OptionalSend; use crate::RaftMetrics; use crate::RaftTypeConfig; @@ -39,7 +39,7 @@ where C: RaftTypeConfig // TODO(xp): it does not need to be a async mutex. #[allow(clippy::type_complexity)] - pub(in crate::raft) tx_shutdown: Mutex>>, + pub(in crate::raft) tx_shutdown: Mutex::OneshotSender<()>>>, pub(in crate::raft) core_state: Mutex>, /// The ongoing snapshot transmission. @@ -55,10 +55,11 @@ where C: RaftTypeConfig pub(crate) async fn call_core( &self, mes: RaftMsg, - rx: oneshot::Receiver>, + rx: ::OneshotReceiver>, ) -> Result> where - E: Debug, + E: Debug + OptionalSend, + T: OptionalSend, { let sum = if tracing::enabled!(Level::DEBUG) { Some(mes.summary()) diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index d0927629a..a02e00b33 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -733,7 +733,7 @@ where #[tracing::instrument(level = "info", skip_all)] async fn stream_snapshot( &mut self, - snapshot_rx: DataWithId>>>, + snapshot_rx: DataWithId, Option>>>, ) -> Result>, ReplicationError> { let request_id = snapshot_rx.request_id(); let rx = snapshot_rx.into_data(); diff --git a/openraft/src/replication/request.rs b/openraft/src/replication/request.rs index bbcda719c..c145627a9 100644 --- a/openraft/src/replication/request.rs +++ b/openraft/src/replication/request.rs @@ -22,7 +22,10 @@ where C: RaftTypeConfig Self::Data(Data::new_logs(id, log_id_range)) } - pub(crate) fn snapshot(id: Option, snapshot_rx: ResultReceiver>>) -> Self { + pub(crate) fn snapshot( + id: Option, + snapshot_rx: ResultReceiver, Option>>, + ) -> Self { Self::Data(Data::new_snapshot(id, snapshot_rx)) } @@ -56,6 +59,7 @@ use crate::error::StreamingError; use crate::log_id_range::LogIdRange; use crate::raft::SnapshotResponse; use crate::replication::callbacks::SnapshotCallback; +use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::InstantOf; use crate::LogId; use crate::MessageSummary; @@ -73,7 +77,7 @@ where C: RaftTypeConfig { Heartbeat, Logs(DataWithId>), - Snapshot(DataWithId>>>), + Snapshot(DataWithId, Option>>>), SnapshotCallback(DataWithId>), } @@ -148,7 +152,10 @@ where C: RaftTypeConfig Self::Logs(DataWithId::new(request_id, log_id_range)) } - pub(crate) fn new_snapshot(request_id: Option, snapshot_rx: ResultReceiver>>) -> Self { + pub(crate) fn new_snapshot( + request_id: Option, + snapshot_rx: ResultReceiver, Option>>, + ) -> Self { Self::Snapshot(DataWithId::new(request_id, snapshot_rx)) } diff --git a/openraft/src/storage/adapter.rs b/openraft/src/storage/adapter.rs index 145123938..39cc9c171 100644 --- a/openraft/src/storage/adapter.rs +++ b/openraft/src/storage/adapter.rs @@ -12,6 +12,7 @@ use crate::storage::v2::sealed::Sealed; use crate::storage::LogFlushed; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; +use crate::type_config::alias::AsyncRuntimeOf; use crate::LogId; use crate::LogState; use crate::OptionalSend; @@ -147,8 +148,14 @@ where S::get_log_reader(self.storage_mut().await.deref_mut()).await } - async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> - where I: IntoIterator + OptionalSend { + async fn append( + &mut self, + entries: I, + callback: LogFlushed, C::NodeId>, + ) -> Result<(), StorageError> + where + I: IntoIterator + OptionalSend, + { // Default implementation that calls the flush-before-return `append_to_log`. S::append_to_log(self.storage_mut().await.deref_mut(), entries).await?; diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index a0bda0895..80432b7e1 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -4,26 +4,32 @@ use std::io; use tokio::sync::oneshot; +use crate::async_runtime::AsyncOneshotSendExt; use crate::display_ext::DisplayOption; +use crate::AsyncRuntime; use crate::LogId; use crate::NodeId; use crate::RaftTypeConfig; use crate::StorageIOError; /// A oneshot callback for completion of log io operation. -pub struct LogFlushed -where NID: NodeId +pub struct LogFlushed +where + NID: NodeId, + Runtime: AsyncRuntime, { last_log_id: Option>, - tx: oneshot::Sender>, io::Error>>, + tx: Runtime::OneshotSender>, io::Error>>, } -impl LogFlushed -where NID: NodeId +impl LogFlushed +where + NID: NodeId, + Runtime: AsyncRuntime, { pub(crate) fn new( last_log_id: Option>, - tx: oneshot::Sender>, io::Error>>, + tx: Runtime::OneshotSender>, io::Error>>, ) -> Self { Self { last_log_id, tx } } diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index 77cbf28fb..cd074d1e3 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -6,6 +6,7 @@ use macros::add_async_trait; use crate::storage::callback::LogFlushed; use crate::storage::v2::sealed::Sealed; +use crate::type_config::alias::AsyncRuntimeOf; use crate::LogId; use crate::LogState; use crate::OptionalSend; @@ -120,7 +121,11 @@ where C: RaftTypeConfig /// /// - There must not be a **hole** in logs. Because Raft only examine the last log id to ensure /// correctness. - async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + async fn append( + &mut self, + entries: I, + callback: LogFlushed, C::NodeId>, + ) -> Result<(), StorageError> where I: IntoIterator + OptionalSend, I::IntoIter: OptionalSend; diff --git a/openraft/src/testing/mod.rs b/openraft/src/testing/mod.rs index 09530dd52..13a216abc 100644 --- a/openraft/src/testing/mod.rs +++ b/openraft/src/testing/mod.rs @@ -6,12 +6,12 @@ use std::collections::BTreeSet; use anyerror::AnyError; pub use store_builder::StoreBuilder; pub use suite::Suite; -use tokio::sync::oneshot; use crate::entry::RaftEntry; use crate::log_id::RaftLogId; use crate::storage::LogFlushed; use crate::storage::RaftLogStorage; +use crate::AsyncRuntime; use crate::CommittedLeaderId; use crate::LogId; use crate::RaftTypeConfig; @@ -55,7 +55,7 @@ where let entries = entries.into_iter().collect::>(); let last_log_id = entries.last().map(|e| *e.get_log_id()).unwrap(); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = ::oneshot(); let cb = LogFlushed::new(Some(last_log_id), tx); log_store.append(entries, cb).await?; rx.await.unwrap().map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?; diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 0e4a20a34..49eb2a895 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -6,7 +6,6 @@ use std::time::Duration; use anyerror::AnyError; use maplit::btreeset; -use tokio::sync::oneshot; use crate::entry::RaftEntry; use crate::log_id::RaftLogId; @@ -1171,7 +1170,7 @@ where let entries = entries.into_iter().collect::>(); let last_log_id = *entries.last().unwrap().get_log_id(); - let (tx, rx) = oneshot::channel(); + let (tx, rx) = ::oneshot(); let cb = LogFlushed::new(Some(last_log_id), tx); diff --git a/openraft/src/timer/timeout_test.rs b/openraft/src/timer/timeout_test.rs index 98de77cc9..445ebd51a 100644 --- a/openraft/src/timer/timeout_test.rs +++ b/openraft/src/timer/timeout_test.rs @@ -1,11 +1,12 @@ use std::time::Duration; -use tokio::sync::oneshot; use tokio::time::sleep; use tokio::time::Instant; +use crate::async_runtime::AsyncOneshotSendExt; use crate::timer::timeout::RaftTimer; use crate::timer::Timeout; +use crate::AsyncRuntime; use crate::TokioRuntime; #[cfg(not(feature = "singlethreaded"))] @@ -24,7 +25,7 @@ fn test_timeout() -> anyhow::Result<()> { async fn test_timeout_inner() -> anyhow::Result<()> { tracing::info!("--- set timeout, recv result"); { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = ::oneshot(); let now = Instant::now(); let _t = Timeout::::new( || { @@ -43,7 +44,7 @@ async fn test_timeout_inner() -> anyhow::Result<()> { tracing::info!("--- update timeout"); { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = ::oneshot(); let now = Instant::now(); let t = Timeout::::new( || { @@ -65,7 +66,7 @@ async fn test_timeout_inner() -> anyhow::Result<()> { tracing::info!("--- update timeout to a lower value wont take effect"); { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = ::oneshot(); let now = Instant::now(); let t = Timeout::::new( || { @@ -87,7 +88,7 @@ async fn test_timeout_inner() -> anyhow::Result<()> { tracing::info!("--- drop the `Timeout` will cancel the callback"); { - let (tx, rx) = oneshot::channel(); + let (tx, rx) = ::oneshot(); let now = Instant::now(); let t = Timeout::::new( || { diff --git a/stores/rocksstore-v2/src/lib.rs b/stores/rocksstore-v2/src/lib.rs index 918c52f4f..d0011353b 100644 --- a/stores/rocksstore-v2/src/lib.rs +++ b/stores/rocksstore-v2/src/lib.rs @@ -353,7 +353,7 @@ impl RaftLogStorage for RocksLogStore { async fn append( &mut self, entries: I, - callback: LogFlushed, + callback: LogFlushed<::AsyncRuntime, RocksNodeId>, ) -> Result<(), StorageError> where I: IntoIterator> + Send, From 20743d310731d1f82e3155218ceb7e0d869415d7 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Fri, 23 Feb 2024 14:11:26 +0100 Subject: [PATCH 02/11] Fixup: review fixes Signed-off-by: Anthony Griffon --- openraft/src/async_runtime.rs | 11 ++++++----- openraft/src/engine/command.rs | 20 ++++++++++++++------ 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/openraft/src/async_runtime.rs b/openraft/src/async_runtime.rs index a84a4eac2..a34627498 100644 --- a/openraft/src/async_runtime.rs +++ b/openraft/src/async_runtime.rs @@ -47,6 +47,7 @@ pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static /// Type of a `oneshot` sender. type OneshotSender: AsyncOneshotSendExt + OptionalSend + OptionalSync + Debug + Sized; + /// Type of a `oneshot` receiver error. type OneshotReceiverError: std::error::Error + OptionalSend; /// Type of a `oneshot` receiver. @@ -99,7 +100,7 @@ pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static #[derive(Debug, Default)] pub struct TokioRuntime; -pub struct TokioSendWrapper(pub tokio::sync::oneshot::Sender); +pub struct TokioOneShotSender(pub tokio::sync::oneshot::Sender); impl AsyncRuntime for TokioRuntime { type JoinError = tokio::task::JoinError; @@ -109,7 +110,7 @@ impl AsyncRuntime for TokioRuntime { type TimeoutError = tokio::time::error::Elapsed; type Timeout + OptionalSend> = tokio::time::Timeout; type ThreadLocalRng = rand::rngs::ThreadRng; - type OneshotSender = TokioSendWrapper; + type OneshotSender = TokioOneShotSender; type OneshotReceiver = tokio::sync::oneshot::Receiver; type OneshotReceiverError = tokio::sync::oneshot::error::RecvError; @@ -163,7 +164,7 @@ impl AsyncRuntime for TokioRuntime { fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) where T: OptionalSend { let (tx, rx) = tokio::sync::oneshot::channel(); - (TokioSendWrapper(tx), rx) + (TokioOneShotSender(tx), rx) } } @@ -179,14 +180,14 @@ pub trait AsyncOneshotSendExt: Unpin { fn send(self, t: T) -> Result<(), T>; } -impl AsyncOneshotSendExt for TokioSendWrapper { +impl AsyncOneshotSendExt for TokioOneShotSender { #[inline] fn send(self, t: T) -> Result<(), T> { self.0.send(t) } } -impl Debug for TokioSendWrapper { +impl Debug for TokioOneShotSender { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("TokioSendWrapper").finish() } diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 60663916b..0235c89aa 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -243,12 +243,20 @@ where { fn eq(&self, other: &Self) -> bool { match (self, other) { - (Respond::Vote(f0_self), Respond::Vote(f0_other)) => f0_self.eq(f0_other), - (Respond::AppendEntries(f0_self), Respond::AppendEntries(f0_other)) => f0_self.eq(f0_other), - (Respond::ReceiveSnapshotChunk(f0_self), Respond::ReceiveSnapshotChunk(f0_other)) => f0_self.eq(f0_other), - (Respond::InstallSnapshot(f0_self), Respond::InstallSnapshot(f0_other)) => f0_self.eq(f0_other), - (Respond::InstallFullSnapshot(f0_self), Respond::InstallFullSnapshot(f0_other)) => f0_self.eq(f0_other), - (Respond::Initialize(f0_self), Respond::Initialize(f0_other)) => f0_self.eq(f0_other), + (Respond::Vote(first_sender), Respond::Vote(second_sender)) => first_sender.eq(second_sender), + (Respond::AppendEntries(first_sender), Respond::AppendEntries(second_sender)) => { + first_sender.eq(second_sender) + } + (Respond::ReceiveSnapshotChunk(first_sender), Respond::ReceiveSnapshotChunk(second_sender)) => { + first_sender.eq(second_sender) + } + (Respond::InstallFullSnapshot(first_sender), Respond::InstallFullSnapshot(second_sender)) => { + first_sender.eq(second_sender) + } + (Respond::InstallCompleteSnapshot(first_sender), Respond::InstallCompleteSnapshot(second_sender)) => { + first_sender.eq(second_sender) + } + (Respond::Initialize(first_sender), Respond::Initialize(second_sender)) => first_sender.eq(second_sender), _unused => false, } } From c76b48b0ffb2ea2b436408fd381e324f3eedd4cb Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Wed, 28 Feb 2024 17:23:58 +0100 Subject: [PATCH 03/11] Refactor: Change type definition to pass a 'RaftTypeConfig' instead --- openraft/src/core/raft_core.rs | 24 ++++---------- .../src/core/raft_msg/external_command.rs | 5 +-- openraft/src/core/raft_msg/mod.rs | 31 +++++++++---------- openraft/src/core/sm/command.rs | 10 +++--- openraft/src/core/sm/mod.rs | 6 +--- openraft/src/engine/engine_impl.rs | 11 +++---- .../src/engine/handler/vote_handler/mod.rs | 5 ++- openraft/src/replication/mod.rs | 2 +- openraft/src/replication/request.rs | 13 ++------ 9 files changed, 37 insertions(+), 70 deletions(-) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 629126cac..921a08fb7 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -438,7 +438,7 @@ where &mut self, changes: ChangeMembers, retain: bool, - tx: ResultSender, ClientWriteResponse, ClientWriteError>, + tx: ResultSender, ClientWriteError>, ) { let res = self.engine.state.membership_state.change_handler().apply(changes, retain); let new_membership = match res { @@ -599,7 +599,7 @@ where pub(crate) fn handle_initialize( &mut self, member_nodes: BTreeMap, - tx: ResultSender, (), InitializeError>, + tx: ResultSender>, ) { tracing::debug!(member_nodes = debug(&member_nodes), "{}", func_name!()); @@ -622,12 +622,8 @@ where /// Reject a request due to the Raft node being in a state which prohibits the request. #[tracing::instrument(level = "trace", skip(self, tx))] - pub(crate) fn reject_with_forward_to_leader( - &self, - tx: ResultSender, T, E>, - ) where - E: From>, - { + pub(crate) fn reject_with_forward_to_leader(&self, tx: ResultSender) + where E: From> { let mut leader_id = self.current_leader(); let leader_node = self.get_leader_node(leader_id); @@ -1080,11 +1076,7 @@ where } #[tracing::instrument(level = "debug", skip_all)] - pub(super) fn handle_vote_request( - &mut self, - req: VoteRequest, - tx: VoteTx, C::NodeId>, - ) { + pub(super) fn handle_vote_request(&mut self, req: VoteRequest, tx: VoteTx) { tracing::info!(req = display(req.summary()), func = func_name!()); let resp = self.engine.handle_vote_req(req); @@ -1095,11 +1087,7 @@ where } #[tracing::instrument(level = "debug", skip_all)] - pub(super) fn handle_append_entries_request( - &mut self, - req: AppendEntriesRequest, - tx: AppendEntriesTx, C::NodeId>, - ) { + pub(super) fn handle_append_entries_request(&mut self, req: AppendEntriesRequest, tx: AppendEntriesTx) { tracing::debug!(req = display(req.summary()), func = func_name!()); let is_ok = self.engine.handle_append_entries(&req.vote, req.prev_log_id, req.entries, Some(tx)); diff --git a/openraft/src/core/raft_msg/external_command.rs b/openraft/src/core/raft_msg/external_command.rs index 666d5bfd2..5714df39c 100644 --- a/openraft/src/core/raft_msg/external_command.rs +++ b/openraft/src/core/raft_msg/external_command.rs @@ -3,7 +3,6 @@ use std::fmt; use crate::core::raft_msg::ResultSender; -use crate::type_config::alias::AsyncRuntimeOf; use crate::RaftTypeConfig; use crate::Snapshot; @@ -24,9 +23,7 @@ pub(crate) enum ExternalCommand { Snapshot, /// Get a snapshot from the state machine, send back via a oneshot::Sender. - GetSnapshot { - tx: ResultSender, Option>>, - }, + GetSnapshot { tx: ResultSender>> }, /// Purge logs covered by a snapshot up to a specified index. /// diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index 39871d134..8c0f2a6df 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -28,26 +28,23 @@ use crate::Vote; pub(crate) mod external_command; /// A oneshot TX to send result from `RaftCore` to external caller, e.g. `Raft::append_entries`. -pub(crate) type ResultSender = ::OneshotSender>; +pub(crate) type ResultSender = as AsyncRuntime>::OneshotSender>; -pub(crate) type ResultReceiver = ::OneshotReceiver>; +pub(crate) type ResultReceiver = + as AsyncRuntime>::OneshotReceiver>; /// TX for Vote Response -pub(crate) type VoteTx = ResultSender>; +pub(crate) type VoteTx = ResultSender>>; /// TX for Append Entries Response -pub(crate) type AppendEntriesTx = ResultSender>; +pub(crate) type AppendEntriesTx = ResultSender>>; /// TX for Client Write Response -pub(crate) type ClientWriteTx = - ResultSender, ClientWriteResponse, ClientWriteError, NodeOf>>; +pub(crate) type ClientWriteTx = ResultSender, ClientWriteError, NodeOf>>; /// TX for Linearizable Read Response -pub(crate) type ClientReadTx = ResultSender< - AsyncRuntimeOf, - (Option>, Option>), - CheckIsLeaderError, NodeOf>, ->; +pub(crate) type ClientReadTx = + ResultSender>, Option>), CheckIsLeaderError, NodeOf>>; /// A message sent by application to the [`RaftCore`]. /// @@ -57,18 +54,18 @@ where C: RaftTypeConfig { AppendEntries { rpc: AppendEntriesRequest, - tx: AppendEntriesTx, C::NodeId>, + tx: AppendEntriesTx, }, RequestVote { rpc: VoteRequest, - tx: VoteTx, C::NodeId>, + tx: VoteTx, }, InstallFullSnapshot { vote: Vote, snapshot: Snapshot, - tx: ResultSender, SnapshotResponse>, + tx: ResultSender>, }, /// Begin receiving a snapshot from the leader. @@ -78,7 +75,7 @@ where C: RaftTypeConfig /// will be returned in a Err BeginReceivingSnapshot { vote: Vote, - tx: ResultSender, Box>, HigherVote>, + tx: ResultSender>, HigherVote>, }, ClientWriteRequest { @@ -92,7 +89,7 @@ where C: RaftTypeConfig Initialize { members: BTreeMap, - tx: ResultSender, (), InitializeError>, + tx: ResultSender>, }, ChangeMembership { @@ -102,7 +99,7 @@ where C: RaftTypeConfig /// config will be converted into learners, otherwise they will be removed. retain: bool, - tx: ResultSender, ClientWriteResponse, ClientWriteError>, + tx: ResultSender, ClientWriteError>, }, ExternalCoreRequest { diff --git a/openraft/src/core/sm/command.rs b/openraft/src/core/sm/command.rs index 5930322cf..11ddcf8bc 100644 --- a/openraft/src/core/sm/command.rs +++ b/openraft/src/core/sm/command.rs @@ -54,14 +54,12 @@ where C: RaftTypeConfig Command::new(payload) } - pub(crate) fn get_snapshot(tx: ResultSender>>) -> Self { + pub(crate) fn get_snapshot(tx: ResultSender>>) -> Self { let payload = CommandPayload::GetSnapshot { tx }; Command::new(payload) } - pub(crate) fn begin_receiving_snapshot( - tx: ResultSender>, HigherVote>, - ) -> Self { + pub(crate) fn begin_receiving_snapshot(tx: ResultSender>, HigherVote>) -> Self { let payload = CommandPayload::BeginReceivingSnapshot { tx }; Command::new(payload) } @@ -93,11 +91,11 @@ where C: RaftTypeConfig /// Get the latest built snapshot. GetSnapshot { - tx: ResultSender>>, + tx: ResultSender>>, }, BeginReceivingSnapshot { - tx: ResultSender>, HigherVote>, + tx: ResultSender>, HigherVote>, }, InstallFullSnapshot { diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index 78692202b..af810f6c0 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -12,7 +12,6 @@ use crate::core::ApplyingEntry; use crate::entry::RaftPayload; use crate::storage::RaftStateMachine; use crate::summary::MessageSummary; -use crate::type_config::alias::AsyncRuntimeOf; use crate::AsyncRuntime; use crate::RaftLogId; use crate::RaftSnapshotBuilder; @@ -221,10 +220,7 @@ where } #[tracing::instrument(level = "info", skip_all)] - async fn get_snapshot( - &mut self, - tx: ResultSender, Option>>, - ) -> Result<(), StorageError> { + async fn get_snapshot(&mut self, tx: ResultSender>>) -> Result<(), StorageError> { tracing::info!("{}", func_name!()); let snapshot = self.state_machine.get_current_snapshot().await?; diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index e79f26993..2cf2074b1 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -39,7 +39,6 @@ use crate::raft::VoteResponse; use crate::raft_state::LogStateReader; use crate::raft_state::RaftState; use crate::summary::MessageSummary; -use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::SnapshotDataOf; use crate::AsyncRuntime; use crate::Instant; @@ -225,8 +224,8 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn get_leader_handler_or_reject( &mut self, - tx: Option>, - ) -> Option<(LeaderHandler, Option>)> + tx: Option>, + ) -> Option<(LeaderHandler, Option>)> where T: OptionalSend, E: OptionalSend, @@ -396,7 +395,7 @@ where C: RaftTypeConfig vote: &Vote, prev_log_id: Option>, entries: Vec, - tx: Option, C::NodeId>>, + tx: Option>, ) -> bool { tracing::debug!( vote = display(vote), @@ -459,7 +458,7 @@ where C: RaftTypeConfig &mut self, vote: Vote, snapshot: Snapshot, - tx: ResultSender>, + tx: ResultSender>, ) { tracing::info!(vote = display(vote), snapshot = display(&snapshot), "{}", func_name!()); @@ -492,7 +491,7 @@ where C: RaftTypeConfig pub(crate) fn handle_begin_receiving_snapshot( &mut self, vote: Vote, - tx: ResultSender>, HigherVote>, + tx: ResultSender>, HigherVote>, ) { tracing::info!(vote = display(vote), "{}", func_name!()); diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 8a3ffe87d..b43dca8a8 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -11,7 +11,6 @@ use crate::error::RejectVoteRequest; use crate::internal_server_state::InternalServerState; use crate::leader::Leading; use crate::raft_state::LogStateReader; -use crate::type_config::alias::AsyncRuntimeOf; use crate::utime::UTime; use crate::AsyncRuntime; use crate::Instant; @@ -52,9 +51,9 @@ where C: RaftTypeConfig pub(crate) fn accept_vote( &mut self, vote: &Vote, - tx: ResultSender, T, E>, + tx: ResultSender, f: F, - ) -> Option, T, E>> + ) -> Option> where T: Debug + Eq + OptionalSend, E: Debug + Eq + OptionalSend, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index a02e00b33..d716b5c93 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -733,7 +733,7 @@ where #[tracing::instrument(level = "info", skip_all)] async fn stream_snapshot( &mut self, - snapshot_rx: DataWithId, Option>>>, + snapshot_rx: DataWithId>>>, ) -> Result>, ReplicationError> { let request_id = snapshot_rx.request_id(); let rx = snapshot_rx.into_data(); diff --git a/openraft/src/replication/request.rs b/openraft/src/replication/request.rs index c145627a9..ee2ad6d07 100644 --- a/openraft/src/replication/request.rs +++ b/openraft/src/replication/request.rs @@ -22,10 +22,7 @@ where C: RaftTypeConfig Self::Data(Data::new_logs(id, log_id_range)) } - pub(crate) fn snapshot( - id: Option, - snapshot_rx: ResultReceiver, Option>>, - ) -> Self { + pub(crate) fn snapshot(id: Option, snapshot_rx: ResultReceiver>>) -> Self { Self::Data(Data::new_snapshot(id, snapshot_rx)) } @@ -59,7 +56,6 @@ use crate::error::StreamingError; use crate::log_id_range::LogIdRange; use crate::raft::SnapshotResponse; use crate::replication::callbacks::SnapshotCallback; -use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::InstantOf; use crate::LogId; use crate::MessageSummary; @@ -77,7 +73,7 @@ where C: RaftTypeConfig { Heartbeat, Logs(DataWithId>), - Snapshot(DataWithId, Option>>>), + Snapshot(DataWithId>>>), SnapshotCallback(DataWithId>), } @@ -152,10 +148,7 @@ where C: RaftTypeConfig Self::Logs(DataWithId::new(request_id, log_id_range)) } - pub(crate) fn new_snapshot( - request_id: Option, - snapshot_rx: ResultReceiver, Option>>, - ) -> Self { + pub(crate) fn new_snapshot(request_id: Option, snapshot_rx: ResultReceiver>>) -> Self { Self::Snapshot(DataWithId::new(request_id, snapshot_rx)) } From 4ca38df69281b82cba2d931b070100acc026d2a2 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Thu, 29 Feb 2024 13:57:12 +0100 Subject: [PATCH 04/11] Refactor: Change type definition to pass a 'RaftTypeConfig' instead --- openraft/src/storage/adapter.rs | 11 ++--------- openraft/src/storage/callback.rs | 21 ++++++++------------- openraft/src/storage/v2.rs | 7 +------ 3 files changed, 11 insertions(+), 28 deletions(-) diff --git a/openraft/src/storage/adapter.rs b/openraft/src/storage/adapter.rs index 39cc9c171..fd6526e77 100644 --- a/openraft/src/storage/adapter.rs +++ b/openraft/src/storage/adapter.rs @@ -12,7 +12,6 @@ use crate::storage::v2::sealed::Sealed; use crate::storage::LogFlushed; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; -use crate::type_config::alias::AsyncRuntimeOf; use crate::LogId; use crate::LogState; use crate::OptionalSend; @@ -148,14 +147,8 @@ where S::get_log_reader(self.storage_mut().await.deref_mut()).await } - async fn append( - &mut self, - entries: I, - callback: LogFlushed, C::NodeId>, - ) -> Result<(), StorageError> - where - I: IntoIterator + OptionalSend, - { + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator + OptionalSend { // Default implementation that calls the flush-before-return `append_to_log`. S::append_to_log(self.storage_mut().await.deref_mut(), entries).await?; diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index 80432b7e1..d5549f6d4 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -8,28 +8,23 @@ use crate::async_runtime::AsyncOneshotSendExt; use crate::display_ext::DisplayOption; use crate::AsyncRuntime; use crate::LogId; -use crate::NodeId; use crate::RaftTypeConfig; use crate::StorageIOError; /// A oneshot callback for completion of log io operation. -pub struct LogFlushed -where - NID: NodeId, - Runtime: AsyncRuntime, +pub struct LogFlushed +where C: RaftTypeConfig { - last_log_id: Option>, - tx: Runtime::OneshotSender>, io::Error>>, + last_log_id: Option>, + tx: ::OneshotSender>, io::Error>>, } -impl LogFlushed -where - NID: NodeId, - Runtime: AsyncRuntime, +impl LogFlushed +where C: RaftTypeConfig { pub(crate) fn new( - last_log_id: Option>, - tx: Runtime::OneshotSender>, io::Error>>, + last_log_id: Option>, + tx: ::OneshotSender>, io::Error>>, ) -> Self { Self { last_log_id, tx } } diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index cd074d1e3..2c28b7f2a 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -6,7 +6,6 @@ use macros::add_async_trait; use crate::storage::callback::LogFlushed; use crate::storage::v2::sealed::Sealed; -use crate::type_config::alias::AsyncRuntimeOf; use crate::LogId; use crate::LogState; use crate::OptionalSend; @@ -121,11 +120,7 @@ where C: RaftTypeConfig /// /// - There must not be a **hole** in logs. Because Raft only examine the last log id to ensure /// correctness. - async fn append( - &mut self, - entries: I, - callback: LogFlushed, C::NodeId>, - ) -> Result<(), StorageError> + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> where I: IntoIterator + OptionalSend, I::IntoIter: OptionalSend; From 6fb723bda4f42ff391100b466246e075afe94dd2 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Thu, 29 Feb 2024 14:45:49 +0100 Subject: [PATCH 05/11] Refactor: simplify types for Respond --- openraft/src/engine/command.rs | 50 ++++++++----------- .../handler/vote_handler/accept_vote_test.rs | 8 +-- .../src/engine/handler/vote_handler/mod.rs | 2 +- openraft/src/type_config.rs | 3 ++ 4 files changed, 30 insertions(+), 33 deletions(-) diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 0235c89aa..e48c429c4 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -13,10 +13,10 @@ use crate::raft::InstallSnapshotResponse; use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; +use crate::type_config::alias::OneshotSenderOf; use crate::AsyncRuntime; use crate::LeaderId; use crate::LogId; -use crate::Node; use crate::NodeId; use crate::OptionalSend; use crate::RaftTypeConfig; @@ -99,7 +99,7 @@ where C: RaftTypeConfig /// Send result to caller Respond { when: Option>, - resp: Respond, + resp: Respond, }, } @@ -221,25 +221,22 @@ where NID: NodeId /// A command to send return value to the caller via a `oneshot::Sender`. #[derive(Debug)] #[derive(derive_more::From)] -pub(crate) enum Respond -where - NID: NodeId, - N: Node, - R: AsyncRuntime, +pub(crate) enum Respond +where C: RaftTypeConfig { - Vote(ValueSender, Infallible>>), - AppendEntries(ValueSender, Infallible>>), - ReceiveSnapshotChunk(ValueSender>), - InstallSnapshot(ValueSender, InstallSnapshotError>>), - InstallFullSnapshot(ValueSender, Infallible>>), - Initialize(ValueSender>>), + Vote(ValueSender, Infallible>>), + AppendEntries(ValueSender, Infallible>>), + ReceiveSnapshotChunk(ValueSender>), + InstallSnapshot(ValueSender, InstallSnapshotError>>), + InstallFullSnapshot(ValueSender, Infallible>>), + Initialize(ValueSender>>), } -impl PartialEq for Respond +impl PartialEq for Respond where - NID: NodeId + PartialEq, - N: Node + PartialEq, - R: AsyncRuntime, + C: RaftTypeConfig, + C::NodeId: PartialEq, + C::Node: PartialEq, { fn eq(&self, other: &Self) -> bool { match (self, other) { @@ -262,24 +259,21 @@ where } } -impl Eq for Respond +impl Eq for Respond where - NID: NodeId + Eq, - N: Node + Eq, - R: AsyncRuntime, + C: RaftTypeConfig, + C::NodeId: Eq, + C::Node: Eq, { } -impl Respond -where - NID: NodeId, - N: Node, - R: AsyncRuntime, +impl Respond +where C: RaftTypeConfig { - pub(crate) fn new(res: T, tx: R::OneshotSender) -> Self + pub(crate) fn new(res: T, tx: OneshotSenderOf) -> Self where T: Debug + PartialEq + Eq + OptionalSend, - Self: From>, + Self: From>, { Respond::from(ValueSender::new(res, tx)) } diff --git a/openraft/src/engine/handler/vote_handler/accept_vote_test.rs b/openraft/src/engine/handler/vote_handler/accept_vote_test.rs index f7bf9c5d2..7de499e7a 100644 --- a/openraft/src/engine/handler/vote_handler/accept_vote_test.rs +++ b/openraft/src/engine/handler/vote_handler/accept_vote_test.rs @@ -11,11 +11,11 @@ use crate::engine::Respond; use crate::error::Infallible; use crate::raft::VoteResponse; use crate::testing::log_id; +use crate::type_config::alias::AsyncRuntimeOf; use crate::utime::UTime; use crate::AsyncRuntime; use crate::EffectiveMembership; use crate::Membership; -use crate::RaftTypeConfig; use crate::TokioInstant; use crate::Vote; @@ -52,12 +52,12 @@ fn test_accept_vote_reject_smaller_vote() -> anyhow::Result<()> { // When a vote is reject, it generate SendResultCommand and return an error. let mut eng = eng(); - let (tx, _rx) = <::AsyncRuntime as AsyncRuntime>::oneshot(); + let (tx, _rx) = AsyncRuntimeOf::::oneshot(); let resp = eng.vote_handler().accept_vote(&Vote::new(1, 2), tx, |_state, _err| mk_res()); assert!(resp.is_none()); - let (tx, _rx) = <::AsyncRuntime as AsyncRuntime>::oneshot(); + let (tx, _rx) = AsyncRuntimeOf::::oneshot(); assert_eq!( vec![ // @@ -77,7 +77,7 @@ fn test_accept_vote_granted_greater_vote() -> anyhow::Result<()> { // When a vote is accepted, it generate SaveVote command and return an Ok. let mut eng = eng(); - let (tx, _rx) = <::AsyncRuntime as AsyncRuntime>::oneshot(); + let (tx, _rx) = AsyncRuntimeOf::::oneshot(); let resp = eng.vote_handler().accept_vote(&Vote::new(3, 3), tx, |_state, _err| mk_res()); assert!(resp.is_some()); diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index b43dca8a8..c8ec894aa 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -57,7 +57,7 @@ where C: RaftTypeConfig where T: Debug + Eq + OptionalSend, E: Debug + Eq + OptionalSend, - Respond: From>>, + Respond: From>>, F: Fn( &RaftState::Instant>, RejectVoteRequest, diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 023c37cad..9fa647d1c 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -91,6 +91,9 @@ pub(crate) mod alias { pub(crate) type InstantOf = as crate::AsyncRuntime>::Instant; pub(crate) type TimeoutErrorOf = as crate::AsyncRuntime>::TimeoutError; pub(crate) type TimeoutOf = as crate::AsyncRuntime>::Timeout; + pub(crate) type OneshotSenderOf = as crate::AsyncRuntime>::OneshotSender; + pub(crate) type OneshotReceiverErrorOf = as crate::AsyncRuntime>::OneshotReceiverError; + pub(crate) type OneshotReceiverOf = as crate::AsyncRuntime>::OneshotReceiver; // Usually used types pub(crate) type LogIdOf = crate::LogId>; From aa97ec038ee49d9157314e0ec71834af3e3c4b84 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Thu, 29 Feb 2024 14:50:58 +0100 Subject: [PATCH 06/11] Refactor: Use OneshotSenderOf --- openraft/src/core/raft_msg/mod.rs | 3 ++- openraft/src/raft/raft_inner.rs | 3 ++- openraft/src/storage/callback.rs | 6 +++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index 8c0f2a6df..f4ea72cdc 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -17,6 +17,7 @@ use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::LogIdOf; use crate::type_config::alias::NodeIdOf; use crate::type_config::alias::NodeOf; +use crate::type_config::alias::OneshotSenderOf; use crate::type_config::alias::SnapshotDataOf; use crate::AsyncRuntime; use crate::ChangeMembers; @@ -28,7 +29,7 @@ use crate::Vote; pub(crate) mod external_command; /// A oneshot TX to send result from `RaftCore` to external caller, e.g. `Raft::append_entries`. -pub(crate) type ResultSender = as AsyncRuntime>::OneshotSender>; +pub(crate) type ResultSender = OneshotSenderOf>; pub(crate) type ResultReceiver = as AsyncRuntime>::OneshotReceiver>; diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index bbc3d3a27..d13fdfeec 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -16,6 +16,7 @@ use crate::error::RaftError; use crate::metrics::RaftDataMetrics; use crate::metrics::RaftServerMetrics; use crate::raft::core_state::CoreState; +use crate::type_config::alias::OneshotSenderOf; use crate::AsyncRuntime; use crate::Config; use crate::MessageSummary; @@ -39,7 +40,7 @@ where C: RaftTypeConfig // TODO(xp): it does not need to be a async mutex. #[allow(clippy::type_complexity)] - pub(in crate::raft) tx_shutdown: Mutex::OneshotSender<()>>>, + pub(in crate::raft) tx_shutdown: Mutex>>, pub(in crate::raft) core_state: Mutex>, /// The ongoing snapshot transmission. diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index d5549f6d4..b4f666ae2 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -6,7 +6,7 @@ use tokio::sync::oneshot; use crate::async_runtime::AsyncOneshotSendExt; use crate::display_ext::DisplayOption; -use crate::AsyncRuntime; +use crate::type_config::alias::OneshotSenderOf; use crate::LogId; use crate::RaftTypeConfig; use crate::StorageIOError; @@ -16,7 +16,7 @@ pub struct LogFlushed where C: RaftTypeConfig { last_log_id: Option>, - tx: ::OneshotSender>, io::Error>>, + tx: OneshotSenderOf>, io::Error>>, } impl LogFlushed @@ -24,7 +24,7 @@ where C: RaftTypeConfig { pub(crate) fn new( last_log_id: Option>, - tx: ::OneshotSender>, io::Error>>, + tx: OneshotSenderOf>, io::Error>>, ) -> Self { Self { last_log_id, tx } } From a1076fbed45acc81308677d17cd684a8720f8aa8 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Thu, 29 Feb 2024 14:56:45 +0100 Subject: [PATCH 07/11] Fix: little issue while rewording a commit --- openraft/src/engine/command.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index e48c429c4..4b463a44f 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -247,10 +247,10 @@ where (Respond::ReceiveSnapshotChunk(first_sender), Respond::ReceiveSnapshotChunk(second_sender)) => { first_sender.eq(second_sender) } - (Respond::InstallFullSnapshot(first_sender), Respond::InstallFullSnapshot(second_sender)) => { + (Respond::InstallSnapshot(first_sender), Respond::InstallSnapshot(second_sender)) => { first_sender.eq(second_sender) } - (Respond::InstallCompleteSnapshot(first_sender), Respond::InstallCompleteSnapshot(second_sender)) => { + (Respond::InstallFullSnapshot(first_sender), Respond::InstallFullSnapshot(second_sender)) => { first_sender.eq(second_sender) } (Respond::Initialize(first_sender), Respond::Initialize(second_sender)) => first_sender.eq(second_sender), From 6e2daba3c35e4f70f444a1c8dabf01be8875c8fb Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Thu, 29 Feb 2024 15:03:19 +0100 Subject: [PATCH 08/11] Refactor: LogPush with RaftTypeConfig type parameters --- cluster_benchmark/tests/benchmark/store.rs | 2 +- examples/memstore/src/log_store.rs | 4 ++-- examples/raft-kv-memstore-singlethreaded/src/store.rs | 2 +- examples/raft-kv-rocksdb/src/store.rs | 2 +- stores/rocksstore-v2/src/lib.rs | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index ea83a5772..2b3b9bf73 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -227,7 +227,7 @@ impl RaftLogStorage for Arc { async fn append( &mut self, entries: I, - callback: LogFlushed<::AsyncRuntime, NodeId>, + callback: LogFlushed, ) -> Result<(), StorageError> where I: IntoIterator> + Send, diff --git a/examples/memstore/src/log_store.rs b/examples/memstore/src/log_store.rs index deeb8ef5a..77c8a9e41 100644 --- a/examples/memstore/src/log_store.rs +++ b/examples/memstore/src/log_store.rs @@ -96,7 +96,7 @@ impl LogStoreInner { async fn append( &mut self, entries: I, - callback: LogFlushed, + callback: LogFlushed, ) -> Result<(), StorageError> where I: IntoIterator, @@ -197,7 +197,7 @@ mod impl_log_store { async fn append( &mut self, entries: I, - callback: LogFlushed, + callback: LogFlushed, ) -> Result<(), StorageError> where I: IntoIterator, diff --git a/examples/raft-kv-memstore-singlethreaded/src/store.rs b/examples/raft-kv-memstore-singlethreaded/src/store.rs index f0dd1a054..b6cad2d63 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/store.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/store.rs @@ -324,7 +324,7 @@ impl RaftLogStorage for Rc { async fn append( &mut self, entries: I, - callback: LogFlushed<::AsyncRuntime, NodeId>, + callback: LogFlushed, ) -> Result<(), StorageError> where I: IntoIterator>, diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index 1d09d7eeb..b5bf184ac 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -440,7 +440,7 @@ impl RaftLogStorage for LogStore { async fn append( &mut self, entries: I, - callback: LogFlushed<::AsyncRuntime, NodeId>, + callback: LogFlushed, ) -> StorageResult<()> where I: IntoIterator> + Send, diff --git a/stores/rocksstore-v2/src/lib.rs b/stores/rocksstore-v2/src/lib.rs index d0011353b..4428ac1b0 100644 --- a/stores/rocksstore-v2/src/lib.rs +++ b/stores/rocksstore-v2/src/lib.rs @@ -353,7 +353,7 @@ impl RaftLogStorage for RocksLogStore { async fn append( &mut self, entries: I, - callback: LogFlushed<::AsyncRuntime, RocksNodeId>, + callback: LogFlushed, ) -> Result<(), StorageError> where I: IntoIterator> + Send, From 6183dcc3fdfb11adc6c7d5b3a780263e91dc85c1 Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Thu, 29 Feb 2024 15:13:36 +0100 Subject: [PATCH 09/11] Chore: lint & fmt --- examples/memstore/src/log_store.rs | 20 ++++--------------- .../src/store.rs | 10 ++-------- examples/raft-kv-rocksdb/src/store.rs | 7 +------ 3 files changed, 7 insertions(+), 30 deletions(-) diff --git a/examples/memstore/src/log_store.rs b/examples/memstore/src/log_store.rs index 77c8a9e41..867465ac6 100644 --- a/examples/memstore/src/log_store.rs +++ b/examples/memstore/src/log_store.rs @@ -93,14 +93,8 @@ impl LogStoreInner { Ok(self.vote) } - async fn append( - &mut self, - entries: I, - callback: LogFlushed, - ) -> Result<(), StorageError> - where - I: IntoIterator, - { + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator { // Simple implementation that calls the flush-before-return `append_to_log`. for entry in entries { self.log.insert(entry.get_log_id().index, entry); @@ -194,14 +188,8 @@ mod impl_log_store { inner.read_vote().await } - async fn append( - &mut self, - entries: I, - callback: LogFlushed, - ) -> Result<(), StorageError> - where - I: IntoIterator, - { + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator { let mut inner = self.inner.lock().await; inner.append(entries, callback).await } diff --git a/examples/raft-kv-memstore-singlethreaded/src/store.rs b/examples/raft-kv-memstore-singlethreaded/src/store.rs index b6cad2d63..227fba770 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/store.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/store.rs @@ -321,14 +321,8 @@ impl RaftLogStorage for Rc { } #[tracing::instrument(level = "trace", skip(self, entries, callback))] - async fn append( - &mut self, - entries: I, - callback: LogFlushed, - ) -> Result<(), StorageError> - where - I: IntoIterator>, - { + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator> { // Simple implementation that calls the flush-before-return `append_to_log`. let mut log = self.log.borrow_mut(); for entry in entries { diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index b5bf184ac..4c1bac8b9 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -22,7 +22,6 @@ use openraft::LogId; use openraft::OptionalSend; use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; -use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; @@ -437,11 +436,7 @@ impl RaftLogStorage for LogStore { } #[tracing::instrument(level = "trace", skip_all)] - async fn append( - &mut self, - entries: I, - callback: LogFlushed, - ) -> StorageResult<()> + async fn append(&mut self, entries: I, callback: LogFlushed) -> StorageResult<()> where I: IntoIterator> + Send, I::IntoIter: Send, From 1019be987274e05a6276fc1b674bedc7fd219f7a Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Thu, 29 Feb 2024 15:19:58 +0100 Subject: [PATCH 10/11] Refactor: ValueSender use RaftTypeConfig instead of just AsyncRuntime --- openraft/src/engine/command.rs | 35 +++++++++---------- .../src/engine/handler/vote_handler/mod.rs | 8 ++--- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 4b463a44f..330f202e4 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -14,7 +14,6 @@ use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::type_config::alias::OneshotSenderOf; -use crate::AsyncRuntime; use crate::LeaderId; use crate::LogId; use crate::NodeId; @@ -224,12 +223,12 @@ where NID: NodeId pub(crate) enum Respond where C: RaftTypeConfig { - Vote(ValueSender, Infallible>>), - AppendEntries(ValueSender, Infallible>>), - ReceiveSnapshotChunk(ValueSender>), - InstallSnapshot(ValueSender, InstallSnapshotError>>), - InstallFullSnapshot(ValueSender, Infallible>>), - Initialize(ValueSender>>), + Vote(ValueSender, Infallible>>), + AppendEntries(ValueSender, Infallible>>), + ReceiveSnapshotChunk(ValueSender>), + InstallSnapshot(ValueSender, InstallSnapshotError>>), + InstallFullSnapshot(ValueSender, Infallible>>), + Initialize(ValueSender>>), } impl PartialEq for Respond @@ -273,7 +272,7 @@ where C: RaftTypeConfig pub(crate) fn new(res: T, tx: OneshotSenderOf) -> Self where T: Debug + PartialEq + Eq + OptionalSend, - Self: From>, + Self: From>, { Respond::from(ValueSender::new(res, tx)) } @@ -291,38 +290,38 @@ where C: RaftTypeConfig } #[derive(Debug)] -pub(crate) struct ValueSender +pub(crate) struct ValueSender where T: Debug + PartialEq + Eq + OptionalSend, - R: AsyncRuntime, + C: RaftTypeConfig, { value: T, - tx: R::OneshotSender, + tx: OneshotSenderOf, } -impl PartialEq for ValueSender +impl PartialEq for ValueSender where T: Debug + PartialEq + Eq + OptionalSend, - R: AsyncRuntime, + C: RaftTypeConfig, { fn eq(&self, other: &Self) -> bool { self.value == other.value } } -impl Eq for ValueSender +impl Eq for ValueSender where T: Debug + PartialEq + Eq + OptionalSend, - R: AsyncRuntime, + C: RaftTypeConfig, { } -impl ValueSender +impl ValueSender where T: Debug + PartialEq + Eq + OptionalSend, - R: AsyncRuntime, + C: RaftTypeConfig, { - pub(crate) fn new(res: T, tx: R::OneshotSender) -> Self { + pub(crate) fn new(res: T, tx: OneshotSenderOf) -> Self { Self { value: res, tx } } diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index c8ec894aa..92f4eb647 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -11,6 +11,7 @@ use crate::error::RejectVoteRequest; use crate::internal_server_state::InternalServerState; use crate::leader::Leading; use crate::raft_state::LogStateReader; +use crate::type_config::alias::InstantOf; use crate::utime::UTime; use crate::AsyncRuntime; use crate::Instant; @@ -57,11 +58,8 @@ where C: RaftTypeConfig where T: Debug + Eq + OptionalSend, E: Debug + Eq + OptionalSend, - Respond: From>>, - F: Fn( - &RaftState::Instant>, - RejectVoteRequest, - ) -> Result, + Respond: From>>, + F: Fn(&RaftState>, RejectVoteRequest) -> Result, { let vote_res = self.update_vote(vote); From bcb482676fd3b047c79bec0505cab98db0e2cf3b Mon Sep 17 00:00:00 2001 From: Anthony Griffon Date: Thu, 29 Feb 2024 15:32:53 +0100 Subject: [PATCH 11/11] Refactor: Enforce 'AsyncRuntime' to be 'PartialEq' + 'Eq' and remove manual impl of 'PartialEq' and 'Eq' --- openraft/src/async_runtime.rs | 4 ++-- openraft/src/engine/command.rs | 37 +--------------------------------- 2 files changed, 3 insertions(+), 38 deletions(-) diff --git a/openraft/src/async_runtime.rs b/openraft/src/async_runtime.rs index a34627498..c602d48a7 100644 --- a/openraft/src/async_runtime.rs +++ b/openraft/src/async_runtime.rs @@ -18,7 +18,7 @@ use crate::TokioInstant; /// ## Note /// /// The default asynchronous runtime is `tokio`. -pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static { +pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + OptionalSync + 'static { /// The error type of [`Self::JoinHandle`]. type JoinError: Debug + Display + OptionalSend; @@ -97,7 +97,7 @@ pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static } /// `Tokio` is the default asynchronous executor. -#[derive(Debug, Default)] +#[derive(Debug, Default, PartialEq, Eq)] pub struct TokioRuntime; pub struct TokioOneShotSender(pub tokio::sync::oneshot::Sender); diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 330f202e4..6673ebc75 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -218,7 +218,7 @@ where NID: NodeId } /// A command to send return value to the caller via a `oneshot::Sender`. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] #[derive(derive_more::From)] pub(crate) enum Respond where C: RaftTypeConfig @@ -231,41 +231,6 @@ where C: RaftTypeConfig Initialize(ValueSender>>), } -impl PartialEq for Respond -where - C: RaftTypeConfig, - C::NodeId: PartialEq, - C::Node: PartialEq, -{ - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Respond::Vote(first_sender), Respond::Vote(second_sender)) => first_sender.eq(second_sender), - (Respond::AppendEntries(first_sender), Respond::AppendEntries(second_sender)) => { - first_sender.eq(second_sender) - } - (Respond::ReceiveSnapshotChunk(first_sender), Respond::ReceiveSnapshotChunk(second_sender)) => { - first_sender.eq(second_sender) - } - (Respond::InstallSnapshot(first_sender), Respond::InstallSnapshot(second_sender)) => { - first_sender.eq(second_sender) - } - (Respond::InstallFullSnapshot(first_sender), Respond::InstallFullSnapshot(second_sender)) => { - first_sender.eq(second_sender) - } - (Respond::Initialize(first_sender), Respond::Initialize(second_sender)) => first_sender.eq(second_sender), - _unused => false, - } - } -} - -impl Eq for Respond -where - C: RaftTypeConfig, - C::NodeId: Eq, - C::Node: Eq, -{ -} - impl Respond where C: RaftTypeConfig {