Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Doc: update architecture: add heartbeat worker #1239

Merged
merged 2 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/raft-kv-memstore-network-v2/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ pub struct StoredSnapshot {
pub data: Box<typ::SnapshotData>,
}

/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the
/// Data contained in the Raft state machine.
///
/// Note that we are using `serde` to serialize the
/// `data`, which has a implementation to be serialized. Note that for this test we set both the key
/// and value as String, but you could set any type of value that has the serialization impl.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
Expand Down
4 changes: 3 additions & 1 deletion examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ pub struct StoredSnapshot {
pub data: Box<typ::SnapshotData>,
}

/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the
/// Data contained in the Raft state machine.
///
/// Note that we are using `serde` to serialize the
/// `data`, which has a implementation to be serialized. Note that for this test we set both the key
/// and value as String, but you could set any type of value that has the serialization impl.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
Expand Down
4 changes: 3 additions & 1 deletion examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ pub struct StoredSnapshot {
pub data: Vec<u8>,
}

/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the
/// Data contained in the Raft state machine.
///
/// Note that we are using `serde` to serialize the
/// `data`, which has a implementation to be serialized. Note that for this test we set both the key
/// and value as String, but you could set any type of value that has the serialization impl.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
Expand Down
4 changes: 3 additions & 1 deletion examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ pub struct StoredSnapshot {
pub data: Vec<u8>,
}

/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the
/// Data contained in the Raft state machine.
///
/// Note that we are using `serde` to serialize the
/// `data`, which has a implementation to be serialized. Note that for this test we set both the key
/// and value as String, but you could set any type of value that has the serialization impl.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
Expand Down
112 changes: 65 additions & 47 deletions openraft/src/docs/internal/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ The major components inside Openraft include:
to replicate logs or snapshots. It communicates with `RaftCore` through
channels.

- **`HeartbeatWorkersHandle`**: This control handle manages all heartbeat
tasks: `HeartbeatWorker`.

- **`HeartbeatWorker`**: This specialized task is solely responsible for
sending heartbeat messages (in the form of an empty
[`AppendEntriesRequest`]) to a specific target node. This task, separate
from the `ReplicationCore`, is responsible for sending heartbeats. This
design prevents intensive log entry replication from blocking heartbeat
acknowledgments, which could otherwise lead to the Leader's lease expiration
and subsequent step down.

- **[`RaftNetwork`]**: This is a user-provided component that implements the
network transport layer, e.g., sending logs to a remote node or sending a
[`VoteRequest`] to a remote node.
Expand All @@ -41,14 +52,15 @@ The major components inside Openraft include:



[`Raft`]: `crate::raft::Raft`
[`client_write`]: `crate::raft::Raft::client_write`
[`RaftLogStorage`]: `crate::storage::RaftLogStorage`
[`RaftStateMachine`]: `crate::storage::RaftStateMachine`
[`Adapter`]: `crate::storage::Adapter`
[`RaftNetwork`]: `crate::network::RaftNetwork`
[`append_entries`]: `crate::network::RaftNetwork::append_entries`
[`VoteRequest`]: `crate::raft::VoteRequest`
[`Raft`]: `crate::raft::Raft`
[`client_write`]: `crate::raft::Raft::client_write`
[`RaftLogStorage`]: `crate::storage::RaftLogStorage`
[`RaftStateMachine`]: `crate::storage::RaftStateMachine`
[`Adapter`]: `crate::storage::Adapter`
[`RaftNetwork`]: `crate::network::RaftNetwork`
[`append_entries`]: `crate::network::RaftNetwork::append_entries`
[`VoteRequest`]: `crate::raft::VoteRequest`
[`AppendEntriesRequest`]: `crate::raft::message::AppendEntriesRequest`

