Skip to content

Add an example of LVMT to LvmtStorage struct #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions src/lvmt/example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::sync::Arc;

use crate::{
backends::DatabaseTrait,
errors::Result,
middlewares::{
confirm_ids_to_history, confirm_maps_to_history, CommitID, KeyValueStoreBulks,
VersionedStore, VersionedStoreCache,
},
};

use super::{
auth_changes::AuthChangeTable,
storage::LvmtStore,
table_schema::{AmtNodes, FlatKeyValue, SlotAllocations},
};

pub struct LvmtStorage<D: DatabaseTrait> {
backend: D,
key_value_cache: VersionedStoreCache<FlatKeyValue>,
amt_node_cache: VersionedStoreCache<AmtNodes>,
slot_alloc_cache: VersionedStoreCache<SlotAllocations>,
}

impl<D: DatabaseTrait> LvmtStorage<D> {
pub fn new(backend: D) -> Result<Self> {
Ok(Self {
backend,
key_value_cache: VersionedStoreCache::new_empty(),
amt_node_cache: VersionedStoreCache::new_empty(),
slot_alloc_cache: VersionedStoreCache::new_empty(),
})
}

pub fn as_manager(&mut self) -> Result<LvmtStore<'_, '_>> {
let key_value_store = VersionedStore::new(&self.backend, &mut self.key_value_cache)?;
let amt_node_store = VersionedStore::new(&self.backend, &mut self.amt_node_cache)?;
let slot_alloc_store = VersionedStore::new(&self.backend, &mut self.slot_alloc_cache)?;
let auth_changes =
KeyValueStoreBulks::new(Arc::new(self.backend.view::<AuthChangeTable>()?));

Ok(LvmtStore::new(
key_value_store,
amt_node_store,
slot_alloc_store,
auth_changes,
))
}

pub fn commit(&mut self, write_schema: <D as DatabaseTrait>::WriteSchema) -> Result<()> {
self.backend.commit(write_schema)
}

pub fn confirmed_pending_to_history(
&mut self,
new_root_commit_id: CommitID,
write_schema: &D::WriteSchema,
) -> Result<()> {
// old root..=new root's parent
let (
key_value_to_confirm_start_height,
key_value_to_confirm_ids,
key_value_to_confirm_maps,
) = self.key_value_cache.change_root(new_root_commit_id)?;

let (amt_node_to_confirm_start_height, amt_node_to_confirm_ids, amt_node_to_confirm_maps) =
self.amt_node_cache.change_root(new_root_commit_id)?;

let (
slot_alloc_to_confirm_start_height,
slot_alloc_to_confirm_ids,
slot_alloc_to_confirm_maps,
) = self.slot_alloc_cache.change_root(new_root_commit_id)?;

assert_eq!(
key_value_to_confirm_start_height,
amt_node_to_confirm_start_height
);
assert_eq!(
key_value_to_confirm_start_height,
slot_alloc_to_confirm_start_height
);

assert_eq!(key_value_to_confirm_ids, amt_node_to_confirm_ids);
assert_eq!(key_value_to_confirm_ids, slot_alloc_to_confirm_ids);

let to_confirm_start_height = key_value_to_confirm_start_height;
let to_confirm_ids = key_value_to_confirm_ids;

confirm_ids_to_history::<D>(
&self.backend,
to_confirm_start_height,
&to_confirm_ids,
write_schema,
)?;

confirm_maps_to_history::<D, FlatKeyValue>(
&self.backend,
to_confirm_start_height,
key_value_to_confirm_maps,
write_schema,
)?;
confirm_maps_to_history::<D, AmtNodes>(
&self.backend,
to_confirm_start_height,
amt_node_to_confirm_maps,
write_schema,
)?;
confirm_maps_to_history::<D, SlotAllocations>(
&self.backend,
to_confirm_start_height,
slot_alloc_to_confirm_maps,
write_schema,
)?;

Ok(())
}
}
1 change: 1 addition & 0 deletions src/lvmt/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod amt_change_manager;
mod auth_changes;
pub mod crypto;
mod example;
mod storage;
pub mod table_schema;
pub mod types;
14 changes: 14 additions & 0 deletions src/lvmt/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ pub struct LvmtStore<'cache, 'db> {
const ALLOC_START_VERSION: u64 = 1;

impl<'cache, 'db> LvmtStore<'cache, 'db> {
pub fn new(
key_value_store: VersionedStore<'cache, 'db, FlatKeyValue>,
amt_node_store: VersionedStore<'cache, 'db, AmtNodes>,
slot_alloc_store: VersionedStore<'cache, 'db, SlotAllocations>,
auth_changes: KeyValueStoreBulks<'db, AuthChangeTable>,
) -> Self {
Self {
key_value_store,
amt_node_store,
slot_alloc_store,
auth_changes,
}
}

fn commit(
&mut self,
old_commit: Option<CommitID>,
Expand Down
3 changes: 2 additions & 1 deletion src/middlewares/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ pub use commit_id_schema::{
};
pub use key_value_store_bulks::{ChangeKey, KeyValueStoreBulks};
pub use versioned_flat_key_value::{
table_schema, PendingError, VersionedStore, VersionedStoreCache,
confirm_ids_to_history, confirm_maps_to_history, table_schema, PendingError, VersionedStore,
VersionedStoreCache,
};
93 changes: 53 additions & 40 deletions src/middlewares/versioned_flat_key_value/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,44 +133,37 @@ fn get_versioned_key<'db, T: VersionedKeyValueSchema>(
change_history_table.get_versioned_key(&found_version_number, key)
}

pub fn confirmed_pending_to_history<D: DatabaseTrait, T: VersionedKeyValueSchema>(
db: &D,
pending_part: &mut VersionedMap<PendingKeyValueConfig<T, CommitID>>,
new_root_commit_id: CommitID,
write_schema: &D::WriteSchema,
) -> Result<()> {
// old root..=new root's parent
let (to_confirm_start_height, to_confirm_ids, to_confirm_maps) =
pending_part.change_root(new_root_commit_id)?;

confirm_ids_to_history::<D>(db, to_confirm_start_height, &to_confirm_ids, write_schema)?;
confirm_maps_to_history::<D, T>(db, to_confirm_start_height, to_confirm_maps, write_schema)?;

Ok(())
}

#[allow(clippy::type_complexity)]
fn confirm_series_to_history<D: DatabaseTrait, T: VersionedKeyValueSchema>(
db: &mut D,
pub fn confirm_maps_to_history<D: DatabaseTrait, T: VersionedKeyValueSchema>(
db: &D,
to_confirm_start_height: usize,
to_confirm_ids_maps: Vec<(CommitID, BTreeMap<T::Key, impl Into<Option<T::Value>>>)>,
to_confirm_maps: Vec<BTreeMap<T::Key, impl Into<Option<T::Value>>>>,
write_schema: &D::WriteSchema,
) -> Result<()> {
let history_index_table = Arc::new(db.view::<HistoryIndicesTable<T>>()?);
let commit_id_table = Arc::new(db.view::<CommitIDSchema>()?);
let history_number_table = Arc::new(db.view::<HistoryNumberSchema>()?);
let change_history_table =
KeyValueStoreBulks::new(Arc::new(db.view::<HistoryChangeTable<T>>()?));

let write_schema = D::write_schema();

for (delta_height, (confirmed_commit_id, updates)) in
to_confirm_ids_maps.into_iter().enumerate()
{
for (delta_height, updates) in to_confirm_maps.into_iter().enumerate() {
let height = to_confirm_start_height + delta_height;
let history_number = height_to_history_number(height);

if commit_id_table.get(&confirmed_commit_id)?.is_some()
|| history_number_table.get(&history_number)?.is_some()
{
return Err(StorageError::ConsistencyCheckFailure);
}

let commit_id_table_op = (
Cow::Owned(confirmed_commit_id),
Some(Cow::Owned(history_number)),
);
write_schema.write::<CommitIDSchema>(commit_id_table_op);

let history_number_table_op = (
Cow::Owned(history_number),
Some(Cow::Owned(confirmed_commit_id)),
);
write_schema.write::<HistoryNumberSchema>(history_number_table_op);

let history_indices_table_op = updates.keys().map(|key| {
(
Cow::Owned(HistoryIndexKey(key.clone(), history_number)),
Expand All @@ -187,25 +180,45 @@ fn confirm_series_to_history<D: DatabaseTrait, T: VersionedKeyValueSchema>(
}

std::mem::drop(history_index_table);
std::mem::drop(commit_id_table);
std::mem::drop(history_number_table);
std::mem::drop(change_history_table);

db.commit(write_schema)?;

Ok(())
}

pub fn confirmed_pending_to_history<D: DatabaseTrait, T: VersionedKeyValueSchema>(
db: &mut D,
pending_part: &mut VersionedMap<PendingKeyValueConfig<T, CommitID>>,
new_root_commit_id: CommitID,
pub fn confirm_ids_to_history<D: DatabaseTrait>(
db: &D,
to_confirm_start_height: usize,
to_confirm_ids: &[CommitID],
write_schema: &D::WriteSchema,
) -> Result<()> {
// old root..=new root's parent
let (to_confirm_start_height, to_confirm_ids_maps) =
pending_part.change_root(new_root_commit_id)?;
let commit_id_table = Arc::new(db.view::<CommitIDSchema>()?);
let history_number_table = Arc::new(db.view::<HistoryNumberSchema>()?);

for (delta_height, confirmed_commit_id) in to_confirm_ids.iter().enumerate() {
let height = to_confirm_start_height + delta_height;
let history_number = height_to_history_number(height);

if commit_id_table.get(confirmed_commit_id)?.is_some()
|| history_number_table.get(&history_number)?.is_some()
{
return Err(StorageError::ConsistencyCheckFailure);
}

let commit_id_table_op = (
Cow::Owned(*confirmed_commit_id),
Some(Cow::Owned(history_number)),
);
write_schema.write::<CommitIDSchema>(commit_id_table_op);

confirm_series_to_history::<D, T>(db, to_confirm_start_height, to_confirm_ids_maps)?;
let history_number_table_op = (
Cow::Owned(history_number),
Some(Cow::Owned(*confirmed_commit_id)),
);
write_schema.write::<HistoryNumberSchema>(history_number_table_op);
}

std::mem::drop(commit_id_table);
std::mem::drop(history_number_table);

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl<S: PendingKeyValueSchema> VersionedMap<S> {
pub fn change_root(
&mut self,
commit_id: S::CommitId,
) -> PendResult<(usize, Vec<(S::CommitId, KeyValueMap<S>)>), S> {
) -> PendResult<(usize, Vec<S::CommitId>, Vec<KeyValueMap<S>>), S> {
// to_commit: old_root..=new_root's parent
let (start_height_to_commit, to_commit) = self.tree.change_root(commit_id)?;

Expand All @@ -136,7 +136,8 @@ impl<S: PendingKeyValueSchema> VersionedMap<S> {
}
}

Ok((start_height_to_commit, to_commit))
let (to_commit_ids, to_commit_maps) = to_commit.into_iter().unzip();
Ok((start_height_to_commit, to_commit_ids, to_commit_maps))
}
}

Expand Down
36 changes: 19 additions & 17 deletions src/middlewares/versioned_flat_key_value/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use crate::{
errors::Result,
middlewares::{
versioned_flat_key_value::{
confirm_series_to_history, confirmed_pending_to_history, pending_part::VersionedMap,
confirm_ids_to_history, confirm_maps_to_history, confirmed_pending_to_history,
pending_part::VersionedMap,
},
CommitID, PendingError,
},
Expand Down Expand Up @@ -683,13 +684,14 @@ fn gen_updates(
}

#[allow(clippy::type_complexity)]
fn gen_init(
db: &mut impl DatabaseTrait,
fn gen_init<D: DatabaseTrait>(
db: &D,
num_history: usize,
rng: &mut ChaChaRng,
max_num_new_keys: usize,
max_num_previous_keys: usize,
all_keys: &mut BTreeSet<u64>,
write_schema: &D::WriteSchema,
) -> (
UniqueVec<CommitID>,
Vec<BTreeMap<u64, Option<u64>>>,
Expand Down Expand Up @@ -722,17 +724,8 @@ fn gen_init(

let pending_part = VersionedMap::new(history_cids.items().last().copied(), history_cids.len());

confirm_series_to_history::<_, TestSchema>(
db,
0,
history_cids
.clone()
.into_vec()
.into_iter()
.zip(history_updates.clone())
.collect(),
)
.unwrap();
confirm_ids_to_history::<D>(db, 0, &history_cids.clone().into_vec(), write_schema).unwrap();
confirm_maps_to_history::<D, TestSchema>(db, 0, history_updates.clone(), write_schema).unwrap();

(history_cids, history_updates, pending_part)
}
Expand Down Expand Up @@ -1148,8 +1141,8 @@ impl<'a, 'b, 'c, 'cache, 'db, T: VersionedKeyValueSchema<Key = u64, Value = u64>
}
}

fn test_versioned_store(
db: &mut impl DatabaseTrait,
fn test_versioned_store<D: DatabaseTrait>(
db: &mut D,
num_history: usize,
num_pending: usize,
num_operations: usize,
Expand All @@ -1161,15 +1154,19 @@ fn test_versioned_store(
let mut all_keys = BTreeSet::new();

// init history part
let write_schema = D::write_schema();
let (history_cids, history_updates, mut pending_part) = gen_init(
db,
num_history,
&mut rng,
num_gen_new_keys,
num_gen_previous_keys,
&mut all_keys,
&write_schema,
);
db.commit(write_schema).unwrap();

// build proxy
let mut mock_versioned_store =
MockVersionedStore::build(history_cids.clone(), history_updates.clone());

Expand Down Expand Up @@ -1231,7 +1228,12 @@ fn test_versioned_store(
let mock_res = mock_versioned_store.confirmed_pending_to_history(commit_id);

drop(real_versioned_store);
let real_res = confirmed_pending_to_history(db, &mut pending_part, commit_id);

let write_schema = D::write_schema();
let real_res =
confirmed_pending_to_history(db, &mut pending_part, commit_id, &write_schema);
db.commit(write_schema).unwrap();

real_versioned_store = VersionedStore::new(db, &mut pending_part).unwrap();
real_versioned_store.check_consistency().unwrap();

Expand Down
Loading