
tmp commit 4
Johannes Wünsche committed Jul 17, 2023
1 parent 4d5ea59 commit f883a59
Showing 10 changed files with 303 additions and 200 deletions.
103 changes: 64 additions & 39 deletions betree/pmem-hashmap/src/allocator.rs
@@ -1,6 +1,8 @@
use super::*;
use errno::errno;
use std::alloc::{AllocError, Allocator};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::{ffi::c_void, ptr::NonNull, sync::Arc};
use thiserror::Error;

@@ -32,11 +34,11 @@ impl Into<AllocError> for PalError {

unsafe impl Allocator for Pal {
fn allocate(&self, layout: std::alloc::Layout) -> Result<NonNull<[u8]>, AllocError> {
let ptr = self.allocate(layout.size()).map_err(|_| AllocError)?;
Ok(NonNull::new(unsafe {
core::slice::from_raw_parts_mut(ptr.load() as *mut u8, layout.size())
})
.ok_or_else(|| AllocError)?)
let mut ptr = self.allocate(layout.size()).map_err(|_| AllocError)?;
Ok(
NonNull::new(unsafe { core::slice::from_raw_parts_mut(ptr.load_mut(), layout.size()) })
.ok_or_else(|| AllocError)?,
)
}

unsafe fn deallocate(&self, ptr: NonNull<u8>, _layout: std::alloc::Layout) {
@@ -64,12 +66,41 @@ pub enum PalError {

// A friendly persistent pointer. Useless without the corresponding handle to the
// original arena.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PalPtr {
pub struct PalPtr<T> {
inner: PMEMoid,
size: usize,
_phantom: PhantomData<T>,
}

impl<T> PartialEq for PalPtr<T> {
fn eq(&self, other: &Self) -> bool {
self.inner == other.inner && self.size == other.size
}
}

impl<T> Eq for PalPtr<T> {}

impl<T> Debug for PalPtr<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PalPtr")
.field("inner", &self.inner)
.field("size", &self.size)
.finish()
}
}

impl<T> Clone for PalPtr<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
size: self.size.clone(),
_phantom: self._phantom.clone(),
}
}
}

impl<T> Copy for PalPtr<T> {}

impl PartialEq for PMEMoid {
fn eq(&self, other: &Self) -> bool {
self.pool_uuid_lo == other.pool_uuid_lo && self.off == other.off
Expand All @@ -78,16 +109,18 @@ impl PartialEq for PMEMoid {

impl Eq for PMEMoid {}

impl Drop for PalPtr {
fn drop(&mut self) {
// self.free()
impl<T> PalPtr<T> {
/// Translate this persistent ptr to a volatile one.
pub fn load(&self) -> &T {
unsafe { (haura_direct(self.inner) as *const T).as_ref().unwrap() }
}
}

impl PalPtr {
/// Translate this persistent ptr to a volatile one.
pub fn load(&self) -> *mut c_void {
unsafe { haura_direct(self.inner) }
pub fn load_mut(&mut self) -> &mut T {
unsafe { (haura_direct(self.inner) as *mut T).as_mut().unwrap() }
}

pub fn init(&mut self, src: *const T, count: usize) {
unsafe { (haura_direct(self.inner) as *mut T).copy_from(src, count) }
}

/// Copy a range of bytes behind this pointer to a given buffer. Data is
Expand All @@ -98,7 +131,7 @@ impl PalPtr {
pmemobj_memcpy(
arena.pool.as_ptr(),
other.as_mut_ptr() as *mut c_void,
self.load(),
haura_direct(self.inner),
self.size.min(other.len()),
PMEMOBJ_F_MEM_NOFLUSH,
)
Expand All @@ -111,7 +144,7 @@ impl PalPtr {
unsafe {
pmemobj_memcpy(
arena.pool.as_ptr(),
self.load(),
haura_direct(self.inner),
other.as_ptr() as *const c_void,
self.size.min(other.len()),
PMEMOBJ_F_MEM_NONTEMPORAL,
@@ -177,7 +210,7 @@ impl Pal {

/// Allocate an area of `size` bytes in the persistent memory. Allocations are
/// always guaranteed to be cache-line aligned for Optane PMem (64 bytes).
pub fn allocate(&self, size: usize) -> Result<PalPtr, PalError> {
pub fn allocate<T>(&self, size: usize) -> Result<PalPtr<T>, PalError> {
let mut oid = std::mem::MaybeUninit::<PMEMoid>::uninit();
if unsafe {
haura_alloc(
Expand All @@ -202,6 +235,7 @@ impl Pal {
Ok(PalPtr {
inner: unsafe { oid.assume_init() },
size,
_phantom: PhantomData {},
})
}

Expand All @@ -211,12 +245,16 @@ impl Pal {
///
/// If called with size 0, an existing root object might be opened; if none
/// exists, EINVAL is returned.
pub fn root(&self, size: usize) -> Result<PalPtr, PalError> {
pub fn root<T>(&self, size: usize) -> Result<PalPtr<T>, PalError> {
let oid = unsafe { pmemobj_root(self.pool.as_ptr(), size) };
if oid_is_null(oid) {
return Err(PalError::AllocationFailed(format!("{}", errno())));
}
Ok(PalPtr { inner: oid, size })
Ok(PalPtr {
inner: oid,
size,
_phantom: PhantomData {},
})
}

/// Return the maximum size of the current root object.
@@ -284,43 +322,30 @@ mod tests {
let file = TestFile::new();
{
let mut pal = Pal::create(file.path(), 128 * 1024 * 1024, 0o666).unwrap();
let mut map: BTreeMap<u8, u8, Pal> = BTreeMap::new_in(pal.clone());
let root_ptr = pal
let map: BTreeMap<u8, u8, Pal> = BTreeMap::new_in(pal.clone());
let mut root_ptr: PalPtr<BTreeMap<u8, u8, Pal>> = pal
.root(std::mem::size_of::<BTreeMap<u8, u8, Pal>>())
.unwrap();
unsafe {
(root_ptr.load() as *mut BTreeMap<u8, u8, Pal>)
.copy_from(&map, std::mem::size_of::<BTreeMap<u8, u8, Pal>>())
};
root_ptr.init(&map, std::mem::size_of::<BTreeMap<u8, u8, Pal>>());
std::mem::forget(map);
let map: &mut BTreeMap<u8, u8, Pal> = unsafe {
(root_ptr.load() as *mut BTreeMap<u8, u8, Pal>)
.as_mut()
.unwrap()
};
let map: &mut BTreeMap<u8, u8, Pal> = root_ptr.load_mut();
for id in 0..100 {
map.insert(id, id);
}
for id in 0..100 {
assert_eq!(map.get(&id), Some(&id));
}
std::mem::forget(map);
pal.close();
}
{
let mut pal = Pal::open(file.path()).unwrap();
let root_ptr = pal
let mut root_ptr = pal
.root(std::mem::size_of::<BTreeMap<u8, u8, Pal>>())
.unwrap();
let map: &mut BTreeMap<u8, u8, Pal> = unsafe {
(root_ptr.load() as *mut BTreeMap<u8, u8, Pal>)
.as_mut()
.unwrap()
};
let map: &mut BTreeMap<u8, u8, Pal> = root_ptr.load_mut();
for id in 0..100 {
assert_eq!(map.get(&id), Some(&id));
}
std::mem::forget(map);
pal.close();
}
}
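
Taken together, these allocator changes replace the untyped PalPtr with a generic PalPtr<T>: load and load_mut now return typed references instead of a raw *mut c_void, and init copies a value into the arena, as the reworked root-object test above shows. A minimal usage sketch, assuming the crate's public allocator exports and a u64 payload (both illustrative, not part of this commit):

// Sketch only: `pmem_hashmap::allocator` is an assumed import path and the
// error handling is illustrative.
use pmem_hashmap::allocator::{Pal, PalError, PalPtr};

fn store_counter(pal: &Pal) -> Result<u64, PalError> {
    // `allocate` is now generic over the pointee type; the size is still in bytes.
    let mut ptr: PalPtr<u64> = pal.allocate(std::mem::size_of::<u64>())?;
    let initial: u64 = 41;
    // `init` forwards to `ptr::copy_from`, so the count is in values of T;
    // a single u64 is copied into the persistent arena here.
    ptr.init(&initial, 1);
    // `load_mut`/`load` replace the old untyped `*mut c_void` accessor.
    *ptr.load_mut() += 1;
    Ok(*ptr.load())
}
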
4 changes: 2 additions & 2 deletions betree/pmem-hashmap/src/bin/bench_pal.rs
@@ -12,8 +12,8 @@ fn bench_pal_sub(rank: usize, barrier: Arc<std::sync::Barrier>, tx: std::sync::m
Arc::new(Pal::create(format!("/home/wuensche/pmem/foobar{rank}"), SIZE, 0o666).unwrap());

enum CMD {
Read(Vec<PalPtr>),
Write(Vec<PalPtr>),
Read(Vec<PalPtr<u8>>),
Write(Vec<PalPtr<u8>>),
Wait,
}

4 changes: 2 additions & 2 deletions betree/src/buffer.rs
@@ -438,12 +438,12 @@ impl Buf {
/// be done with "normal" rust allocations. Therefore, only the length of
/// the ptr is checked, *not* the actual location. These are guaranteed to
/// lie at 64 byte cache line boundaries.
pub fn from_persistent_ptr(ptr: PalPtr, size: u32) -> Self {
pub fn from_persistent_ptr(mut ptr: PalPtr<u8>, size: u32) -> Self {
assert_eq!(size as usize % BLOCK_SIZE, 0);
let padded_size = Block::round_up_from_bytes(size);
let aligned_buf = AlignedBuf {
buf: Arc::new(UnsafeCell::new(AlignedStorage {
ptr: ptr.load() as *mut u8,
ptr: std::ptr::addr_of_mut!(*ptr.load_mut()),
capacity: padded_size,
is_persistent: true,
})),
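
With PalPtr now typed, from_persistent_ptr takes a PalPtr<u8> and derives its raw pointer via load_mut. A sketch of how a caller might obtain such a buffer, assuming a 4 KiB BLOCK_SIZE and these import paths (all assumptions, not confirmed by the diff):

use betree_storage_stack::buffer::Buf;      // assumed crate/module path
use pmem_hashmap::allocator::{Pal, PalPtr}; // assumed crate/module path

fn persistent_buf(pal: &Pal) -> Buf {
    const BLOCK_SIZE: usize = 4096; // assumed block size
    // `from_persistent_ptr` asserts that `size` is a multiple of BLOCK_SIZE;
    // Pal allocations are cache-line (64 byte) aligned, matching the doc
    // comment above.
    let ptr: PalPtr<u8> = pal.allocate(BLOCK_SIZE).expect("pmem allocation failed");
    Buf::from_persistent_ptr(ptr, BLOCK_SIZE as u32)
}
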
76 changes: 49 additions & 27 deletions betree/src/data_management/dmu.rs
@@ -171,6 +171,19 @@
let obj = CacheValueRef::write(entry);

if let ObjRef::Unmodified(ptr, ..) = replace(or, ObjRef::Modified(mid, pk)) {
// Deallocate old-region and remove from cache
if let Some(pcache_mtx) = &self.persistent_cache {
let pcache = pcache_mtx.read();
let res = pcache.get(ptr.offset().clone()).is_ok();
drop(pcache);
if res {
// FIXME: Offload this lock to a different thread
// This operation is only tangentially important for the
// operation here and not time critical.
let mut pcache = pcache_mtx.write();
pcache.remove(ptr.offset().clone()).unwrap();
}
}
self.copy_on_write(ptr, CopyOnWriteReason::Steal, or.index().clone());
}
Ok(Some(obj))
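
The hunk above checks the persistent cache under a read lock and only takes the write lock when the old region actually has an entry to remove. The same pattern, reduced to a self-contained sketch (a std HashMap and u64 offsets stand in for the crate's persistent cache and offset types):

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

fn evict_if_cached(pcache_mtx: &Arc<RwLock<HashMap<u64, Vec<u8>>>>, offset: u64) {
    // Cheap presence check under the shared read lock.
    let present = {
        let pcache = pcache_mtx.read().unwrap();
        pcache.contains_key(&offset)
    }; // read guard dropped before the exclusive lock is taken
    if present {
        // As the FIXME notes, this removal is not time critical and could be
        // offloaded to another thread instead of blocking here.
        let mut pcache = pcache_mtx.write().unwrap();
        pcache.remove(&offset);
    }
}
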
Expand Down Expand Up @@ -370,24 +383,34 @@ where
return Ok(());
}
}

warn!("Entry would need to be written to persistent cache but procedure unimplemented!");

// TODO: Compress and write-out entry
// let mut pcache = pcache_mtx.write();
// let mut buf = BufWrite::with_capacity(Block(1));
// object.value_mut().get_mut().pack(&mut buf)?;
// let _ = pcache.remove(offset);
// pcache
// .prepare_insert(offset, &vec, None)
// .insert(|maybe_offset, data| {
// // TODO: Write eventually not synced data to disk finally.
// if let Some(offset) = maybe_offset {
// self.pool
// .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
// }
// Ok(())
// })?;
// We need to compress data here to ensure compatibility
// with the other branch going through the write back
// procedure.
let compression = &self.default_compression;
let compressed_data = {
let mut state = compression.new_compression()?;
{
object.value().read().pack(&mut state)?;
drop(object);
}
state.finish()
};
let away = Arc::clone(pcache_mtx);
// Arc to storage pool
let pool = self.pool.clone();
self.pool.begin_write_offload(offset, move || {
let mut pcache = away.write();
let _ = pcache.remove(offset);
pcache
.prepare_insert(offset, compressed_data, None)
.insert(|maybe_offset, buf| {
if let Some(offset) = maybe_offset {
pool.begin_write(buf, *offset)?;
}
Ok(())
})
.unwrap();
})?;
}
return Ok(());
}
@@ -483,23 +506,22 @@
if !skip_write_back {
self.pool.begin_write(compressed_data, offset)?;
} else {
// Cheap copy due to rc
// Cheap copy
let bar = compressed_data.clone();
#[cfg(feature = "nvm")]
if let Some(ref pcache_mtx) = self.persistent_cache {
let away = Arc::clone(pcache_mtx);
self.pool.begin_foo(offset, move || {
// Arc to storage pool
let pool = self.pool.clone();
self.pool.begin_write_offload(offset, move || {
let mut pcache = away.write();
let _ = pcache.remove(offset);
pcache
.prepare_insert(offset, bar, None)
.insert(|maybe_offset, data| {
// TODO: Deduplicate this?
// if let Some(offset) = maybe_offset {
// self.pool
// .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
// }
panic!("This should not have happnened in the debug run!");
.insert(|maybe_offset, buf| {
if let Some(offset) = maybe_offset {
pool.begin_write(buf, *offset)?;
}
Ok(())
})
.unwrap();
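
Both dmu.rs hunks now funnel compressed data through prepare_insert(..).insert(..), whose closure receives entries evicted from the persistent cache and writes them to disk via pool.begin_write, all inside the begin_write_offload closure. The stand-in below sketches that eviction-callback shape with a toy FIFO cache; TinyCache and its types are placeholders, not the crate's PersistentCache API:

use std::collections::VecDeque;

// Toy FIFO cache illustrating the eviction-callback shape; placeholder types.
struct TinyCache {
    capacity: usize,
    entries: VecDeque<(u64, Vec<u8>)>,
}

impl TinyCache {
    fn insert<F>(&mut self, offset: u64, data: Vec<u8>, mut write_back: F) -> std::io::Result<()>
    where
        F: FnMut(u64, &[u8]) -> std::io::Result<()>,
    {
        // Evicted entries are not dropped; they are handed to the caller,
        // mirroring `if let Some(offset) = maybe_offset { pool.begin_write(buf, *offset)? }`.
        while self.entries.len() >= self.capacity {
            match self.entries.pop_front() {
                Some((old_off, old_data)) => write_back(old_off, &old_data)?,
                None => break, // nothing left to evict
            }
        }
        self.entries.push_back((offset, data));
        Ok(())
    }
}

In the commit itself the callback issues pool.begin_write for the displaced offset, so data pushed out of the persistent cache still reaches its backing disk.
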
