Skip to content

Commit

Permalink
Refactor: storage implementation should use `OptionalSend, OptionalSy…
Browse files Browse the repository at this point in the history
…nc` instead of `Send, Sync`
  • Loading branch information
drmingdrmer committed Feb 18, 2024
1 parent a27e04b commit 14a591e
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 12 deletions.
4 changes: 3 additions & 1 deletion cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use openraft::storage::Snapshot;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::OptionalSend;
use openraft::OptionalSync;
use openraft::RaftLogId;
use openraft::RaftTypeConfig;
use openraft::SnapshotMeta;
Expand Down Expand Up @@ -100,7 +102,7 @@ impl StateMachineStore {
}

impl RaftLogReader<TypeConfig> for Arc<LogStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<Entry<TypeConfig>>, StorageError<NodeId>> {
Expand Down
4 changes: 3 additions & 1 deletion examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use openraft::BasicNode;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::OptionalSend;
use openraft::OptionalSync;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftTypeConfig;
Expand Down Expand Up @@ -101,7 +103,7 @@ pub struct LogStore {
}

impl RaftLogReader<TypeConfig> for Arc<LogStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<Entry<TypeConfig>>, StorageError<NodeId>> {
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use openraft::ErrorSubject;
use openraft::ErrorVerb;
use openraft::LogId;
use openraft::OptionalSend;
use openraft::OptionalSync;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::SnapshotMeta;
Expand Down Expand Up @@ -368,7 +369,7 @@ impl LogStore {
}

impl RaftLogReader<TypeConfig> for LogStore {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> StorageResult<Vec<Entry<TypeConfig>>> {
Expand Down
6 changes: 4 additions & 2 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use openraft::storage::Snapshot;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::OptionalSend;
use openraft::OptionalSync;
use openraft::RaftLogId;
use openraft::RaftStorage;
use openraft::RaftTypeConfig;
Expand Down Expand Up @@ -205,7 +207,7 @@ impl Default for MemStore {
}

impl RaftLogReader<TypeConfig> for Arc<MemStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<Entry<TypeConfig>>, StorageError<MemNodeId>> {
Expand Down Expand Up @@ -390,7 +392,7 @@ impl RaftStorage<TypeConfig> for Arc<MemStore> {

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log<I>(&mut self, entries: I) -> Result<(), StorageError<MemNodeId>>
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
where I: IntoIterator<Item = Entry<TypeConfig>> + OptionalSend {
let mut log = self.log.write().await;
for entry in entries {
let s =
Expand Down
6 changes: 4 additions & 2 deletions rocksstore-compat07/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use openraft::ErrorSubject;
use openraft::ErrorVerb;
use openraft::LogId;
use openraft::LogState;
use openraft::OptionalSend;
use openraft::OptionalSync;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftStorage;
Expand Down Expand Up @@ -368,7 +370,7 @@ impl RocksStore {
}

impl RaftLogReader<TypeConfig> for Arc<RocksStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> StorageResult<Vec<Entry<TypeConfig>>> {
Expand Down Expand Up @@ -507,7 +509,7 @@ impl RaftStorage<TypeConfig> for Arc<RocksStore> {

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log<I>(&mut self, entries: I) -> StorageResult<()>
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
where I: IntoIterator<Item = Entry<TypeConfig>> + OptionalSend {
for entry in entries {
let id = id_to_bin(entry.log_id.index);
assert_eq!(bin_to_id(&id), entry.log_id.index);
Expand Down
6 changes: 4 additions & 2 deletions rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use openraft::Entry;
use openraft::EntryPayload;
use openraft::ErrorVerb;
use openraft::LogId;
use openraft::OptionalSend;
use openraft::OptionalSync;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
use openraft::RaftStorage;
Expand Down Expand Up @@ -338,7 +340,7 @@ impl RocksStore {
}

impl RaftLogReader<TypeConfig> for Arc<RocksStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> StorageResult<Vec<Entry<TypeConfig>>> {
Expand Down Expand Up @@ -455,7 +457,7 @@ impl RaftStorage<TypeConfig> for Arc<RocksStore> {

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log<I>(&mut self, entries: I) -> StorageResult<()>
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
where I: IntoIterator<Item = Entry<TypeConfig>> + OptionalSend {
for entry in entries {
let id = id_to_bin(entry.log_id.index);
assert_eq!(bin_to_id(&id), entry.log_id.index);
Expand Down
6 changes: 4 additions & 2 deletions sledstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use openraft::BasicNode;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::OptionalSend;
use openraft::OptionalSync;
use openraft::RaftLogId;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
Expand Down Expand Up @@ -384,7 +386,7 @@ impl SledStore {
}

impl RaftLogReader<TypeConfig> for Arc<SledStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> StorageResult<Vec<Entry<TypeConfig>>> {
Expand Down Expand Up @@ -505,7 +507,7 @@ impl RaftStorage<TypeConfig> for Arc<SledStore> {

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log<I>(&mut self, entries: I) -> StorageResult<()>
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
where I: IntoIterator<Item = Entry<TypeConfig>> + OptionalSend {
let logs_tree = logs(&self.db);
let mut batch = sled::Batch::default();
for entry in entries {
Expand Down
4 changes: 3 additions & 1 deletion stores/rocksstore-v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use openraft::Entry;
use openraft::EntryPayload;
use openraft::ErrorVerb;
use openraft::LogId;
use openraft::OptionalSend;
use openraft::OptionalSync;
use openraft::RaftLogId;
use openraft::RaftLogReader;
use openraft::RaftSnapshotBuilder;
Expand Down Expand Up @@ -231,7 +233,7 @@ impl RocksLogStore {
}

impl RaftLogReader<TypeConfig> for RocksLogStore {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> StorageResult<Vec<Entry<TypeConfig>>> {
Expand Down

0 comments on commit 14a591e

Please sign in to comment.