diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e4e3c8a93..87dc5a8ae 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -404,6 +404,7 @@ jobs: - "stable" - "nightly" example: + - "memstore" - "raft-kv-memstore" - "raft-kv-memstore-generic-snapshot-data" - "raft-kv-memstore-singlethreaded" diff --git a/Cargo.toml b/Cargo.toml index 17dd84df9..45155ade1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,10 +52,12 @@ members = [ "tests", "rocksstore", "rocksstore-compat07", - "sledstore"] + "sledstore", +] exclude = [ "cluster_benchmark", "stores/rocksstore-v2", + "examples/memstore", "examples/raft-kv-memstore", "examples/raft-kv-memstore-singlethreaded", "examples/raft-kv-memstore-generic-snapshot-data", diff --git a/examples/memstore/Cargo.toml b/examples/memstore/Cargo.toml new file mode 100644 index 000000000..5cd7af192 --- /dev/null +++ b/examples/memstore/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "memstore" +version = "0.1.0" +readme = "README.md" + +edition = "2021" +authors = [ + "drdr xp ", +] +categories = ["algorithms", "asynchronous", "data-structures"] +description = "An example in-memory storage for `openraft`." +homepage = "https://github.com/datafuselabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/datafuselabs/openraft" + +[dependencies] +openraft = { path = "../../openraft", features = ["serde", "storage-v2"] } + +tokio = { version = "1.0", default-features = false, features = ["sync"] } + +[features] + +[package.metadata.docs.rs] +all-features = true diff --git a/examples/memstore/README.md b/examples/memstore/README.md new file mode 100644 index 000000000..54f57b978 --- /dev/null +++ b/examples/memstore/README.md @@ -0,0 +1,8 @@ +# memstore + +This is a simple in-memory implementation of `RaftLogStore`. +Other example Raft based applications use this crate as a component to store raft logs. + +TODO: + +- [ ] Add `RaftStateMachine` \ No newline at end of file diff --git a/examples/memstore/src/lib.rs b/examples/memstore/src/lib.rs new file mode 100644 index 000000000..751ea0dbf --- /dev/null +++ b/examples/memstore/src/lib.rs @@ -0,0 +1,5 @@ +//! Provide storage layer implementation for examples. + +mod log_store; + +pub use log_store::LogStore; diff --git a/examples/memstore/src/log_store.rs b/examples/memstore/src/log_store.rs new file mode 100644 index 000000000..25715e781 --- /dev/null +++ b/examples/memstore/src/log_store.rs @@ -0,0 +1,217 @@ +//! Provide `LogStore`, which is a in-memory implementation of `RaftLogStore` for demostration +//! purpose only. + +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::ops::RangeBounds; +use std::sync::Arc; + +use openraft::storage::LogFlushed; +use openraft::LogId; +use openraft::LogState; +use openraft::RaftLogId; +use openraft::RaftTypeConfig; +use openraft::StorageError; +use openraft::Vote; +use tokio::sync::Mutex; + +/// RaftLogStore implementation with a in-memory storage +#[derive(Clone, Debug, Default)] +pub struct LogStore { + inner: Arc>>, +} + +#[derive(Debug)] +pub struct LogStoreInner { + /// The last purged log id. + last_purged_log_id: Option>, + + /// The Raft log. + log: BTreeMap, + + /// The commit log id. + committed: Option>, + + /// The current granted vote. + vote: Option>, +} + +impl Default for LogStoreInner { + fn default() -> Self { + Self { + last_purged_log_id: None, + log: BTreeMap::new(), + committed: None, + vote: None, + } + } +} + +impl LogStoreInner { + async fn try_get_log_entries + Clone + Debug>( + &mut self, + range: RB, + ) -> Result, StorageError> + where + C::Entry: Clone, + { + let response = self.log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); + Ok(response) + } + + async fn get_log_state(&mut self) -> Result, StorageError> { + let last = self.log.iter().next_back().map(|(_, ent)| *ent.get_log_id()); + + let last_purged = self.last_purged_log_id; + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + self.committed = committed; + Ok(()) + } + + async fn read_committed(&mut self) -> Result>, StorageError> { + Ok(self.committed) + } + + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + self.vote = Some(*vote); + Ok(()) + } + + async fn read_vote(&mut self) -> Result>, StorageError> { + Ok(self.vote) + } + + async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> + where I: IntoIterator { + // Simple implementation that calls the flush-before-return `append_to_log`. + for entry in entries { + self.log.insert(entry.get_log_id().index, entry); + } + callback.log_io_completed(Ok(())); + + Ok(()) + } + + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + let keys = self.log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); + for key in keys { + self.log.remove(&key); + } + + Ok(()) + } + + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + { + let ld = &mut self.last_purged_log_id; + assert!(*ld <= Some(log_id)); + *ld = Some(log_id); + } + + { + let keys = self.log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); + for key in keys { + self.log.remove(&key); + } + } + + Ok(()) + } +} + +mod impl_log_store { + use std::fmt::Debug; + use std::ops::RangeBounds; + + use openraft::storage::LogFlushed; + use openraft::storage::RaftLogStorage; + use openraft::LogId; + use openraft::LogState; + use openraft::RaftLogReader; + use openraft::RaftTypeConfig; + use openraft::StorageError; + use openraft::Vote; + + use crate::log_store::LogStore; + + impl RaftLogReader for LogStore + where C::Entry: Clone + { + async fn try_get_log_entries + Clone + Debug>( + &mut self, + range: RB, + ) -> Result, StorageError> { + let mut inner = self.inner.lock().await; + inner.try_get_log_entries(range).await + } + } + + impl RaftLogStorage for LogStore + where C::Entry: Clone + { + type LogReader = Self; + + async fn get_log_state(&mut self) -> Result, StorageError> { + let mut inner = self.inner.lock().await; + inner.get_log_state().await + } + + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + let mut inner = self.inner.lock().await; + inner.save_committed(committed).await + } + + async fn read_committed(&mut self) -> Result>, StorageError> { + let mut inner = self.inner.lock().await; + inner.read_committed().await + } + + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + let mut inner = self.inner.lock().await; + inner.save_vote(vote).await + } + + async fn read_vote(&mut self) -> Result>, StorageError> { + let mut inner = self.inner.lock().await; + inner.read_vote().await + } + + async fn append( + &mut self, + entries: I, + callback: LogFlushed, + ) -> Result<(), StorageError> + where + I: IntoIterator, + { + let mut inner = self.inner.lock().await; + inner.append(entries, callback).await + } + + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + let mut inner = self.inner.lock().await; + inner.truncate(log_id).await + } + + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + let mut inner = self.inner.lock().await; + inner.purge(log_id).await + } + + async fn get_log_reader(&mut self) -> Self::LogReader { + self.clone() + } + } +} diff --git a/examples/memstore/test-cluster.sh b/examples/memstore/test-cluster.sh new file mode 100755 index 000000000..a4913ae10 --- /dev/null +++ b/examples/memstore/test-cluster.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "No shell test script for memstore" \ No newline at end of file diff --git a/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml b/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml index fb2a1814f..d6702cb7e 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml +++ b/examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml @@ -16,6 +16,7 @@ license = "MIT OR Apache-2.0" repository = "https://github.com/datafuselabs/openraft" [dependencies] +memstore = { path = "../memstore", features = [] } openraft = { path = "../../openraft", features = ["serde", "storage-v2", "generic-snapshot-data"] } clap = { version = "4.1.11", features = ["derive", "env"] } diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs index 4d4c9d7ec..27851cfa3 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs @@ -91,7 +91,7 @@ pub async fn new_raft(node_id: NodeId, router: Router) -> (typ::Raft, App) { let config = Arc::new(config.validate().unwrap()); // Create a instance of where the Raft logs will be stored. - let log_store = Arc::new(LogStore::default()); + let log_store = LogStore::default(); // Create a instance of where the state machine data will be stored. let state_machine_store = Arc::new(StateMachineStore::default()); diff --git a/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs b/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs index 98d36aa8b..6755de6fc 100644 --- a/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs +++ b/examples/raft-kv-memstore-generic-snapshot-data/src/store.rs @@ -1,25 +1,19 @@ use std::collections::BTreeMap; use std::fmt::Debug; -use std::ops::RangeBounds; use std::sync::Arc; use std::sync::Mutex; -use openraft::storage::LogFlushed; -use openraft::storage::LogState; -use openraft::storage::RaftLogStorage; use openraft::storage::RaftStateMachine; use openraft::storage::Snapshot; use openraft::BasicNode; use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; -use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StoredMembership; -use openraft::Vote; use serde::Deserialize; use serde::Serialize; @@ -27,6 +21,8 @@ use crate::typ; use crate::NodeId; use crate::TypeConfig; +pub type LogStore = memstore::LogStore; + #[derive(Serialize, Deserialize, Debug, Clone)] pub enum Request { Set { key: String, value: String }, @@ -80,30 +76,6 @@ pub struct StateMachineStore { current_snapshot: Mutex>, } -#[derive(Debug, Default)] -pub struct LogStore { - last_purged_log_id: Mutex>>, - - /// The Raft log. - log: Mutex>>, - - committed: Mutex>>, - - /// The current granted vote. - vote: Mutex>>, -} - -impl RaftLogReader for Arc { - async fn try_get_log_entries + Clone + Debug>( - &mut self, - range: RB, - ) -> Result>, StorageError> { - let log = self.log.lock().unwrap(); - let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); - Ok(response) - } -} - impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn build_snapshot(&mut self) -> Result, StorageError> { @@ -247,98 +219,3 @@ impl RaftStateMachine for Arc { self.clone() } } - -impl RaftLogStorage for Arc { - type LogReader = Self; - - async fn get_log_state(&mut self) -> Result, StorageError> { - let log = self.log.lock().unwrap(); - let last = log.iter().next_back().map(|(_, ent)| ent.log_id); - - let last_purged = *self.last_purged_log_id.lock().unwrap(); - - let last = match last { - None => last_purged, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id: last_purged, - last_log_id: last, - }) - } - - async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { - let mut c = self.committed.lock().unwrap(); - *c = committed; - Ok(()) - } - - async fn read_committed(&mut self) -> Result>, StorageError> { - let committed = self.committed.lock().unwrap(); - Ok(*committed) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { - let mut v = self.vote.lock().unwrap(); - *v = Some(*vote); - Ok(()) - } - - async fn read_vote(&mut self) -> Result>, StorageError> { - Ok(*self.vote.lock().unwrap()) - } - - #[tracing::instrument(level = "trace", skip(self, entries, callback))] - async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> - where I: IntoIterator> { - // Simple implementation that calls the flush-before-return `append_to_log`. - let mut log = self.log.lock().unwrap(); - for entry in entries { - log.insert(entry.log_id.index, entry); - } - callback.log_io_completed(Ok(())); - - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { - tracing::debug!("delete_log: [{:?}, +oo)", log_id); - - let mut log = self.log.lock().unwrap(); - let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); - for key in keys { - log.remove(&key); - } - - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { - tracing::debug!("delete_log: (-oo, {:?}]", log_id); - - { - let mut ld = self.last_purged_log_id.lock().unwrap(); - assert!(*ld <= Some(log_id)); - *ld = Some(log_id); - } - - { - let mut log = self.log.lock().unwrap(); - - let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); - for key in keys { - log.remove(&key); - } - } - - Ok(()) - } - - async fn get_log_reader(&mut self) -> Self::LogReader { - self.clone() - } -} diff --git a/examples/raft-kv-memstore/Cargo.toml b/examples/raft-kv-memstore/Cargo.toml index 55958d221..b9004d830 100644 --- a/examples/raft-kv-memstore/Cargo.toml +++ b/examples/raft-kv-memstore/Cargo.toml @@ -20,6 +20,7 @@ name = "raft-key-value" path = "src/bin/main.rs" [dependencies] +memstore = { path = "../memstore", features = [] } openraft = { path = "../../openraft", features = ["serde", "storage-v2"] } actix-web = "4.0.0-rc.2" diff --git a/examples/raft-kv-memstore/src/app.rs b/examples/raft-kv-memstore/src/app.rs index 0555f6076..18483c93b 100644 --- a/examples/raft-kv-memstore/src/app.rs +++ b/examples/raft-kv-memstore/src/app.rs @@ -11,7 +11,7 @@ pub struct App { pub id: NodeId, pub addr: String, pub raft: Raft, - pub log_store: Arc, + pub log_store: LogStore, pub state_machine_store: Arc, pub config: Arc, } diff --git a/examples/raft-kv-memstore/src/lib.rs b/examples/raft-kv-memstore/src/lib.rs index 0228caa15..b0dbae181 100644 --- a/examples/raft-kv-memstore/src/lib.rs +++ b/examples/raft-kv-memstore/src/lib.rs @@ -66,7 +66,7 @@ pub async fn start_example_raft_node(node_id: NodeId, http_addr: String) -> std: let config = Arc::new(config.validate().unwrap()); // Create a instance of where the Raft logs will be stored. - let log_store = Arc::new(LogStore::default()); + let log_store = LogStore::default(); // Create a instance of where the Raft data will be stored. let state_machine_store = Arc::new(StateMachineStore::default()); diff --git a/examples/raft-kv-memstore/src/store/mod.rs b/examples/raft-kv-memstore/src/store/mod.rs index fab713e09..b64ea4427 100644 --- a/examples/raft-kv-memstore/src/store/mod.rs +++ b/examples/raft-kv-memstore/src/store/mod.rs @@ -1,28 +1,21 @@ use std::collections::BTreeMap; use std::fmt::Debug; use std::io::Cursor; -use std::ops::RangeBounds; use std::sync::Arc; use std::sync::Mutex; -use openraft::storage::LogFlushed; -use openraft::storage::LogState; -use openraft::storage::RaftLogStorage; use openraft::storage::RaftStateMachine; use openraft::storage::Snapshot; use openraft::BasicNode; use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; -use openraft::OptionalSend; -use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftTypeConfig; use openraft::SnapshotMeta; use openraft::StorageError; use openraft::StorageIOError; use openraft::StoredMembership; -use openraft::Vote; use serde::Deserialize; use serde::Serialize; use tokio::sync::RwLock; @@ -30,6 +23,8 @@ use tokio::sync::RwLock; use crate::NodeId; use crate::TypeConfig; +pub type LogStore = memstore::LogStore; + /** * Here you will set the types of request that will interact with the raft nodes. * For example the `Set` will be used to write data (key and value) to the raft database. @@ -88,30 +83,6 @@ pub struct StateMachineStore { current_snapshot: RwLock>, } -#[derive(Debug, Default)] -pub struct LogStore { - last_purged_log_id: RwLock>>, - - /// The Raft log. - log: RwLock>>, - - committed: RwLock>>, - - /// The current granted vote. - vote: RwLock>>, -} - -impl RaftLogReader for Arc { - async fn try_get_log_entries + Clone + Debug + OptionalSend>( - &mut self, - range: RB, - ) -> Result>, StorageError> { - let log = self.log.read().await; - let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); - Ok(response) - } -} - impl RaftSnapshotBuilder for Arc { #[tracing::instrument(level = "trace", skip(self))] async fn build_snapshot(&mut self) -> Result, StorageError> { @@ -259,98 +230,3 @@ impl RaftStateMachine for Arc { self.clone() } } - -impl RaftLogStorage for Arc { - type LogReader = Self; - - async fn get_log_state(&mut self) -> Result, StorageError> { - let log = self.log.read().await; - let last = log.iter().next_back().map(|(_, ent)| ent.log_id); - - let last_purged = *self.last_purged_log_id.read().await; - - let last = match last { - None => last_purged, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id: last_purged, - last_log_id: last, - }) - } - - async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { - let mut c = self.committed.write().await; - *c = committed; - Ok(()) - } - - async fn read_committed(&mut self) -> Result>, StorageError> { - let committed = self.committed.read().await; - Ok(*committed) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { - let mut v = self.vote.write().await; - *v = Some(*vote); - Ok(()) - } - - async fn read_vote(&mut self) -> Result>, StorageError> { - Ok(*self.vote.read().await) - } - - #[tracing::instrument(level = "trace", skip(self, entries, callback))] - async fn append(&mut self, entries: I, callback: LogFlushed) -> Result<(), StorageError> - where I: IntoIterator> + Send { - // Simple implementation that calls the flush-before-return `append_to_log`. - let mut log = self.log.write().await; - for entry in entries { - log.insert(entry.log_id.index, entry); - } - callback.log_io_completed(Ok(())); - - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { - tracing::debug!("delete_log: [{:?}, +oo)", log_id); - - let mut log = self.log.write().await; - let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); - for key in keys { - log.remove(&key); - } - - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { - tracing::debug!("delete_log: (-oo, {:?}]", log_id); - - { - let mut ld = self.last_purged_log_id.write().await; - assert!(*ld <= Some(log_id)); - *ld = Some(log_id); - } - - { - let mut log = self.log.write().await; - - let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); - for key in keys { - log.remove(&key); - } - } - - Ok(()) - } - - async fn get_log_reader(&mut self) -> Self::LogReader { - self.clone() - } -} diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 329897cd8..583c1b0fc 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -106,7 +106,7 @@ pub use crate::raft_types::SnapshotSegmentId; pub use crate::storage::LogState; pub use crate::storage::RaftLogReader; pub use crate::storage::RaftSnapshotBuilder; -pub use crate::storage::RaftStorage; +#[cfg(not(feature = "storage-v2"))] pub use crate::storage::RaftStorage; pub use crate::storage::Snapshot; pub use crate::storage::SnapshotMeta; pub use crate::storage::StorageHelper;