Skip to content

Add an example of LVMT to LvmtStorage struct #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions src/lvmt/example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::sync::Arc;

use crate::{
backends::DatabaseTrait,
errors::Result,
middlewares::{
confirm_ids_to_history, confirm_maps_to_history, CommitID, KeyValueStoreBulks,
VersionedStore, VersionedStoreCache,
},
};

use super::{
auth_changes::AuthChangeTable,
storage::LvmtStore,
table_schema::{AmtNodes, FlatKeyValue, SlotAllocations},
};

pub struct LvmtStorage<D: DatabaseTrait> {
backend: D,
key_value_cache: VersionedStoreCache<FlatKeyValue>,
amt_node_cache: VersionedStoreCache<AmtNodes>,
slot_alloc_cache: VersionedStoreCache<SlotAllocations>,
}

impl<D: DatabaseTrait> LvmtStorage<D> {
pub fn new(backend: D) -> Result<Self> {
Ok(Self {
backend,
key_value_cache: VersionedStoreCache::new_empty(),
amt_node_cache: VersionedStoreCache::new_empty(),
slot_alloc_cache: VersionedStoreCache::new_empty(),
})
}

pub fn as_manager(&mut self) -> Result<LvmtStore<'_, '_>> {
let key_value_store = VersionedStore::new(&self.backend, &mut self.key_value_cache)?;
let amt_node_store = VersionedStore::new(&self.backend, &mut self.amt_node_cache)?;
let slot_alloc_store = VersionedStore::new(&self.backend, &mut self.slot_alloc_cache)?;
let auth_changes =
KeyValueStoreBulks::new(Arc::new(self.backend.view::<AuthChangeTable>()?));

Ok(LvmtStore::new(
key_value_store,
amt_node_store,
slot_alloc_store,
auth_changes,
))
}

pub fn commit(&mut self, write_schema: <D as DatabaseTrait>::WriteSchema) -> Result<()> {
self.backend.commit(write_schema)
}

pub fn confirmed_pending_to_history(
&mut self,
new_root_commit_id: CommitID,
write_schema: &D::WriteSchema,
) -> Result<()> {
let key_value_confirmed_path = self.key_value_cache.change_root(new_root_commit_id)?;
let amt_node_confirmed_path = self.amt_node_cache.change_root(new_root_commit_id)?;
let slot_alloc_confirmed_path = self.slot_alloc_cache.change_root(new_root_commit_id)?;

assert!(key_value_confirmed_path.is_same_path(&amt_node_confirmed_path));
assert!(key_value_confirmed_path.is_same_path(&slot_alloc_confirmed_path));

let start_height = key_value_confirmed_path.start_height;
let commit_ids = &key_value_confirmed_path.commit_ids;

confirm_ids_to_history::<D>(&self.backend, start_height, commit_ids, write_schema)?;

confirm_maps_to_history::<D, FlatKeyValue>(
&self.backend,
start_height,
key_value_confirmed_path.key_value_maps,
write_schema,
)?;
confirm_maps_to_history::<D, AmtNodes>(
&self.backend,
start_height,
amt_node_confirmed_path.key_value_maps,
write_schema,
)?;
confirm_maps_to_history::<D, SlotAllocations>(
&self.backend,
start_height,
slot_alloc_confirmed_path.key_value_maps,
write_schema,
)?;

Ok(())
}
}
1 change: 1 addition & 0 deletions src/lvmt/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod amt_change_manager;
mod auth_changes;
pub mod crypto;
mod example;
mod storage;
pub mod table_schema;
pub mod types;
14 changes: 14 additions & 0 deletions src/lvmt/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ pub struct LvmtStore<'cache, 'db> {
const ALLOC_START_VERSION: u64 = 1;

impl<'cache, 'db> LvmtStore<'cache, 'db> {
pub fn new(
key_value_store: VersionedStore<'cache, 'db, FlatKeyValue>,
amt_node_store: VersionedStore<'cache, 'db, AmtNodes>,
slot_alloc_store: VersionedStore<'cache, 'db, SlotAllocations>,
auth_changes: KeyValueStoreBulks<'db, AuthChangeTable>,
) -> Self {
Self {
key_value_store,
amt_node_store,
slot_alloc_store,
auth_changes,
}
}

fn commit(
&mut self,
old_commit: Option<CommitID>,
Expand Down
3 changes: 2 additions & 1 deletion src/middlewares/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ pub use commit_id_schema::{
};
pub use key_value_store_bulks::{ChangeKey, KeyValueStoreBulks};
pub use versioned_flat_key_value::{
table_schema, PendingError, VersionedStore, VersionedStoreCache,
confirm_ids_to_history, confirm_maps_to_history, table_schema, PendingError, VersionedStore,
VersionedStoreCache,
};
105 changes: 60 additions & 45 deletions src/middlewares/versioned_flat_key_value/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,44 +133,45 @@ fn get_versioned_key<'db, T: VersionedKeyValueSchema>(
change_history_table.get_versioned_key(&found_version_number, key)
}

#[allow(clippy::type_complexity)]
fn confirm_series_to_history<D: DatabaseTrait, T: VersionedKeyValueSchema>(
db: &mut D,
pub fn confirmed_pending_to_history<D: DatabaseTrait, T: VersionedKeyValueSchema>(
db: &D,
pending_part: &mut VersionedMap<PendingKeyValueConfig<T, CommitID>>,
new_root_commit_id: CommitID,
write_schema: &D::WriteSchema,
) -> Result<()> {
let confirmed_path = pending_part.change_root(new_root_commit_id)?;

confirm_ids_to_history::<D>(
db,
confirmed_path.start_height,
&confirmed_path.commit_ids,
write_schema,
)?;

confirm_maps_to_history::<D, T>(
db,
confirmed_path.start_height,
confirmed_path.key_value_maps,
write_schema,
)?;

Ok(())
}

pub fn confirm_maps_to_history<D: DatabaseTrait, T: VersionedKeyValueSchema>(
db: &D,
to_confirm_start_height: usize,
to_confirm_ids_maps: Vec<(CommitID, BTreeMap<T::Key, impl Into<Option<T::Value>>>)>,
to_confirm_maps: Vec<BTreeMap<T::Key, impl Into<Option<T::Value>>>>,
write_schema: &D::WriteSchema,
) -> Result<()> {
let history_index_table = Arc::new(db.view::<HistoryIndicesTable<T>>()?);
let commit_id_table = Arc::new(db.view::<CommitIDSchema>()?);
let history_number_table = Arc::new(db.view::<HistoryNumberSchema>()?);
let history_index_table = db.view::<HistoryIndicesTable<T>>()?;
let change_history_table =
KeyValueStoreBulks::new(Arc::new(db.view::<HistoryChangeTable<T>>()?));

let write_schema = D::write_schema();

for (delta_height, (confirmed_commit_id, updates)) in
to_confirm_ids_maps.into_iter().enumerate()
{
for (delta_height, updates) in to_confirm_maps.into_iter().enumerate() {
let height = to_confirm_start_height + delta_height;
let history_number = height_to_history_number(height);

if commit_id_table.get(&confirmed_commit_id)?.is_some()
|| history_number_table.get(&history_number)?.is_some()
{
return Err(StorageError::ConsistencyCheckFailure);
}

let commit_id_table_op = (
Cow::Owned(confirmed_commit_id),
Some(Cow::Owned(history_number)),
);
write_schema.write::<CommitIDSchema>(commit_id_table_op);

let history_number_table_op = (
Cow::Owned(history_number),
Some(Cow::Owned(confirmed_commit_id)),
);
write_schema.write::<HistoryNumberSchema>(history_number_table_op);

let history_indices_table_op = updates.keys().map(|key| {
(
Cow::Owned(HistoryIndexKey(key.clone(), history_number)),
Expand All @@ -186,26 +187,40 @@ fn confirm_series_to_history<D: DatabaseTrait, T: VersionedKeyValueSchema>(
)?;
}

std::mem::drop(history_index_table);
std::mem::drop(commit_id_table);
std::mem::drop(history_number_table);
std::mem::drop(change_history_table);

db.commit(write_schema)?;

Ok(())
}

pub fn confirmed_pending_to_history<D: DatabaseTrait, T: VersionedKeyValueSchema>(
db: &mut D,
pending_part: &mut VersionedMap<PendingKeyValueConfig<T, CommitID>>,
new_root_commit_id: CommitID,
pub fn confirm_ids_to_history<D: DatabaseTrait>(
db: &D,
to_confirm_start_height: usize,
to_confirm_ids: &[CommitID],
write_schema: &D::WriteSchema,
) -> Result<()> {
// old root..=new root's parent
let (to_confirm_start_height, to_confirm_ids_maps) =
pending_part.change_root(new_root_commit_id)?;
let commit_id_table = db.view::<CommitIDSchema>()?;
let history_number_table = db.view::<HistoryNumberSchema>()?;

for (delta_height, confirmed_commit_id) in to_confirm_ids.iter().enumerate() {
let height = to_confirm_start_height + delta_height;
let history_number = height_to_history_number(height);

if commit_id_table.get(confirmed_commit_id)?.is_some()
|| history_number_table.get(&history_number)?.is_some()
{
return Err(StorageError::ConsistencyCheckFailure);
}

let commit_id_table_op = (
Cow::Owned(*confirmed_commit_id),
Some(Cow::Owned(history_number)),
);
write_schema.write::<CommitIDSchema>(commit_id_table_op);

confirm_series_to_history::<D, T>(db, to_confirm_start_height, to_confirm_ids_maps)?;
let history_number_table_op = (
Cow::Owned(history_number),
Some(Cow::Owned(*confirmed_commit_id)),
);
write_schema.write::<HistoryNumberSchema>(history_number_table_op);
}

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ pub struct ApplyRecord<S: PendingKeyValueSchema> {
pub commit_id: S::CommitId,
}

/// `commit_ids` and `key_value_maps` should be ordered from the smallest height to the largest height.
pub struct ConfirmedPathInfo<S: PendingKeyValueSchema> {
pub start_height: usize,
pub commit_ids: Vec<S::CommitId>,
pub key_value_maps: Vec<KeyValueMap<S>>,
}

impl<S: PendingKeyValueSchema> ConfirmedPathInfo<S> {
pub fn is_same_path<T: PendingKeyValueSchema>(&self, other: &ConfirmedPathInfo<T>) -> bool
where
S::CommitId: PartialEq<T::CommitId>,
{
self.start_height == other.start_height && self.commit_ids == other.commit_ids
}
}

pub type KeyValueMap<S> = BTreeMap<Key<S>, ValueEntry<Value<S>>>;
pub type RecoverMap<S> = BTreeMap<Key<S>, RecoverRecord<S>>;
pub type ApplyMap<S> = BTreeMap<Key<S>, ApplyRecord<S>>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use std::collections::VecDeque;

use crate::middlewares::versioned_flat_key_value::pending_part::pending_schema::{
KeyValueMap, PendingKeyValueSchema, Result as PendResult,
ConfirmedPathInfo, KeyValueMap, PendingKeyValueSchema, Result as PendResult,
};

use super::{SlabIndex, Tree};

// methods to support VersionedMap::change_root()
impl<S: PendingKeyValueSchema> Tree<S> {
#[allow(clippy::type_complexity)]
pub fn change_root(
&mut self,
commit_id: S::CommitId,
) -> PendResult<(usize, Vec<(S::CommitId, KeyValueMap<S>)>), S> {
pub fn change_root(&mut self, commit_id: S::CommitId) -> PendResult<ConfirmedPathInfo<S>, S> {
let slab_index = self.get_slab_index_by_commit_id(commit_id)?;

// old_root..=new_root's parent
Expand All @@ -35,8 +31,15 @@ impl<S: PendingKeyValueSchema> Tree<S> {
self.parent_of_root = Some(last.0);
}

// (height of old_root, old_root..=new_root's parent)
Ok((self.height_of_root - to_commit.len(), to_commit))
// height of old_root
let start_height_to_commit = self.height_of_root - to_commit.len();
let (to_commit_ids, to_commit_maps) = to_commit.into_iter().unzip();

Ok(ConfirmedPathInfo {
start_height: start_height_to_commit,
commit_ids: to_commit_ids,
key_value_maps: to_commit_maps,
})
}

// excluding target
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::BTreeMap;
use crate::traits::{IsCompleted, NeedNext};
use crate::types::ValueEntry;

use super::pending_schema::ConfirmedPathInfo;
use super::{
current_map::CurrentMap,
pending_schema::{KeyValueMap, PendingKeyValueSchema, RecoverRecord, Result as PendResult},
Expand Down Expand Up @@ -118,16 +119,10 @@ impl<S: PendingKeyValueSchema> VersionedMap<S> {

// change_root
impl<S: PendingKeyValueSchema> VersionedMap<S> {
#[allow(clippy::type_complexity)]
// old_root..=new_root's parent: (commit_id, key_value_map)
pub fn change_root(
&mut self,
commit_id: S::CommitId,
) -> PendResult<(usize, Vec<(S::CommitId, KeyValueMap<S>)>), S> {
// to_commit: old_root..=new_root's parent
let (start_height_to_commit, to_commit) = self.tree.change_root(commit_id)?;
pub fn change_root(&mut self, commit_id: S::CommitId) -> PendResult<ConfirmedPathInfo<S>, S> {
let confirm_path_info = self.tree.change_root(commit_id)?;

if let Some(parent_of_new_root) = to_commit.last() {
if confirm_path_info.commit_ids.last().is_some() {
// clear current is necessary
// because apply_commit_id in current.map may be removed from pending part
self.clear_removed_current();
Expand All @@ -136,7 +131,7 @@ impl<S: PendingKeyValueSchema> VersionedMap<S> {
}
}

Ok((start_height_to_commit, to_commit))
Ok(confirm_path_info)
}
}

Expand Down
Loading
Loading