Skip to content

Commit

Permalink
Refactor: example raft-kv-rocksdb reuses example/rocksstore
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Dec 27, 2024
1 parent be5b519 commit 0ec5676
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 459 deletions.
1 change: 1 addition & 0 deletions examples/raft-kv-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ path = "src/bin/main.rs"

[dependencies]
openraft = { path = "../../openraft", features = ["serde", "type-alias"] }
openraft-rocksstore = { path = "../rocksstore" }

tokio = { version = "1.35.1", features = ["full"] }
byteorder = "1.4.3"
Expand Down
214 changes: 5 additions & 209 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::io::Cursor;
use std::ops::RangeBounds;
use std::path::Path;
use std::sync::Arc;

use byteorder::BigEndian;
use byteorder::ReadBytesExt;
use byteorder::WriteBytesExt;
use openraft::storage::RaftLogStorage;
use openraft::storage::RaftStateMachine;
use openraft::AnyError;
use openraft::EntryPayload;
use openraft::ErrorVerb;
use openraft::OptionalSend;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft_rocksstore::log_store::RocksLogStore;
use rocksdb::ColumnFamily;
use rocksdb::ColumnFamilyDescriptor;
use rocksdb::Direction;
use rocksdb::Options;
use rocksdb::DB;
use serde::Deserialize;
Expand Down Expand Up @@ -249,219 +243,21 @@ impl RaftStateMachine<TypeConfig> for StateMachineStore {
}
}

/// Raft log storage backed by a shared RocksDB handle.
///
/// Cloning is cheap: clones share the same `Arc<DB>` and therefore the same
/// on-disk state.
#[derive(Debug, Clone)]
pub struct LogStore {
// Shared database handle; the "store" and "logs" column families are used.
db: Arc<DB>,
}
/// Shorthand for results produced by this storage implementation.
type StorageResult<T> = Result<T, StorageError>;

/// converts an id to a byte vector for storing in the database.
/// Note that we're using big endian encoding to ensure correct sorting of keys
/// Converts a log id to a byte vector for storing in the database.
///
/// Big-endian encoding is used so that the lexicographic byte order of keys
/// matches the numeric order of the ids, keeping RocksDB iteration sorted.
fn id_to_bin(id: u64) -> Vec<u8> {
    // `to_be_bytes` replaces the hand-rolled byteorder write; it is
    // infallible, so no `unwrap()` is needed.
    id.to_be_bytes().to_vec()
}

/// Converts the first 8 bytes of `buf` back into a `u64` log id.
///
/// Inverse of `id_to_bin`. Panics if `buf` holds fewer than 8 bytes, matching
/// the original byteorder-based behavior.
fn bin_to_id(buf: &[u8]) -> u64 {
    // The slice index panics on short input; `try_into` then cannot fail.
    let bytes: [u8; 8] = buf[0..8].try_into().expect("exactly 8 bytes");
    u64::from_be_bytes(bytes)
}

impl LogStore {
    /// Column family holding store metadata (vote, committed id, purge marker).
    fn store(&self) -> &ColumnFamily {
        self.db.cf_handle("store").unwrap()
    }

    /// Column family holding raft log entries, keyed by big-endian index.
    fn logs(&self) -> &ColumnFamily {
        self.db.cf_handle("logs").unwrap()
    }

    /// Flushes the WAL so that preceding writes are durable on disk.
    fn flush(&self, subject: ErrorSubject, verb: ErrorVerb) -> Result<(), StorageError> {
        self.db.flush_wal(true).map_err(|e| StorageError::new(subject, verb, AnyError::new(&e)))?;
        Ok(())
    }

    /// Reads the id of the last purged log entry, if one was recorded.
    fn get_last_purged_(&self) -> StorageResult<Option<LogId>> {
        Ok(self
            .db
            .get_cf(self.store(), b"last_purged_log_id")
            .map_err(|e| StorageError::read(&e))?
            .and_then(|v| serde_json::from_slice(&v).ok()))
    }

    /// Persists the id of the last purged log entry and flushes the WAL.
    fn set_last_purged_(&self, log_id: LogId) -> StorageResult<()> {
        self.db
            .put_cf(
                self.store(),
                b"last_purged_log_id",
                serde_json::to_vec(&log_id).unwrap().as_slice(),
            )
            .map_err(|e| StorageError::write(&e))?;

        self.flush(ErrorSubject::Store, ErrorVerb::Write)?;
        Ok(())
    }

