Skip to content

Commit

Permalink
tmp commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Johannes Wünsche committed Jun 14, 2023
1 parent db7f32f commit dae7dda
Show file tree
Hide file tree
Showing 6 changed files with 406 additions and 30 deletions.
71 changes: 47 additions & 24 deletions betree/pmem-hashmap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
use std::{
ffi::c_int,
hash::{Hash, Hasher},
ptr::NonNull,
};

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -69,9 +70,13 @@ impl PMapError {
/// collisions when operating on the same key.
pub struct PMap {
inner: hashmap_tx_toid,
pobjpool: *mut PMEMobjpool,
pobjpool: NonNull<PMEMobjpool>,
}

/// We can guarantee that no thread-local shaenanigans are done within our
/// library.
unsafe impl Send for PMap {}

impl PMap {
/// Open an existing hashmap. Will fail if no hashmap has been created before.
pub fn open<P: Into<std::path::PathBuf>>(path: P) -> Result<Self, PMapError> {
Expand All @@ -80,10 +85,11 @@ impl PMap {
std::ffi::CString::new(path.into().to_string_lossy().into_owned())?.into_raw();
unsafe { pmemobj_open(path, std::ptr::null()) }
};
if pobjpool.is_null() {
return Err(PMapError::ExternalError(format!("{}", errno::errno())));
if let Some(valid) = NonNull::new(pobjpool) {
Self::new(valid)
} else {
Err(PMapError::ExternalError(format!("{}", errno::errno())))
}
Self::new(pobjpool)
}

/// Create a new hashmap. Will fail if a hashmap already exists at the specified location.
Expand All @@ -96,23 +102,28 @@ impl PMap {
std::ffi::CString::new(path.into().to_string_lossy().into_owned())?.into_raw();
unsafe { pmemobj_create(path, std::ptr::null(), size, 0o666) }
};
if pobjpool.is_null() {
return Err(PMapError::ExternalError(format!("{}", errno::errno())));
if let Some(valid) = NonNull::new(pobjpool) {
Self::new(valid)
} else {
Err(PMapError::ExternalError(format!("{}", errno::errno())))
}
Self::new(pobjpool)
}

/// Initialize or Create a new hashmap. For this we check if the root obj is
/// of the correct type and if map pointer already exists.
///
/// TODO: Use a layout to guarantee compatability?
fn new(pobjpool: *mut PMEMobjpool) -> Result<Self, PMapError> {
let root_obj =
unsafe { access_root(pmemobj_root(pobjpool, std::mem::size_of::<root_toid>())) };
fn new(pobjpool: NonNull<PMEMobjpool>) -> Result<Self, PMapError> {
let root_obj = unsafe {
access_root(pmemobj_root(
pobjpool.as_ptr(),
std::mem::size_of::<root_toid>(),
))
};
if unsafe { root_needs_init(root_obj) != 0 } {
if unsafe {
hm_tx_create(
pobjpool,
pobjpool.as_ptr(),
&mut (*root_obj).map,
std::ptr::null::<std::ffi::c_void>() as *mut std::ffi::c_void,
)
Expand All @@ -121,7 +132,7 @@ impl PMap {
return Err(PMapError::ExternalError(format!("{}", errno::errno())));
}
} else {
unsafe { hm_tx_init(pobjpool, (*root_obj).map) };
unsafe { hm_tx_init(pobjpool.as_ptr(), (*root_obj).map) };
}

Ok(Self {
Expand All @@ -136,8 +147,15 @@ impl PMap {
/// original data in here as well.
pub fn insert<K: Hash>(&mut self, key: K, val: &[u8]) -> Result<(), PMapError> {
let k = self.hash(key);
self.insert_hashed(k, val)
}

/// Raw "pre-hashed" insertion, which skips the first hashing round.
pub fn insert_hashed(&mut self, k: u64, val: &[u8]) -> Result<(), PMapError> {
let mut oid = std::mem::MaybeUninit::<PMEMoid>::uninit();
if unsafe { pmemobj_zalloc(self.pobjpool, oid.as_mut_ptr(), 8 + val.len(), 2) != 0 } {
if unsafe {
pmemobj_zalloc(self.pobjpool.as_ptr(), oid.as_mut_ptr(), 8 + val.len(), 2) != 0
} {
return Err(PMapError::AllocationError(format!("{}", errno::errno())));
}

Expand All @@ -147,28 +165,33 @@ impl PMap {
(*mv).buf.as_mut_slice(val.len()).copy_from_slice(val);
}

let inserted = unsafe { hm_tx_insert(self.pobjpool, self.inner, k, oid.assume_init()) };
let inserted =
unsafe { hm_tx_insert(self.pobjpool.as_ptr(), self.inner, k, oid.assume_init()) };
if inserted != 0 {
return Err(PMapError::from_insertion(inserted));
}
Ok(())
}

/// Remove the specified key from the hashmap.
pub fn remove<K: Hash>(&mut self, key: K) {
pub fn remove<K: Hash>(&mut self, key: K) -> Result<(), PMapError> {
let k = self.hash(key);
self.remove_hashed(k)
}

/// Raw "hashed" removal, which skips the first hashing round.
pub fn remove_hashed(&mut self, k: u64) {
let mut pptr = unsafe { hm_tx_remove(self.pobjpool, self.inner, k) };
/// Raw "pre-hashed" removal, which skips the first hashing round.
pub fn remove_hashed(&mut self, k: u64) -> Result<(), PMapError> {
let mut pptr = unsafe { hm_tx_remove(self.pobjpool.as_ptr(), self.inner, k) };
if pptr.off == 0 {
return Err(PMapError::DoesNotExist);
}
unsafe { pmemobj_free(&mut pptr) };
Ok(())
}

/// Raw "hashed" access, which skips the first hashing round.
/// Raw "pre-hashed" access, which skips the first hashing round.
pub fn get_hashed(&mut self, k: u64) -> Result<&mut [u8], PMapError> {
let val = unsafe { hm_tx_get(self.pobjpool, self.inner, k) };
let val = unsafe { hm_tx_get(self.pobjpool.as_ptr(), self.inner, k) };
if val.off == 0 {
return Err(PMapError::DoesNotExist);
}
Expand All @@ -184,7 +207,7 @@ impl PMap {
}

pub fn len(&mut self) -> usize {
unsafe { hm_tx_count(self.pobjpool, self.inner) }
unsafe { hm_tx_count(self.pobjpool.as_ptr(), self.inner) }
}

pub fn is_empty(&mut self) -> bool {
Expand All @@ -193,11 +216,11 @@ impl PMap {

pub fn lookup<K: Hash>(&mut self, key: K) -> bool {
let k = self.hash(key);
unsafe { hm_tx_lookup(self.pobjpool, self.inner, k) == 1 }
unsafe { hm_tx_lookup(self.pobjpool.as_ptr(), self.inner, k) == 1 }
}

pub fn close(self) {
unsafe { pmemobj_close(self.pobjpool) };
unsafe { pmemobj_close(self.pobjpool.as_ptr()) };
}

pub fn hash<K: Hash>(&self, key: K) -> u64 {
Expand Down Expand Up @@ -299,7 +322,7 @@ mod tests {
let mut pmap = PMap::create(file.path(), 32 * 1024 * 1024).unwrap();
pmap.insert(b"foo", &[1, 2, 3]).unwrap();
assert!(pmap.lookup(b"foo"));
pmap.remove(b"foo");
pmap.remove(b"foo").unwrap();
assert!(!pmap.lookup(b"foo"));
}

Expand Down
15 changes: 10 additions & 5 deletions betree/src/data_management/dmu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use super::{
object_ptr::ObjectPointer,
CopyOnWriteEvent, Dml, HasStoragePreference, Object, ObjectReference,
};
#[cfg(feature = "nvm")]
use crate::replication::PersistentCache;
use crate::{
allocator::{Action, SegmentAllocator, SegmentId},
buffer::Buf,
Expand Down Expand Up @@ -60,6 +62,8 @@ where
next_modified_node_id: AtomicU64,
next_disk_id: AtomicU64,
report_tx: Option<Sender<DmlMsg>>,
#[cfg(feature = "nvm")]
persistent_cache: Mutex<PersistentCache>,
}

impl<E, SPL> Dmu<E, SPL>
Expand All @@ -76,6 +80,7 @@ where
alloc_strategy: [[Option<u8>; NUM_STORAGE_CLASSES]; NUM_STORAGE_CLASSES],
cache: E,
handler: Handler<ObjRef<ObjectPointer<SPL::Checksum>>>,
#[cfg(feature = "nvm")] persistent_cache: PersistentCache,
) -> Self {
let allocation_data = (0..pool.storage_class_count())
.map(|class| {
Expand Down Expand Up @@ -103,6 +108,8 @@ where
next_modified_node_id: AtomicU64::new(1),
next_disk_id: AtomicU64::new(0),
report_tx: None,
#[cfg(feature = "nvm")]
persistent_cache: Mutex::new(persistent_cache),
}
}

Expand Down Expand Up @@ -499,11 +506,7 @@ where
{
warn!(
"Storage tier {class} does not have enough space remaining. {} blocks of {}",
self.handler
.free_space_tier(class)
.unwrap()
.free
.as_u64(),
self.handler.free_space_tier(class).unwrap().free.as_u64(),
size.as_u64()
);
continue;
Expand Down Expand Up @@ -724,6 +727,8 @@ where
if let ObjRef::Unmodified(ref ptr, ref pk) = *or {
drop(cache);

// TODO: Check persistent cache

self.fetch(ptr, pk.clone())?;
if let Some(report_tx) = &self.report_tx {
let _ = report_tx
Expand Down
17 changes: 16 additions & 1 deletion betree/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use crate::{
vdev::Block,
StoragePreference,
};

#[cfg(feature = "nvm")]
use crate::replication::PersistentCache;

use bincode::{deserialize, serialize_into};
use byteorder::{BigEndian, ByteOrder, LittleEndian};
use crossbeam_channel::Sender;
Expand All @@ -31,7 +35,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
collections::HashMap,
iter::FromIterator,
path::Path,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand Down Expand Up @@ -134,6 +138,10 @@ pub struct DatabaseConfiguration {
pub compression: CompressionConfiguration,
/// Size of cache in TODO
pub cache_size: usize,

#[cfg(feature = "nvm")]
pub persistent_cache_path: Option<PathBuf>,

/// Whether to check for and open an existing database, or overwrite it
pub access_mode: AccessMode,

Expand All @@ -156,6 +164,8 @@ impl Default for DatabaseConfiguration {
default_storage_class: 0,
compression: CompressionConfiguration::None,
cache_size: DEFAULT_CACHE_SIZE,
#[cfg(feature = "nvm")]
persistent_cache_path: None,
access_mode: AccessMode::OpenIfExists,
sync_interval_ms: Some(DEFAULT_SYNC_INTERVAL_MS),
metrics: None,
Expand Down Expand Up @@ -226,6 +236,9 @@ impl DatabaseConfiguration {
}
}

#[cfg(feature = "nvm")]
let pcache = PersistentCache::create("foobar", 69420).expect("FIXME");

Dmu::new(
self.compression.to_builder(),
XxHashBuilder,
Expand All @@ -234,6 +247,8 @@ impl DatabaseConfiguration {
strategy,
ClockCache::new(self.cache_size),
handler,
#[cfg(feature = "nvm")]
pcache,
)
}

Expand Down
2 changes: 2 additions & 0 deletions betree/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub mod cow_bytes;
pub mod data_management;
pub mod database;
pub mod range_validation;
#[cfg(feature = "nvm")]
pub mod replication;
pub mod size;
pub mod storage_pool;
pub mod tree;
Expand Down
Loading

0 comments on commit dae7dda

Please sign in to comment.