Commit
tmp commit
Johannes Wünsche committed Jun 30, 2023
1 parent 57ce5b0 commit 905dfdc
Showing 16 changed files with 399 additions and 94 deletions.
2 changes: 1 addition & 1 deletion betree/Cargo.toml
@@ -70,7 +70,7 @@ criterion = "0.3"
tempfile = "3.6.0"

[features]
-default = ["init_env_logger", "figment_config"]
+default = ["init_env_logger", "figment_config", "nvm"]

# unlock unstable API for consumption by bectl and other debugging tools
internal-api = []
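Making "nvm" a default feature means the persistent-cache code introduced below now compiles, and pulls in its pmem dependencies, unless a consumer opts out with `--no-default-features`. Throughout the rest of the diff that code is gated on the feature; a minimal illustration of the gating pattern (sketch only, not from this commit):

// Compiled only when the crate is built with the "nvm" feature.
#[cfg(feature = "nvm")]
fn nvm_only_setup() {
    /* initialize the persistent cache here */
}

// Fallback for builds without the feature.
#[cfg(not(feature = "nvm"))]
fn nvm_only_setup() {}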
8 changes: 7 additions & 1 deletion betree/include/betree.h
@@ -1,7 +1,7 @@
#ifndef betree_h
#define betree_h

-/* Generated with cbindgen:0.24.3 */
+/* Generated with cbindgen:0.24.5 */

/* Warning, this file is autogenerated by cbindgen. Don't modify this manually. */

@@ -467,6 +467,12 @@ int betree_object_read_at(struct obj_t *obj,
unsigned long *n_read,
struct err_t **err);

+/**
+ * Return the object's size in bytes. If the size could not be determined,
+ * the object is assumed to be zero-sized.
+ */
+unsigned long betree_object_size(const struct obj_t *obj, struct err_t **err);

/**
* Try to write `buf_len` bytes from `buf` into `obj`, starting at `offset` bytes into the object's
* data.
11 changes: 6 additions & 5 deletions betree/src/c_interface.rs
@@ -961,16 +961,17 @@ pub unsafe extern "C" fn betree_object_write_at(
.handle_result(err)
}

-/*
-/// Return the objects size in bytes.
+/// Return the object's size in bytes. If the size could not be determined,
+/// the object is assumed to be zero-sized.
 #[no_mangle]
-pub unsafe extern "C" fn betree_object_status(obj: *const obj_t, err: *mut *mut err_t) -> c_ulong {
+pub unsafe extern "C" fn betree_object_size(obj: *const obj_t, err: *mut *mut err_t) -> c_ulong {
     let obj = &(*obj).0;
+    let info = obj.info();
-    obj.
-    obj.size()
+    info.and_then(|ok_opt| Ok(ok_opt.map(|obj_info| obj_info.size).unwrap_or(0)))
+        .unwrap_or(0)
}

/*
/// Returns the last modification timestamp in microseconds since the Unix epoch.
#[no_mangle]
pub unsafe extern "C" fn betree_object_mtime_us(obj: *const obj_t) -> c_ulong {
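The new `betree_object_size` collapses the `Result<Option<_>>` returned by `obj.info()` into a plain byte count, treating both an error and missing metadata as size zero. A standalone sketch of that fallback logic (simplified types, not the crate's real signatures):

fn size_or_zero(info: Result<Option<u64>, String>) -> u64 {
    match info {
        Ok(Some(size)) => size,
        // Error or missing metadata: report the object as zero-sized.
        _ => 0,
    }
}

fn main() {
    assert_eq!(size_or_zero(Ok(Some(42))), 42);
    assert_eq!(size_or_zero(Ok(None)), 0);
    assert_eq!(size_or_zero(Err("io".into())), 0);
}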
11 changes: 8 additions & 3 deletions betree/src/checksum.rs
@@ -2,12 +2,17 @@

use crate::size::{Size, StaticSize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
-use std::{error::Error, fmt, hash::Hasher, iter::once};
+use std::{
+    error::Error,
+    fmt,
+    hash::{Hash, Hasher},
+    iter::once,
+};
use twox_hash;

/// A checksum to verify data integrity.
pub trait Checksum:
-    Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + 'static
+    Serialize + DeserializeOwned + Size + Clone + Send + Sync + fmt::Debug + Hash + 'static
{
/// Builds a new `Checksum`.
type Builder: Builder<Self>;
@@ -67,7 +72,7 @@ impl Error for ChecksumError {
/// `XxHash` contains a digest of `xxHash`
/// which is an "extremely fast non-cryptographic hash algorithm"
/// (<https://github.com/Cyan4973/xxHash>)
-#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
+#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct XxHash(u64);

impl StaticSize for XxHash {
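The new `Hash` bound and derives exist so that checksum-bearing values such as `ObjectPointer` can serve as hashable keys, presumably for the persistent cache. A minimal sketch of turning any `Hash` value into a `u64` key (std's hasher used for illustration; the cache's real keying may differ):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Reduce any `Hash` value to a u64 key.
fn cache_key<K: Hash>(key: &K) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // With `#[derive(Hash)]`, a checksum newtype works directly.
    #[derive(Hash)]
    struct XxHashLike(u64);
    println!("{:x}", cache_key(&XxHashLike(0xdead_beef)));
}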
91 changes: 84 additions & 7 deletions betree/src/data_management/dmu.rs
@@ -5,6 +5,8 @@ use super::{
object_ptr::ObjectPointer,
CopyOnWriteEvent, Dml, HasStoragePreference, Object, ObjectReference,
};
+#[cfg(feature = "nvm")]
+use crate::replication::PersistentCache;
use crate::{
allocator::{Action, SegmentAllocator, SegmentId},
buffer::Buf,
@@ -60,6 +62,8 @@
next_modified_node_id: AtomicU64,
next_disk_id: AtomicU64,
report_tx: Option<Sender<DmlMsg>>,
+    #[cfg(feature = "nvm")]
+    persistent_cache: Option<Arc<Mutex<PersistentCache<DiskOffset, Option<DiskOffset>>>>>,
}

impl<E, SPL> Dmu<E, SPL>
@@ -76,6 +80,9 @@ where
alloc_strategy: [[Option<u8>; NUM_STORAGE_CLASSES]; NUM_STORAGE_CLASSES],
cache: E,
handler: Handler<ObjRef<ObjectPointer<SPL::Checksum>>>,
+        #[cfg(feature = "nvm")] persistent_cache: Option<
+            PersistentCache<DiskOffset, Option<DiskOffset>>,
+        >,
) -> Self {
let allocation_data = (0..pool.storage_class_count())
.map(|class| {
@@ -103,6 +110,8 @@
next_modified_node_id: AtomicU64::new(1),
next_disk_id: AtomicU64::new(0),
report_tx: None,
+            #[cfg(feature = "nvm")]
+            persistent_cache: persistent_cache.map(|cache| Arc::new(Mutex::new(cache))),
}
}

@@ -226,6 +235,23 @@
let offset = op.offset();
let generation = op.generation();

+        #[cfg(feature = "nvm")]
+        let compressed_data = {
+            let mut buf = None;
+            if let Some(ref pcache_mtx) = self.persistent_cache {
+                let mut cache = pcache_mtx.lock();
+                if let Ok(buffer) = cache.get(offset) {
+                    buf = Some(Buf::from_zero_padded(buffer.to_vec()))
+                }
+            }
+            if let Some(b) = buf {
+                b
+            } else {
+                self.pool
+                    .read(op.size(), op.offset(), op.checksum().clone())?
+            }
+        };
+        #[cfg(not(feature = "nvm"))]
let compressed_data = self
.pool
.read(op.size(), op.offset(), op.checksum().clone())?;
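The read path above is a cache-aside lookup: with "nvm" enabled, the persistent cache is consulted first and only a miss falls through to a checksummed pool read. A self-contained sketch of the pattern (all types assumed, not the crate's API):

use std::collections::HashMap;

struct PCache(HashMap<u64, Vec<u8>>);
struct Pool;

impl Pool {
    fn read(&self, _offset: u64) -> Vec<u8> {
        vec![0; 4] // stand-in for a checksummed disk read
    }
}

// Prefer the persistent cache; fall back to the storage pool on a miss.
fn read_at(pcache: Option<&PCache>, pool: &Pool, offset: u64) -> Vec<u8> {
    pcache
        .and_then(|c| c.0.get(&offset).cloned())
        .unwrap_or_else(|| pool.read(offset))
}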
@@ -329,7 +355,30 @@

let mid = match key {
ObjectKey::InWriteback(_) => unreachable!(),
-            ObjectKey::Unmodified { .. } => return Ok(()),
+            ObjectKey::Unmodified {
+                offset,
+                generation: _,
+            } => {
+                #[cfg(feature = "nvm")]
+                if let Some(ref pcache_mtx) = self.persistent_cache {
+                    let mut pcache = pcache_mtx.lock();
+                    // TODO: Specify correct constant instead of magic 🪄
+                    let mut vec = Vec::with_capacity(4 * 1024 * 1024);
+                    object.value_mut().get_mut().pack(&mut vec)?;
+                    let _ = pcache.remove(offset);
+                    pcache
+                        .prepare_insert(offset, &vec, None)
+                        .insert(|maybe_offset, data| {
+                            // TODO: Eventually write not-yet-synced data to disk.
+                            if let Some(offset) = maybe_offset {
+                                self.pool
+                                    .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
+                            }
+                            Ok(())
+                        })?;
+                }
+                return Ok(());
+            }
ObjectKey::Modified(mid) => mid,
};

@@ -341,6 +390,8 @@
drop(cache);
let object = CacheValueRef::write(entry);

+        // Eviction at this point only writes a single node, as all its
+        // children must already be unmodified.
self.handle_write_back(object, mid, true, pk)?;
Ok(())
}
@@ -412,7 +463,37 @@
state.finish()
};

-        self.pool.begin_write(compressed_data, offset)?;
+        #[cfg(feature = "nvm")]
+        let skip_write_back = self.persistent_cache.is_some();
+        #[cfg(not(feature = "nvm"))]
+        let skip_write_back = false;
+
+        if !skip_write_back {
+            self.pool.begin_write(compressed_data, offset)?;
+        } else {
+            let bar = compressed_data.clone();
+            #[cfg(feature = "nvm")]
+            if let Some(ref pcache_mtx) = self.persistent_cache {
+                let away = Arc::clone(pcache_mtx);
+                self.pool.begin_foo(offset, move || {
+                    let mut pcache = away.lock();
+                    let _ = pcache.remove(offset);
+                    pcache
+                        .prepare_insert(offset, &compressed_data, None)
+                        .insert(|maybe_offset, data| {
+                            // TODO: Deduplicate this?
+                            // if let Some(offset) = maybe_offset {
+                            //     self.pool
+                            //         .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
+                            // }
+                            panic!("This should not have happened in the debug run!");
+                            Ok(())
+                        })
+                        .unwrap();
+                })?;
+            }
+            self.pool.begin_write(bar, offset)?;
+        }

let obj_ptr = ObjectPointer {
offset,
@@ -499,11 +580,7 @@
{
warn!(
"Storage tier {class} does not have enough space remaining. {} blocks of {}",
-                self.handler
-                    .free_space_tier(class)
-                    .unwrap()
-                    .free
-                    .as_u64(),
+                self.handler.free_space_tier(class).unwrap().free.as_u64(),
size.as_u64()
);
continue;
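The write-back path above is clearly transitional ("tmp commit"): the buffer is cloned into `bar` and written to the pool immediately, while the eviction callback registered through `begin_foo` still panics over commented-out write code. The apparent destination is write-back caching, where a node enters the persistent cache first and reaches disk only when evicted. A sketch of that idea under those assumptions (not the crate's API):

use std::collections::HashMap;

struct WriteCache {
    entries: HashMap<u64, Vec<u8>>,
    capacity: usize,
}

impl WriteCache {
    // Insert a block; if the cache is full, evict one entry and hand it to
    // `write_back` so its data finally reaches the disk.
    fn insert<F: FnMut(u64, &[u8])>(&mut self, offset: u64, data: Vec<u8>, mut write_back: F) {
        if self.entries.len() >= self.capacity {
            if let Some(victim) = self.entries.keys().next().copied() {
                let evicted = self.entries.remove(&victim).unwrap();
                write_back(victim, &evicted);
            }
        }
        self.entries.insert(offset, data);
    }
}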
10 changes: 9 additions & 1 deletion betree/src/data_management/errors.rs
@@ -1,10 +1,12 @@
#![allow(missing_docs, unused_doc_comments)]
use crate::{storage_pool::DiskOffset, vdev::Block};
+#[cfg(feature = "nvm")]
+use pmem_hashmap::PMapError;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
#[error("The storage pool encountered an error.")]
#[error("The storage pool encountered an error. `{source}`")]
VdevError {
#[from]
source: crate::vdev::Error,
@@ -33,6 +35,12 @@ pub enum Error {
CallbackError,
#[error("A raw allocation has failed.")]
RawAllocationError { at: DiskOffset, size: Block<u32> },
+    #[cfg(feature = "nvm")]
+    #[error("An error occurred while accessing the persistent cache. `{source}`")]
+    PersistentCacheError {
+        #[from]
+        source: PMapError,
+    },
}

// To avoid recursive error types here, define a simple translation from
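Both error changes lean on `thiserror`: `#[from]` derives the `From` conversion that lets `?` lift a `PMapError` into this enum, and `{source}` interpolates the wrapped error into the display message. A minimal self-contained sketch of the same pattern (std's `io::Error` stands in for `PMapError`):

use thiserror::Error; // thiserror = "1"

#[derive(Error, Debug)]
enum DemoError {
    #[error("An error occurred while accessing the cache. `{source}`")]
    Cache {
        #[from]
        source: std::io::Error,
    },
}

fn touch() -> Result<(), DemoError> {
    // `?` converts io::Error into DemoError::Cache via the derived From impl.
    std::fs::metadata("/nonexistent")?;
    Ok(())
}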
2 changes: 1 addition & 1 deletion betree/src/data_management/object_ptr.rs
@@ -9,7 +9,7 @@ use crate::{
};
use serde::{Deserialize, Serialize};

-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Hash)]
/// A pointer to an on-disk serialized object.
pub struct ObjectPointer<D> {
pub(super) decompression_tag: DecompressionTag,
4 changes: 2 additions & 2 deletions betree/src/database/errors.rs
@@ -17,7 +17,7 @@ pub enum Error {
#[from]
source: crate::storage_pool::Error,
},
#[error("A tree operation encountered an error. This is likely an internal error.")]
#[error("A tree operation encountered an error. This is likely an internal error. `{source}`")]
TreeError {
#[from]
source: crate::tree::Error,
@@ -56,7 +56,7 @@
InUse,
#[error("Message surpasses the maximum length. If you cannot shrink your value, use an object store instead.")]
MessageTooLarge,
#[error("Could not serialize the given data. This is an internal error.")]
#[error("Could not serialize the given data. This is an internal error. `{source}`")]
SerializeFailed {
#[from]
source: serde_json::Error,
14 changes: 13 additions & 1 deletion betree/src/database/mod.rs
@@ -21,6 +21,10 @@ use crate::{
vdev::Block,
StoragePreference,
};

+#[cfg(feature = "nvm")]
+use crate::replication::PersistentCache;

use bincode::{deserialize, serialize_into};
use byteorder::{BigEndian, ByteOrder, LittleEndian};
use crossbeam_channel::Sender;
@@ -162,7 +166,7 @@ impl Default for DatabaseConfiguration {
compression: CompressionConfiguration::None,
cache_size: DEFAULT_CACHE_SIZE,
#[cfg(feature = "nvm")]
-            persistent_cache_path: None,
+            persistent_cache: None,
access_mode: AccessMode::OpenIfExists,
sync_interval_ms: Some(DEFAULT_SYNC_INTERVAL_MS),
metrics: None,
@@ -233,6 +237,12 @@ impl DatabaseConfiguration {
}
}

+        #[cfg(feature = "nvm")]
+        let pcache = self.persistent_cache.as_ref().map(|config| {
+            PersistentCache::create(&config.path, config.bytes)
+                .unwrap_or_else(|_| PersistentCache::open(&config.path).unwrap())
+        });

Dmu::new(
self.compression.to_builder(),
XxHashBuilder,
@@ -241,6 +251,8 @@
strategy,
ClockCache::new(self.cache_size),
handler,
+            #[cfg(feature = "nvm")]
+            pcache,
)
}
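`PersistentCache::create` is tried first; if it fails (for instance because the file already exists), the cache is opened instead. A tiny sketch of this create-or-open fallback (types assumed):

struct PCache;

impl PCache {
    fn create(_path: &str, _bytes: usize) -> Result<Self, ()> {
        Ok(PCache) // stand-in for building a fresh cache file
    }
    fn open(_path: &str) -> Result<Self, ()> {
        Ok(PCache) // stand-in for reopening an existing cache file
    }
}

fn create_or_open(path: &str, bytes: usize) -> PCache {
    // Prefer a fresh cache; reopen an existing one if creation fails.
    PCache::create(path, bytes).unwrap_or_else(|_| PCache::open(path).expect("existing cache"))
}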


