Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: add examples/memstore to implement LogStore for other example applications #1022

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ jobs:
- "stable"
- "nightly"
example:
- "memstore"
- "raft-kv-memstore"
- "raft-kv-memstore-generic-snapshot-data"
- "raft-kv-memstore-singlethreaded"
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ members = [
"tests",
"rocksstore",
"rocksstore-compat07",
"sledstore"]
"sledstore",
]
exclude = [
"cluster_benchmark",
"stores/rocksstore-v2",
"examples/memstore",
"examples/raft-kv-memstore",
"examples/raft-kv-memstore-singlethreaded",
"examples/raft-kv-memstore-generic-snapshot-data",
Expand Down
25 changes: 25 additions & 0 deletions examples/memstore/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "memstore"
version = "0.1.0"
readme = "README.md"

edition = "2021"
authors = [
"drdr xp <drdr.xp@gmail.com>",
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example in-memory storage for `openraft`."
homepage = "https://github.com/datafuselabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/datafuselabs/openraft"

[dependencies]
openraft = { path = "../../openraft", features = ["serde", "storage-v2"] }

tokio = { version = "1.0", default-features = false, features = ["sync"] }

[features]

[package.metadata.docs.rs]
all-features = true
8 changes: 8 additions & 0 deletions examples/memstore/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# memstore

This is a simple in-memory implementation of `RaftLogStore`.
Other example Raft based applications use this crate as a component to store raft logs.

TODO:

- [ ] Add `RaftStateMachine`
5 changes: 5 additions & 0 deletions examples/memstore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! Provide storage layer implementation for examples.

mod log_store;

pub use log_store::LogStore;
217 changes: 217 additions & 0 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
//! Provide `LogStore`, which is a in-memory implementation of `RaftLogStore` for demostration
//! purpose only.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::RangeBounds;
use std::sync::Arc;

use openraft::storage::LogFlushed;
use openraft::LogId;
use openraft::LogState;
use openraft::RaftLogId;
use openraft::RaftTypeConfig;
use openraft::StorageError;
use openraft::Vote;
use tokio::sync::Mutex;

/// RaftLogStore implementation with a in-memory storage
#[derive(Clone, Debug, Default)]
pub struct LogStore<C: RaftTypeConfig> {
inner: Arc<Mutex<LogStoreInner<C>>>,
}

#[derive(Debug)]
pub struct LogStoreInner<C: RaftTypeConfig> {
/// The last purged log id.
last_purged_log_id: Option<LogId<C::NodeId>>,

/// The Raft log.
log: BTreeMap<u64, C::Entry>,

/// The commit log id.
committed: Option<LogId<C::NodeId>>,

/// The current granted vote.
vote: Option<Vote<C::NodeId>>,
}

impl<C: RaftTypeConfig> Default for LogStoreInner<C> {
fn default() -> Self {
Self {
last_purged_log_id: None,
log: BTreeMap::new(),
committed: None,
vote: None,
}
}
}

impl<C: RaftTypeConfig> LogStoreInner<C> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>>
where
C::Entry: Clone,
{
let response = self.log.range(range.clone()).map(|(_, val)| val.clone()).collect::<Vec<_>>();
Ok(response)
}

async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C::NodeId>> {
let last = self.log.iter().next_back().map(|(_, ent)| *ent.get_log_id());

let last_purged = self.last_purged_log_id;

let last = match last {
None => last_purged,
Some(x) => Some(x),
};

Ok(LogState {
last_purged_log_id: last_purged,
last_log_id: last,
})
}

async fn save_committed(&mut self, committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C::NodeId>> {
self.committed = committed;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C::NodeId>> {
Ok(self.committed)
}

async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
self.vote = Some(*vote);
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>> {
Ok(self.vote)
}

async fn append<I>(&mut self, entries: I, callback: LogFlushed<C::NodeId>) -> Result<(), StorageError<C::NodeId>>
where I: IntoIterator<Item = C::Entry> {
// Simple implementation that calls the flush-before-return `append_to_log`.
for entry in entries {
self.log.insert(entry.get_log_id().index, entry);
}
callback.log_io_completed(Ok(()));

Ok(())
}

async fn truncate(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
let keys = self.log.range(log_id.index..).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
self.log.remove(&key);
}

Ok(())
}

async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
{
let ld = &mut self.last_purged_log_id;
assert!(*ld <= Some(log_id));
*ld = Some(log_id);
}

{
let keys = self.log.range(..=log_id.index).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
self.log.remove(&key);
}
}

Ok(())
}
}

mod impl_log_store {
use std::fmt::Debug;
use std::ops::RangeBounds;

use openraft::storage::LogFlushed;
use openraft::storage::RaftLogStorage;
use openraft::LogId;
use openraft::LogState;
use openraft::RaftLogReader;
use openraft::RaftTypeConfig;
use openraft::StorageError;
use openraft::Vote;

use crate::log_store::LogStore;

impl<C: RaftTypeConfig> RaftLogReader<C> for LogStore<C>
where C::Entry: Clone
{
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.try_get_log_entries(range).await
}
}

impl<C: RaftTypeConfig> RaftLogStorage<C> for LogStore<C>
where C::Entry: Clone
{
type LogReader = Self;

async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.get_log_state().await
}

async fn save_committed(&mut self, committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.save_committed(committed).await
}

async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.read_committed().await
}

async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.save_vote(vote).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.read_vote().await
}

async fn append<I>(
&mut self,
entries: I,
callback: LogFlushed<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>>
where
I: IntoIterator<Item = C::Entry>,
{
let mut inner = self.inner.lock().await;
inner.append(entries, callback).await
}

async fn truncate(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.truncate(log_id).await
}

async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
let mut inner = self.inner.lock().await;
inner.purge(log_id).await
}

async fn get_log_reader(&mut self) -> Self::LogReader {
self.clone()
}
}
}
3 changes: 3 additions & 0 deletions examples/memstore/test-cluster.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

echo "No shell test script for memstore"
1 change: 1 addition & 0 deletions examples/raft-kv-memstore-generic-snapshot-data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ license = "MIT OR Apache-2.0"
repository = "https://github.com/datafuselabs/openraft"

[dependencies]
memstore = { path = "../memstore", features = [] }
openraft = { path = "../../openraft", features = ["serde", "storage-v2", "generic-snapshot-data"] }

clap = { version = "4.1.11", features = ["derive", "env"] }
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-generic-snapshot-data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub async fn new_raft(node_id: NodeId, router: Router) -> (typ::Raft, App) {
let config = Arc::new(config.validate().unwrap());

// Create a instance of where the Raft logs will be stored.
let log_store = Arc::new(LogStore::default());
let log_store = LogStore::default();

// Create a instance of where the state machine data will be stored.
let state_machine_store = Arc::new(StateMachineStore::default());
Expand Down
Loading
Loading