NVM-aware Bepsilon-tree #46

Draft: wants to merge 22 commits into main
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
@@ -4,7 +4,7 @@ jobs:
dependencies:
name: Dependencies
runs-on: ubuntu-22.04
-    timeout-minutes: 60
+    timeout-minutes: 180
steps:
- name: Checkout
uses: actions/checkout@v3
@@ -40,7 +40,7 @@ jobs:
name: Integration Tests
needs: dependencies
runs-on: ubuntu-22.04
-    timeout-minutes: 60
+    timeout-minutes: 180
env:
RUST_BACKTRACE: 1
steps:
@@ -78,7 +78,7 @@ jobs:
name: Unit Tests
needs: dependencies
runs-on: ubuntu-22.04
-    timeout-minutes: 60
+    timeout-minutes: 180
env:
RUST_BACKTRACE: 1
steps:
@@ -115,7 +115,7 @@ jobs:
betree-msrv:
name: MSRV Check
runs-on: ubuntu-22.04
-    timeout-minutes: 60
+    timeout-minutes: 180
steps:
- name: Checkout
uses: actions/checkout@v3
@@ -161,7 +161,7 @@ jobs:
fio-haura:
name: fio ioengine for Haura
runs-on: ubuntu-22.04
-    timeout-minutes: 60
+    timeout-minutes: 180
needs: dependencies
steps:
- name: Checkout
7 changes: 7 additions & 0 deletions betree/Cargo.toml
@@ -60,6 +60,13 @@ rand = { version = "0.8", features = ["std_rng"] }

pmdk = { path = "./pmdk", optional = true }

rkyv = { version = "0.7.42", features = ["validation"] }
bytecheck = { version = "0.7.0" }
extend = { version = "1.2.0" }

chrono = "0.4"

lazy_static = "1.4"
[dev-dependencies]
rand_xorshift = "0.3"
quickcheck = "1"
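Note: the new `rkyv` and `bytecheck` dependencies back the zero-copy node layout used on NVM. As a minimal sketch of what the `validation` feature buys (the `Entry` type here is illustrative, not from the tree code):

```rust
use rkyv::{Archive, Deserialize, Serialize};

// Illustrative type; the PR derives these traits on betree's own
// structures (CowBytes, XxHash, DecompressionTag, ...).
#[derive(Archive, Serialize, Deserialize, Debug, PartialEq)]
#[archive(check_bytes)] // emits a bytecheck validator for the archived type
struct Entry {
    key: u64,
    value: Vec<u8>,
}

fn main() {
    let entry = Entry { key: 7, value: b"payload".to_vec() };

    // Serialize into an aligned byte buffer (256 B of serializer scratch space).
    let bytes = rkyv::to_bytes::<_, 256>(&entry).unwrap();

    // Validate the untrusted bytes, then read the archived value in place:
    // no copy and no allocation on the read path.
    let archived = rkyv::check_archived_root::<Entry>(&bytes).unwrap();
    assert_eq!(archived.key, 7);
    assert_eq!(&archived.value[..], &b"payload"[..]);
}
```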
10 changes: 10 additions & 0 deletions betree/pmdk/src/lib.rs
@@ -4,6 +4,7 @@

include!(concat!(env!("OUT_DIR"), "/bindings.rs"));

use std::slice;
use std::os::raw::c_void;

#[derive(Debug)]
@@ -73,6 +74,15 @@ impl PMem {
Ok(())
}

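/// Returns a read-only view directly into the mapped persistent-memory
/// file without copying. Unsafe: the caller must keep `offset + len`
/// within the mapped range and must not let the `'static` slice outlive
/// the mapping.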
pub unsafe fn get_slice(&self, offset: usize, len: usize) -> Result<&'static [u8], std::io::Error>{
if self.ptr.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::Other,
format!("File handle is missing for the PMEM file.")));
}

Ok(slice::from_raw_parts(voidp_to_ref::<u8>(self.ptr.add(offset)), len))
}

pub unsafe fn write(&self, offset: usize, data: &[u8], len: usize) -> Result<(), std::io::Error>{
if self.ptr.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::Other,
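Note: together with the existing `write`, the new accessor gives the byte-level PMEM API its read path. A hedged usage sketch; only `get_slice` and `write` are from the diff, and the `pmem` handle is assumed to come from the crate's existing open/create routines:

```rust
fn roundtrip(pmem: &pmdk::PMem) -> std::io::Result<()> {
    let data = b"hello";
    unsafe {
        // Copy the bytes into persistent memory at offset 0.
        pmem.write(0, data, data.len())?;
        // Borrow the same region back as a slice into the mapping;
        // no copy happens on this path.
        let view = pmem.get_slice(0, data.len())?;
        assert_eq!(view, &data[..]);
    }
    Ok(())
}
```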
20 changes: 15 additions & 5 deletions betree/src/checksum.rs
@@ -1,13 +1,22 @@
//! This module provides a `Checksum` trait for verifying data integrity.

use crate::size::{Size, StaticSize};
-use serde::{de::DeserializeOwned, Deserialize, Serialize};
+//use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{error::Error, fmt, hash::Hasher, iter::once};
use twox_hash;

use rkyv::{
archived_root,
ser::{serializers::AllocSerializer, ScratchSpace, Serializer},
vec::{ArchivedVec, VecResolver},
with::{ArchiveWith, DeserializeWith, SerializeWith},
Archive, Archived, Deserialize, Fallible, Infallible, Serialize, AlignedVec,
};


/// A checksum to verify data integrity.
pub trait Checksum:
-    Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static
+    serde::Serialize + serde::de::DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static
{
/// Builds a new `Checksum`.
type Builder: Builder<Self>;
@@ -27,7 +36,7 @@ pub trait Checksum:

/// A checksum builder
pub trait Builder<C: Checksum>:
-    Serialize + DeserializeOwned + Clone + Send + Sync + fmt::Debug + 'static
+    serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync + fmt::Debug + 'static
{
/// The internal state of the checksum.
type State: State<Checksum = C>;
@@ -67,7 +76,8 @@ impl Error for ChecksumError {
/// `XxHash` contains a digest of `xxHash`
/// which is an "extremely fast non-cryptographic hash algorithm"
/// (<https://github.com/Cyan4973/xxHash>)
-#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq, Eq, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
+#[archive(check_bytes)]
pub struct XxHash(u64);

impl StaticSize for XxHash {
@@ -97,7 +107,7 @@ impl Checksum for XxHash {
}

/// The corresponding `Builder` for `XxHash`.
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct XxHashBuilder;

impl Builder<XxHash> for XxHashBuilder {
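Note: with the added derives, an `XxHash` digest can be written and validated through rkyv as well as serde. A sketch of a module-internal test exercising the new archived form (the test itself is not part of the diff; it relies on `XxHash`'s private field being visible to child modules of checksum.rs):

```rust
#[cfg(test)]
mod rkyv_roundtrip {
    use super::XxHash;

    #[test]
    fn digest_survives_archiving() {
        let digest = XxHash(42);
        let bytes = rkyv::to_bytes::<_, 64>(&digest).unwrap();
        // check_bytes lets us validate the raw buffer before use.
        let archived = rkyv::check_archived_root::<XxHash>(&bytes).unwrap();
        assert_eq!(archived.0, 42);
    }
}
```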
3 changes: 2 additions & 1 deletion betree/src/compression/mod.rs
@@ -35,7 +35,8 @@ impl CompressionConfiguration {
/// method. This differs from a CompressionConfiguration, in that it is not configurable, as
/// all methods will decompress just fine without knowing at which compression level it was
/// originally written, so there's no advantage in storing the compression level with each object.
-#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
+#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
+#[archive(check_bytes)]
#[repr(u8)]
pub enum DecompressionTag {
None,
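Note: `DecompressionTag` is persisted with every object, so the single-byte `repr(u8)` layout carries over to the archived form. A hedged sketch of decoding a tag from an untrusted buffer, placed inside compression/mod.rs; the helper is illustrative, not part of the diff:

```rust
use rkyv::Deserialize;

// Validate and decode an archived DecompressionTag from raw bytes.
fn tag_from_bytes(bytes: &[u8]) -> Option<DecompressionTag> {
    let archived = rkyv::check_archived_root::<DecompressionTag>(bytes).ok()?;
    archived.deserialize(&mut rkyv::Infallible).ok()
}
```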
12 changes: 10 additions & 2 deletions betree/src/cow_bytes.rs
@@ -13,12 +13,19 @@ use std::{

/// Copy-on-Write smart pointer which supports cheap cloning as it is
/// reference-counted.
-#[derive(Hash, Debug, Clone, Eq, Ord, Default)]
+#[derive(Hash, Debug, Clone, Eq, Ord, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
+#[archive(check_bytes)]
pub struct CowBytes {
// TODO Replace by own implementation
pub(super) inner: Arc<Vec<u8>>,
}

impl AsRef<[u8]> for ArchivedCowBytes {
fn as_ref(&self) -> &[u8] {
&self.inner
}
}

impl<T: AsRef<[u8]>> PartialEq<T> for CowBytes {
fn eq(&self, other: &T) -> bool {
&**self == other.as_ref()
@@ -219,7 +226,8 @@ impl<'a> Extend<&'a u8> for CowBytes {
}

/// Reference-counted pointer which points to a subslice of the referenced data.
-#[derive(Debug, Default, Clone)]
+#[derive(Debug, Default, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
+#[archive(check_bytes)]
pub struct SlicedCowBytes {
pub(super) data: CowBytes,
pos: u32,
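Note: the `AsRef<[u8]>` impl on `ArchivedCowBytes` is what makes archived keys and values usable without reconstructing the backing `Arc<Vec<u8>>`. A sketch of a module-internal test, assuming the derives compile as shown and the crate's existing `From<Vec<u8>>` constructor:

```rust
#[cfg(test)]
mod archived_access {
    use super::CowBytes;

    #[test]
    fn zero_copy_read() {
        let cow = CowBytes::from(vec![1u8, 2, 3]);
        let bytes = rkyv::to_bytes::<_, 64>(&cow).unwrap();
        let archived = rkyv::check_archived_root::<CowBytes>(&bytes).unwrap();
        // AsRef<[u8]> from the diff gives a direct view into the buffer.
        assert_eq!(archived.as_ref(), &[1u8, 2, 3][..]);
    }
}
```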
31 changes: 26 additions & 5 deletions betree/src/data_management/dmu.rs
@@ -66,6 +66,7 @@ impl<E, SPL> Dmu<E, SPL>
where
SPL: StoragePoolLayer,
SPL::Checksum: StaticSize,
crate::checksum::XxHash: From<<SPL as StoragePoolLayer>::Checksum>
{
/// Returns a new `Dmu`.
pub fn new(
@@ -130,6 +131,8 @@
>,
SPL: StoragePoolLayer,
SPL::Checksum: StaticSize,
crate::storage_pool::StoragePoolUnit<crate::checksum::XxHash>: From<SPL>,
crate::checksum::XxHash: From<<SPL as StoragePoolLayer>::Checksum>
{
/// Stealing an [ObjectRef] can have multiple effects. First, the
/// corresponding node is moved in cache to the [ObjectKey::Modified] state.
@@ -226,13 +229,20 @@
let offset = op.offset();
let generation = op.generation();

// NVM partial-read hook: when the object's metadata length is known,
// only the blocks covering the metadata are fetched instead of the
// whole object. `meta_data_len` is still hardwired to 0 here, so the
// full object is read for now.
let mut bytes_to_read = op.size();
let meta_data_len = 0;
if meta_data_len != 0 {
    bytes_to_read = Block::round_up_from_bytes(meta_data_len as u32);
}

let compressed_data = self
.pool
-            .read(op.size(), op.offset(), op.checksum().clone())?;
+            .read(bytes_to_read, op.offset(), op.checksum().clone())?;

let object: Node<ObjRef<ObjectPointer<SPL::Checksum>>> = {
let data = decompression_state.decompress(&compressed_data)?;
-            Object::unpack_at(op.offset(), op.info(), data)?
+            Object::unpack_at(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), data)?
};
let key = ObjectKey::Unmodified { offset, generation };
self.insert_object_into_cache(key, TaggedCacheValue::new(RwLock::new(object), pivot_key));
@@ -380,12 +390,14 @@ where
.preferred_class()
.unwrap_or(self.default_storage_class);

// Record how many bytes of the packed node are metadata so that the
// resulting ObjectPointer can later serve metadata-only reads.
let mut metadata_size = 0;
let compression = &self.default_compression;
let compressed_data = {
// FIXME: cache this
let mut state = compression.new_compression()?;
{
-                object.pack(&mut state)?;
+                object.pack(&mut state, &mut metadata_size)?;
drop(object);
}
state.finish()
@@ -421,6 +433,7 @@ where
decompression_tag: compression.decompression_tag(),
generation,
info,
metadata_size,
};

let was_present;
@@ -525,7 +538,7 @@
})
.unwrap();
let size = self.pool.actual_size(class, disk_id, size);
-        let disk_size = self.pool.size_in_blocks(class, disk_id);
+        let disk_size: Block<u64> = self.pool.size_in_blocks(class, disk_id);

let disk_offset = {
let mut x = self.allocation_data[class as usize][disk_id as usize].lock();
@@ -681,6 +694,8 @@
>,
SPL: StoragePoolLayer,
SPL::Checksum: StaticSize,
crate::storage_pool::StoragePoolUnit<crate::checksum::XxHash>: From<SPL>,
crate::checksum::XxHash: From<<SPL as StoragePoolLayer>::Checksum>
{
type ObjectPointer = ObjectPointer<SPL::Checksum>;
type ObjectRef = ObjRef<Self::ObjectPointer>;
@@ -940,7 +955,7 @@
.decompression_tag()
.new_decompression()?
.decompress(&compressed_data)?;
-            Object::unpack_at(ptr.offset(), ptr.info(), data)?
+            Object::unpack_at(ptr.size(), ptr.checksum().clone().into(), self.pool.clone().into(), ptr.offset(), ptr.info(), data)?
};
let key = ObjectKey::Unmodified {
offset: ptr.offset(),
@@ -983,6 +998,8 @@
>,
SPL: StoragePoolLayer,
SPL::Checksum: StaticSize,
crate::storage_pool::StoragePoolUnit<crate::checksum::XxHash>: From<SPL>,
crate::checksum::XxHash: From<<SPL as StoragePoolLayer>::Checksum>
{
type Handler = Handler<ObjRef<ObjectPointer<SPL::Checksum>>>;

@@ -999,6 +1016,8 @@
>,
SPL: StoragePoolLayer,
SPL::Checksum: StaticSize,
crate::storage_pool::StoragePoolUnit<crate::checksum::XxHash>: From<SPL>,
crate::checksum::XxHash: From<<SPL as StoragePoolLayer>::Checksum>
{
fn storage_hints(&self) -> Arc<Mutex<HashMap<PivotKey, StoragePreference>>> {
Arc::clone(&self.storage_hints)
@@ -1017,6 +1036,8 @@
>,
SPL: StoragePoolLayer,
SPL::Checksum: StaticSize,
crate::storage_pool::StoragePoolUnit<crate::checksum::XxHash>: From<SPL>,
crate::checksum::XxHash: From<<SPL as StoragePoolLayer>::Checksum>
{
fn with_report(mut self, tx: Sender<DmlMsg>) -> Self {
self.report_tx = Some(tx);
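Note: the `bytes_to_read` logic above is the hook for metadata-only fetches. Once `pack` reports a nonzero `metadata_size` and it is stored in the `ObjectPointer`, the DMU can fetch just the blocks holding a node's metadata instead of the whole node. A sketch of the sizing rule, assuming `crate::vdev::Block` and that `Block::round_up_from_bytes` returns `Block<u32>`, as its use in the diff suggests:

```rust
// Decide how many blocks to fetch for an object: the metadata prefix if
// its length is known, otherwise the full object.
fn blocks_to_read(full_size: Block<u32>, meta_data_len: usize) -> Block<u32> {
    if meta_data_len != 0 {
        // Round the metadata length up to whole blocks.
        Block::round_up_from_bytes(meta_data_len as u32)
    } else {
        full_size
    }
}
```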
62 changes: 55 additions & 7 deletions betree/src/data_management/impls.rs
@@ -7,9 +7,11 @@ use crate::{
StoragePreference,
};
use serde::{
-    de::DeserializeOwned, ser::Error as SerError, Deserialize, Deserializer, Serialize, Serializer,
+    de::DeserializeOwned, ser::Error as SerError,
};

use rkyv::ser::Serializer;

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct ModifiedObjectId {
pub(super) id: u64,
@@ -41,7 +43,7 @@ pub enum ObjRef<P> {
impl<D> super::ObjectReference for ObjRef<ObjectPointer<D>>
where
D: std::fmt::Debug + 'static,
-    ObjectPointer<D>: Serialize + DeserializeOwned + StaticSize + Clone,
+    ObjectPointer<D>: serde::Serialize + DeserializeOwned + StaticSize + Clone,
{
type ObjectPointer = ObjectPointer<D>;
fn get_unmodified(&self) -> Option<&ObjectPointer<D>> {
@@ -72,6 +74,52 @@ where
ObjRef::Unmodified(_, pk) | ObjRef::Modified(_, pk) | ObjRef::InWriteback(_, pk) => pk,
}
}

    /// Serializes the contained `ObjectPointer` into `w`. Only an
    /// `Unmodified` reference can be persisted; every other state is
    /// in-memory-only and yields an error.
    fn serialize_unmodified(&self, w: &mut Vec<u8>) -> Result<(), std::io::Error> {
        match *self {
            ObjRef::Modified(..) => Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "ObjectRef: Tried to serialize a modified ObjectRef",
            )),
            ObjRef::InWriteback(..) => Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "ObjectRef: Tried to serialize an ObjectRef which is currently written back",
            )),
            ObjRef::Incomplete(..) => Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "ObjectRef: Tried to serialize an incomplete reference",
            )),
            ObjRef::Unmodified(ref ptr, ..) => {
                bincode::serialize_into(w, ptr).map_err(|e| {
                    debug!("Failed to serialize ObjectPointer.");
                    std::io::Error::new(std::io::ErrorKind::InvalidData, e)
                })
            }
        }
    }

    /// Deserializes an `ObjectPointer` from `bytes` and wraps it as an
    /// `Incomplete` reference; the missing pivot key is filled in by the
    /// DMU on first access.
    fn deserialize_and_set_unmodified(bytes: &[u8]) -> Result<Self, std::io::Error> {
        match bincode::deserialize::<ObjectPointer<D>>(bytes) {
            Ok(p) => Ok(ObjRef::Incomplete(p)),
            Err(e) => {
                debug!("Failed to deserialize ObjectPointer.");
                Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
            }
        }
    }
}

impl<D> ObjRef<ObjectPointer<D>> {
Expand Down Expand Up @@ -129,10 +177,10 @@ impl<P: StaticSize> StaticSize for ObjRef<P> {
}
}

-impl<P: Serialize> Serialize for ObjRef<P> {
+impl<P: serde::Serialize> serde::Serialize for ObjRef<P> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
-        S: Serializer,
+        S: serde::Serializer,
{
match *self {
ObjRef::Modified(..) => Err(S::Error::custom(
@@ -148,13 +196,13 @@ impl<P: Serialize> Serialize for ObjRef<P> {
}
}

-impl<'de, D> Deserialize<'de> for ObjRef<ObjectPointer<D>>
+impl<'de, D> serde::Deserialize<'de> for ObjRef<ObjectPointer<D>>
where
-    ObjectPointer<D>: Deserialize<'de>,
+    ObjectPointer<D>: serde::Deserialize<'de>,
{
fn deserialize<E>(deserializer: E) -> Result<Self, E::Error>
where
-        E: Deserializer<'de>,
+        E: serde::Deserializer<'de>,
{
ObjectPointer::<D>::deserialize(deserializer).map(ObjRef::Incomplete)
}
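Note: a sketch of the round trip the two new `ObjectReference` methods enable, written as if inside impls.rs with the file's existing imports; the bounds mirror the impl above, and the caller is assumed to hold a reference obtained from a completed write-back:

```rust
use super::ObjectReference;

fn roundtrip<D>(r: &ObjRef<ObjectPointer<D>>) -> Result<ObjRef<ObjectPointer<D>>, std::io::Error>
where
    D: std::fmt::Debug + 'static,
    ObjectPointer<D>: serde::Serialize + DeserializeOwned + StaticSize + Clone,
{
    let mut buf = Vec::new();
    // Errors out for Modified, InWriteback, and Incomplete references.
    r.serialize_unmodified(&mut buf)?;
    // Always comes back as Incomplete; the DMU fills in the pivot key
    // on first access.
    ObjRef::<ObjectPointer<D>>::deserialize_and_set_unmodified(&buf)
}
```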