    /// Persists the committed log id and flushes the WAL.
    fn set_committed_(&self, committed: &Option<LogId>) -> Result<(), StorageError> {
        let json = serde_json::to_vec(committed).unwrap();

        self.db.put_cf(self.store(), b"committed", json).map_err(|e| StorageError::write(&e))?;

        self.flush(ErrorSubject::Store, ErrorVerb::Write)?;
        Ok(())
    }

    /// Reads back the committed log id, if one was saved.
    fn get_committed_(&self) -> StorageResult<Option<LogId>> {
        Ok(self
            .db
            .get_cf(self.store(), b"committed")
            .map_err(|e| StorageError::read(&e))?
            .and_then(|v| serde_json::from_slice(&v).ok()))
    }

    /// Persists the vote and flushes the WAL for durability.
    fn set_vote_(&self, vote: &Vote) -> StorageResult<()> {
        self.db
            .put_cf(self.store(), b"vote", serde_json::to_vec(vote).unwrap())
            .map_err(|e| StorageError::write_vote(&e))?;

        self.flush(ErrorSubject::Vote, ErrorVerb::Write)?;
        Ok(())
    }

    /// Reads the saved vote, if any.
    fn get_vote_(&self) -> StorageResult<Option<Vote>> {
        Ok(self
            .db
            .get_cf(self.store(), b"vote")
            // Bug fix: a failed *read* of the vote was previously reported as
            // a write-vote error (`StorageError::write_vote`).
            .map_err(|e| StorageError::read_vote(&e))?
            .and_then(|v| serde_json::from_slice(&v).ok()))
    }
}

impl RaftLogReader<TypeConfig> for LogStore {
    /// Returns the log entries whose indexes fall within `range`, in order.
    ///
    /// Entries are stored under big-endian index keys, so a forward RocksDB
    /// scan from the range's lower bound yields them already sorted.
    async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
        &mut self,
        range: RB,
    ) -> StorageResult<Vec<Entry>> {
        // Resolve the lower bound of the requested range into a start key.
        let start = match range.start_bound() {
            std::ops::Bound::Included(x) => id_to_bin(*x),
            std::ops::Bound::Excluded(x) => id_to_bin(*x + 1),
            std::ops::Bound::Unbounded => id_to_bin(0),
        };

        let iter = self.db.iterator_cf(self.logs(), rocksdb::IteratorMode::From(&start, Direction::Forward));

        let mut entries = Vec::new();
        for res in iter {
            // Bug fix: a RocksDB iteration error used to panic via
            // `unwrap()`; it is now surfaced as a StorageError.
            let (id_bytes, val) = res.map_err(|e| StorageError::read_logs(&e))?;
            let id = bin_to_id(&id_bytes);

            // Keys are sorted, so the first id past the range ends the scan.
            if !range.contains(&id) {
                break;
            }

            let entry: Entry = serde_json::from_slice(&val).map_err(|e| StorageError::read_logs(&e))?;
            // The key must agree with the index stored inside the entry.
            assert_eq!(id, entry.log_id.index);
            entries.push(entry);
        }
        Ok(entries)
    }

    /// Reads the persisted vote, if any.
    async fn read_vote(&mut self) -> Result<Option<Vote>, StorageError> {
        self.get_vote_()
    }
}

/// RocksDB-backed implementation of openraft's log storage trait.
impl RaftLogStorage<TypeConfig> for LogStore {
// The store itself serves as its own log reader (see `get_log_reader`).
type LogReader = Self;

/// Returns the last log id and the last purged log id.
///
/// When no entry remains on disk, the last log id falls back to the last
/// purged id.
async fn get_log_state(&mut self) -> StorageResult<LogState> {
// Seek to the end of the `logs` CF; the last key holds the newest entry.
let last = self.db.iterator_cf(self.logs(), rocksdb::IteratorMode::End).next().and_then(|res| {
// NOTE(review): an iterator error here panics via `unwrap()` — consider
// mapping it into a StorageError instead.
let (_, ent) = res.unwrap();
Some(serde_json::from_slice::<Entry>(&ent).ok()?.log_id)
});

let last_purged_log_id = self.get_last_purged_()?;

// Empty log: the log logically ends at whatever was purged last.
let last_log_id = match last {
None => last_purged_log_id,
Some(x) => Some(x),
};
Ok(LogState {
last_purged_log_id,
last_log_id,
})
}

/// Persists the committed log id (durably, via the WAL flush in `set_committed_`).
async fn save_committed(&mut self, _committed: Option<LogId>) -> Result<(), StorageError> {
self.set_committed_(&_committed)?;
Ok(())
}

/// Reads back the committed log id saved by `save_committed`.
async fn read_committed(&mut self) -> Result<Option<LogId>, StorageError> {
let c = self.get_committed_()?;
Ok(c)
}

/// Persists the vote; durability comes from the WAL flush in `set_vote_`.
#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> {
self.set_vote_(vote)
}