[//]: # (private)
[//]: # ([`RaftMsg`]: `crate::raft::RaftMsg`)
Expand All @@ -65,49 +77,55 @@ The major components inside Openraft include:
! User !
! o !
! | !
! | !
! | "client_write(impl AppData) -> impl AppDataResponse"
! | "is_leader()" !
! | "client_write(impl AppData)"
! | "ensure_linearizable()"
! | "change_membership()"!
! v !
! Raft ! .-----> Raft o---.
'~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~' | |
| | |
| enum RaftMes | |
| | |
.~~~~~~~|~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~. | .~~~~~~|~~~~~~~~~~~~.
! v ! | ! v !
.----------------o RaftCore ---------------------------. | ! RaftCore !
| ! o ! | | ! !
| ! | ! | | '~~~~~~~~~~~~~~~~~~~'
| ! .--+--------. ! | |
| ! v v ! | |
| ! ReplicationHandle ReplicationHandle ! | |
| ! | | ! | |
| '~~|~~~~~~~~~~~~~~~~|~~~~~~~~~~~~~~~~~~~' | |
| | | | |
| .~~~~~~~~~~|~~~~~~~~~~. .~~|~~~~~~~~~~~~~~~~~~~. | | RPC:
| ! v ! ! v ! | | "vote()"
| ! ReplicationCore ! ! ReplicationCore ! | | "append_entries()"
| ! o ! ! o o ! | | "install_snapshot()"
| '~|~~~~~~~~~~~~~~~~~~~' ! | | ! | |
| | ! | v ! | |
| | ! | RaftNetwork--------------------'
| | '~~|~~~~~~~~~~~~~~~~~~~' |
| | | | "apply()"
| `------------+------------' | "build_snapshot()"
| | "get_log()" .-----------' "install_snapshot()"
| "append_log()" | |
| "..." | .~~~~~~~~~~|~~~~~~~~~~.
`--------. | ! v !
| | ! RaftStateMachine !
| | ! o !
| | ! | !
| | '~~~~~~~~~~|~~~~~~~~~~'
v v |
RaftLogStorage |
! Raft ! .-----> Raft ----.
'~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~' | |
| | |
| enum RaftMes | |
| | |
.~~~~~~~|~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~. | .~~~~~~|~~~~~~~~~~~~.
"append()" ! v ! | ! v !
.------o RaftCore -------------------------------. | ! RaftCore !
| ! ReplicationHandle ! | | ! !
| ! | ReplicationHandle ! | | '~~~~~~~~~~~~~~~~~~~'
| ! | | HeartbeatWorkersHandle ! | |
| ! | | | ! | |
| '~~~|~~|~~~~~~~|~~~~~~~~~~~~~~~~~~~~~~~~' | |
| | | | | |
| | | | .~~~~~~~~~~~~~~~~~. | |
| | | +------>HeartbeatWorker ! | |
| | | | ! RaftNetwork---------------+
| | | | '~~~~~~~~~~~~~~~~~' | |
| | | | .~~~~~~~~~~~~~~~~~. | | RPC:
| | | `------>HeartbeatWorker ! | | "vote()"
| | | ! RaftNetwork ! | | "append_entries()"
| | | '~~~~~~~~~~~~~~~~~' | | "snapshot()"
| | | | | "transfer_leader()"
| | | .~~~~~~~~~~~~~~~~~. | |
| | `------>ReplicationCore ! | |
| .----|--------!-o RaftNetwork---------------------+
| | | '~~~~~~~~~~~~~~~~~' | |
| | | .~~~~~~~~~~~~~~~~~. | |
| | `--------->ReplicationCore ! | |
| +-------------!-o RaftNetwork---------------------'
| | '~~~~~~~~~~~~~~~~~' |
| | | "apply()"
| | | "build_snapshot()"
| | "try_get_log_entries()" .-----------' "install_snapshot()"
| | |
| | .~~~~~~~~~~|~~~~~~~~~~.
| | ! v !
| | ! RaftStateMachine !
| | ! o !
| | '~~~~~~~~~~|~~~~~~~~~~'
v v |
RaftLogStorage |
o |
| |
| |
v v
local-disk local-disk
----------------------------------------------- -----------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/error/streaming_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl<C: RaftTypeConfig> From<StreamingError<C, Fatal<C>>> for ReplicationError<C

impl<C: RaftTypeConfig> From<RPCError<C>> for StreamingError<C> {
fn from(value: RPCError<C>) -> Self {
#[allow(unreachable_patterns)]
match value {
RPCError::Timeout(e) => StreamingError::Timeout(e),
RPCError::Unreachable(e) => StreamingError::Unreachable(e),
Expand All @@ -80,6 +81,7 @@ impl<C: RaftTypeConfig> From<RPCError<C>> for StreamingError<C> {

impl<C: RaftTypeConfig> From<StreamingError<C>> for ReplicationError<C> {
fn from(e: StreamingError<C>) -> Self {
#[allow(unreachable_patterns)]
match e {
StreamingError::Closed(e) => ReplicationError::Closed(e),
StreamingError::StorageError(e) => ReplicationError::StorageError(e),
Expand Down
7 changes: 2 additions & 5 deletions openraft/src/raft/raft_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,8 @@ where C: RaftTypeConfig
message_summary.unwrap_or_default()
);

match core_res {
// A normal quit is still an unexpected "stop" to the caller.
Ok(_) => Fatal::Stopped,
Err(e) => e,
}
// Safe unwrap: core_res is always an error
core_res.unwrap_err()
}

/// Wait for `RaftCore` task to finish and record the returned value from the task.
Expand Down
Loading