Skip to content

Commit

Permalink
Added the actual implementation for the AtomicView::latest_view (#1994
Browse files Browse the repository at this point in the history
)

Closes #1581

The change finally implements an atomic view of the RocksDB database via
the snapshot feature.

We use a snapshot (if available) each time we read/iterate over the
RocksDB. It guarantees a consistent view even if the data is
removed/added/modified.

`InMemory` database also implements this feature by simply cloning all
`BTreeMap`s. In the future, we need to optimize this behavior and cache
views - #1993.

## Checklist
- [x] New behavior is reflected in tests

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them
here
  • Loading branch information
xgreenx authored Jun 28, 2024
1 parent dfe39ae commit ca47981
Show file tree
Hide file tree
Showing 11 changed files with 332 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased]

### Added
- [#1994](https://github.com/FuelLabs/fuel-core/pull/1994): Added the actual implementation for the `AtomicView::latest_view`.
- [#1972](https://github.com/FuelLabs/fuel-core/pull/1972): Implement `AlgorithmUpdater` for `GasPriceService`
- [#1948](https://github.com/FuelLabs/fuel-core/pull/1948): Add new `AlgorithmV1` and `AlgorithmUpdaterV1` for the gas price. Include tools for analysis

Expand Down
6 changes: 1 addition & 5 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::{
},
generic_database::GenericDatabase,
in_memory::memory_store::MemoryStore,
iterable_key_value_view::IterableKeyValueViewWrapper,
key_value_view::KeyValueViewWrapper,
ChangesIterator,
ColumnType,
Expand Down Expand Up @@ -249,10 +248,7 @@ where
type LatestView = IterableKeyValueView<ColumnType<Description>>;

fn latest_view(&self) -> StorageResult<Self::LatestView> {
// TODO: https://github.com/FuelLabs/fuel-core/issues/1581
Ok(IterableKeyValueView::from_storage(
IterableKeyValueViewWrapper::new(self.clone()),
))
self.inner_storage().data.latest_view()
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-core/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ impl<'a> ReadViewProvider for Context<'a> {
fn read_view(&self) -> StorageResult<Cow<'a, ReadView>> {
let operation_type = self.query_env.operation.node.ty;

// Sometimes, during mutable queries the resolvers
// Sometimes, during mutable queries or subscription the resolvers
// need access to an updated view of the database.
if operation_type == OperationType::Mutation {
if operation_type != OperationType::Query {
let database: &ReadDatabase = self.data_unchecked();
database.view().map(Cow::Owned)
} else {
Expand Down
3 changes: 1 addition & 2 deletions crates/fuel-core/src/schema/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,6 @@ impl BlockMutation {
start_timestamp: Option<Tai64Timestamp>,
blocks_to_produce: U32,
) -> async_graphql::Result<U32> {
let query = ctx.read_view()?;
let consensus_module = ctx.data_unchecked::<ConsensusModule>();
let config = ctx.data_unchecked::<GraphQLConfig>().clone();

Expand All @@ -388,7 +387,7 @@ impl BlockMutation {
.manually_produce_blocks(start_time, blocks_to_produce)
.await?;

query
ctx.read_view()?
.latest_block_height()
.map(Into::into)
.map_err(Into::into)
Expand Down
5 changes: 5 additions & 0 deletions crates/fuel-core/src/service/genesis/importer/import_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ mod tests {
},
state::{
in_memory::memory_store::MemoryStore,
IterableKeyValueView,
TransactableStorage,
},
};
Expand Down Expand Up @@ -561,6 +562,10 @@ mod tests {
) -> StorageResult<()> {
Err(anyhow::anyhow!("I refuse to work!").into())
}

fn latest_view(&self) -> StorageResult<IterableKeyValueView<Self::Column>> {
Err(anyhow::anyhow!("I refuse to work!").into())
}
}

#[test]
Expand Down
6 changes: 6 additions & 0 deletions crates/fuel-core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub trait TransactableStorage<Height>: IterableStore + Debug + Send + Sync {
height: Option<Height>,
changes: Changes,
) -> StorageResult<()>;

fn latest_view(&self) -> StorageResult<IterableKeyValueView<Self::Column>>;
}

// It is used only to allow conversion of the `StorageTransaction` into the `DataSource`.
Expand All @@ -73,6 +75,10 @@ where
fn commit_changes(&self, _: Option<Height>, _: Changes) -> StorageResult<()> {
unimplemented!()
}

fn latest_view(&self) -> StorageResult<IterableKeyValueView<Self::Column>> {
unimplemented!()
}
}

/// A type that allows to iterate over the `Changes`.
Expand Down
4 changes: 4 additions & 0 deletions crates/fuel-core/src/state/generic_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ pub struct GenericDatabase<Storage> {
}

impl<Storage> GenericDatabase<Storage> {
pub fn inner_storage(&self) -> &Storage {
self.storage.as_ref()
}

pub fn from_storage(storage: Storage) -> Self {
Self {
storage: StructuredStorage::new(storage),
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/state/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
// TODO: Move the implementation from the module to here.
pub mod memory_store;
pub mod memory_view;
33 changes: 32 additions & 1 deletion crates/fuel-core/src/state/in_memory/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use crate::{
DatabaseDescription,
},
state::{
in_memory::memory_view::MemoryView,
iterable_key_value_view::IterableKeyValueViewWrapper,
IterDirection,
IterableKeyValueView,
TransactableStorage,
},
};
Expand All @@ -31,7 +34,11 @@ use fuel_core_storage::{
use std::{
collections::BTreeMap,
fmt::Debug,
sync::Mutex,
ops::Deref,
sync::{
Arc,
Mutex,
},
};

#[derive(Debug)]
Expand Down Expand Up @@ -62,6 +69,23 @@ impl<Description> MemoryStore<Description>
where
Description: DatabaseDescription,
{
fn create_view(&self) -> MemoryView<Description> {
// Lock all tables at the same time to have consistent view.
let locks = self
.inner
.iter()
.map(|lock| lock.lock().expect("Poisoned lock"))
.collect::<Vec<_>>();
let inner = locks
.iter()
.map(|btree| btree.deref().clone())
.collect::<Vec<_>>();
MemoryView {
inner,
_marker: Default::default(),
}
}

pub fn iter_all(
&self,
column: Description::Column,
Expand Down Expand Up @@ -136,6 +160,13 @@ where
}
Ok(())
}

fn latest_view(&self) -> StorageResult<IterableKeyValueView<Self::Column>> {
let view = self.create_view();
Ok(IterableKeyValueView::from_storage(
IterableKeyValueViewWrapper::new(Arc::new(view)),
))
}
}

#[cfg(test)]
Expand Down
76 changes: 76 additions & 0 deletions crates/fuel-core/src/state/in_memory/memory_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use crate::database::database_description::{
on_chain::OnChain,
DatabaseDescription,
};
use fuel_core_storage::{
iter::{
iterator,
BoxedIter,
IntoBoxedIter,
IterDirection,
IterableStore,
},
kv_store::{
KVItem,
KeyValueInspect,
StorageColumn,
Value,
},
transactional::ReferenceBytesKey,
Result as StorageResult,
};
use std::collections::BTreeMap;

#[derive(Debug, Clone)]
pub struct MemoryView<Description = OnChain>
where
Description: DatabaseDescription,
{
pub(crate) inner: Vec<BTreeMap<ReferenceBytesKey, Value>>,
pub(crate) _marker: core::marker::PhantomData<Description>,
}

impl<Description> MemoryView<Description>
where
Description: DatabaseDescription,
{
pub fn iter_all<'a>(
&'a self,
column: Description::Column,
prefix: Option<&[u8]>,
start: Option<&[u8]>,
direction: IterDirection,
) -> impl Iterator<Item = KVItem> + 'a {
let btree = &self.inner[column.as_usize()];

iterator(btree, prefix, start, direction)
.map(|(key, value)| (key.clone().into(), value.clone()))
.map(Ok)
}
}

impl<Description> KeyValueInspect for MemoryView<Description>
where
Description: DatabaseDescription,
{
type Column = Description::Column;

fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
Ok(self.inner[column.as_usize()].get(key).cloned())
}
}

impl<Description> IterableStore for MemoryView<Description>
where
Description: DatabaseDescription,
{
fn iter_store(
&self,
column: Self::Column,
prefix: Option<&[u8]>,
start: Option<&[u8]>,
direction: IterDirection,
) -> BoxedIter<KVItem> {
self.iter_all(column, prefix, start, direction).into_boxed()
}
}
Loading

0 comments on commit ca47981

Please sign in to comment.