/// Appends entries to the log, then reports completion through `callback`.
#[tracing::instrument(level = "trace", skip_all)]
async fn append<I>(&mut self, entries: I, callback: IOFlushed) -> StorageResult<()>
where
I: IntoIterator<Item = Entry> + Send,
I::IntoIter: Send,
{
for entry in entries {
// Key is the big-endian index so RocksDB keeps entries sorted.
let id = id_to_bin(entry.log_id.index);
assert_eq!(bin_to_id(&id), entry.log_id.index);
self.db
.put_cf(
self.logs(),
id,
serde_json::to_vec(&entry).map_err(|e| StorageError::write_logs(&e))?,
)
.map_err(|e| StorageError::write_logs(&e))?;
}

// NOTE(review): success is reported without an explicit WAL flush here —
// confirm this satisfies the durability contract of `IOFlushed`.
callback.io_completed(Ok(()));

Ok(())
}

/// Deletes log entries at and after `log_id` (conflict truncation).
#[tracing::instrument(level = "debug", skip(self))]
async fn truncate(&mut self, log_id: LogId) -> StorageResult<()> {
tracing::debug!("delete_log: [{:?}, +oo)", log_id);

let from = id_to_bin(log_id.index);
// NOTE(review): `delete_range_cf` excludes the upper bound, so an entry at
// index u64::MAX would survive — harmless in practice, but worth noting.
let to = id_to_bin(0xff_ff_ff_ff_ff_ff_ff_ff);
self.db.delete_range_cf(self.logs(), &from, &to).map_err(|e| StorageError::write_logs(&e))
}

/// Purges log entries up to and including `log_id`.
#[tracing::instrument(level = "debug", skip(self))]
async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> {
tracing::debug!("delete_log: [0, {:?}]", log_id);

// Persist the purge marker *before* deleting, so a crash between the two
// steps cannot lose the marker.
self.set_last_purged_(log_id)?;
let from = id_to_bin(0);
// `delete_range_cf` excludes the upper bound, hence `index + 1`.
let to = id_to_bin(log_id.index + 1);
self.db.delete_range_cf(self.logs(), &from, &to).map_err(|e| StorageError::write_logs(&e))
}

/// The store is cloned to act as its own log reader (shared `Arc<DB>`).
async fn get_log_reader(&mut self) -> Self::LogReader {
self.clone()
}
}

pub(crate) async fn new_storage<P: AsRef<Path>>(db_path: P) -> (LogStore, StateMachineStore) {
pub(crate) async fn new_storage<P: AsRef<Path>>(db_path: P) -> (RocksLogStore<TypeConfig>, StateMachineStore) {
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);

let store = ColumnFamilyDescriptor::new("store", Options::default());
let meta = ColumnFamilyDescriptor::new("meta", Options::default());
let logs = ColumnFamilyDescriptor::new("logs", Options::default());

let db = DB::open_cf_descriptors(&db_opts, db_path, vec![store, logs]).unwrap();
let db = DB::open_cf_descriptors(&db_opts, db_path, vec![store, meta, logs]).unwrap();
let db = Arc::new(db);

let log_store = LogStore { db: db.clone() };
let log_store = RocksLogStore::new(db.clone());
let sm_store = StateMachineStore::new(db).await.unwrap();

(log_store, sm_store)
Expand Down
2 changes: 1 addition & 1 deletion examples/rocksstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ repository = "https://github.com/databendlabs/openraft"
openraft = { path= "../../openraft", version = "0.10.0", features=["serde", "type-alias"] }

rocksdb = "0.22.0"
rand = "*"
rand = "0.8"
byteorder = "1.4.3"

serde = { version = "1.0.114", features = ["derive"] }
Expand Down
Loading

0 comments on commit 0ec5676

Please sign in to comment.