From f883a59918915e11a4821798af8042bbf925ac56 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20W=C3=BCnsche?=
Date: Fri, 14 Jul 2023 20:14:13 +0200
Subject: [PATCH] tmp commit 4

---
 betree/pmem-hashmap/src/allocator.rs     | 103 +++++++++------
 betree/pmem-hashmap/src/bin/bench_pal.rs |   4 +-
 betree/src/buffer.rs                     |   4 +-
 betree/src/data_management/dmu.rs        |  76 +++++++----
 betree/src/replication/lru.rs            | 153 ++++++++++++-----------
 betree/src/replication/lru_worker.rs     |  14 +--
 betree/src/replication/mod.rs            |  98 +++++++--------
 betree/src/replication/tree.rs           |  44 ++++++
 betree/src/storage_pool/mod.rs           |   2 +-
 betree/src/storage_pool/unit.rs          |   5 +-
 10 files changed, 303 insertions(+), 200 deletions(-)
 create mode 100644 betree/src/replication/tree.rs

diff --git a/betree/pmem-hashmap/src/allocator.rs b/betree/pmem-hashmap/src/allocator.rs
index 4ddc1fb6..cdca6ef7 100644
--- a/betree/pmem-hashmap/src/allocator.rs
+++ b/betree/pmem-hashmap/src/allocator.rs
@@ -1,6 +1,8 @@
 use super::*;
 use errno::errno;
 use std::alloc::{AllocError, Allocator};
+use std::fmt::Debug;
+use std::marker::PhantomData;
 use std::{ffi::c_void, ptr::NonNull, sync::Arc};
 use thiserror::Error;
 
@@ -32,11 +34,11 @@ impl Into<std::io::Error> for PalError {
 unsafe impl Allocator for Pal {
     fn allocate(&self, layout: std::alloc::Layout) -> Result<NonNull<[u8]>, AllocError> {
-        let ptr = self.allocate(layout.size()).map_err(|_| AllocError)?;
-        Ok(NonNull::new(unsafe {
-            core::slice::from_raw_parts_mut(ptr.load() as *mut u8, layout.size())
-        })
-        .ok_or_else(|| AllocError)?)
+        let mut ptr = self.allocate(layout.size()).map_err(|_| AllocError)?;
+        Ok(
+            NonNull::new(unsafe { core::slice::from_raw_parts_mut(ptr.load_mut(), layout.size()) })
+                .ok_or_else(|| AllocError)?,
+        )
     }
 
     unsafe fn deallocate(&self, ptr: NonNull<u8>, _layout: std::alloc::Layout) {
@@ -64,12 +66,41 @@ pub enum PalError {
 
 // A friendly persistent pointer. Useless without the corresponding handle to
 // the original arena.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct PalPtr {
+pub struct PalPtr<T> {
     inner: PMEMoid,
     size: usize,
+    _phantom: PhantomData<T>,
+}
+
+impl<T> PartialEq for PalPtr<T> {
+    fn eq(&self, other: &Self) -> bool {
+        self.inner == other.inner && self.size == other.size
+    }
+}
+
+impl<T> Eq for PalPtr<T> {}
+
+impl<T> Debug for PalPtr<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PalPtr")
+            .field("inner", &self.inner)
+            .field("size", &self.size)
+            .finish()
+    }
+}
+
+impl<T> Clone for PalPtr<T> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: self.inner.clone(),
+            size: self.size.clone(),
+            _phantom: self._phantom.clone(),
+        }
+    }
 }
 
+impl<T> Copy for PalPtr<T> {}
+
 impl PartialEq for PMEMoid {
     fn eq(&self, other: &Self) -> bool {
         self.pool_uuid_lo == other.pool_uuid_lo && self.off == other.off
@@ -78,16 +109,18 @@ impl PartialEq for PMEMoid {
 
 impl Eq for PMEMoid {}
 
-impl Drop for PalPtr {
-    fn drop(&mut self) {
-        // self.free()
+impl<T> PalPtr<T> {
+    /// Translate this persistent ptr to a volatile one.
+    pub fn load(&self) -> &T {
+        unsafe { (haura_direct(self.inner) as *const T).as_ref().unwrap() }
     }
-}
 
-impl PalPtr {
-    /// Translate this persistent ptr to a volatile one.
-    pub fn load(&self) -> *mut c_void {
-        unsafe { haura_direct(self.inner) }
+    pub fn load_mut(&mut self) -> &mut T {
+        unsafe { (haura_direct(self.inner) as *mut T).as_mut().unwrap() }
+    }
+
+    pub fn init(&mut self, src: *const T, count: usize) {
+        unsafe { (haura_direct(self.inner) as *mut T).copy_from(src, count) }
     }
 
     /// Copy a range of bytes behind this pointer to a given buffer. Data is
@@ -98,7 +131,7 @@ impl PalPtr {
             pmemobj_memcpy(
                 arena.pool.as_ptr(),
                 other.as_mut_ptr() as *mut c_void,
-                self.load(),
+                haura_direct(self.inner),
                 self.size.min(other.len()),
                 PMEMOBJ_F_MEM_NOFLUSH,
             )
@@ -111,7 +144,7 @@ impl PalPtr {
         unsafe {
             pmemobj_memcpy(
                 arena.pool.as_ptr(),
-                self.load(),
+                haura_direct(self.inner),
                 other.as_ptr() as *const c_void,
                 self.size.min(other.len()),
                 PMEMOBJ_F_MEM_NONTEMPORAL,
             )
@@ -177,7 +210,7 @@ impl Pal {
 
     /// Allocate an area of size in the persistent memory. Allocations are
     /// always guaranteed to be cache line aligned for Optane PMem (64 bytes).
-    pub fn allocate(&self, size: usize) -> Result<PalPtr, PalError> {
+    pub fn allocate<T>(&self, size: usize) -> Result<PalPtr<T>, PalError> {
         let mut oid = std::mem::MaybeUninit::<PMEMoid>::uninit();
         if unsafe {
             haura_alloc(
@@ -202,6 +235,7 @@ impl Pal {
         Ok(PalPtr {
             inner: unsafe { oid.assume_init() },
             size,
+            _phantom: PhantomData {},
         })
     }
 
@@ -211,12 +245,16 @@ impl Pal {
     ///
     /// If called with size 0 an existing root object might be opened, if none
     /// exists EINVAL is returned.
-    pub fn root(&self, size: usize) -> Result<PalPtr, PalError> {
+    pub fn root<T>(&self, size: usize) -> Result<PalPtr<T>, PalError> {
         let oid = unsafe { pmemobj_root(self.pool.as_ptr(), size) };
         if oid_is_null(oid) {
             return Err(PalError::AllocationFailed(format!("{}", errno())));
         }
-        Ok(PalPtr { inner: oid, size })
+        Ok(PalPtr {
+            inner: oid,
+            size,
+            _phantom: PhantomData {},
+        })
     }
 
     /// Return the maximum size of the current root object.
@@ -284,43 +322,30 @@ mod tests {
         let file = TestFile::new();
         {
             let mut pal = Pal::create(file.path(), 128 * 1024 * 1024, 0o666).unwrap();
-            let mut map: BTreeMap<u8, u8, Pal> = BTreeMap::new_in(pal.clone());
-            let root_ptr = pal
+            let map: BTreeMap<u8, u8, Pal> = BTreeMap::new_in(pal.clone());
+            let mut root_ptr: PalPtr<BTreeMap<u8, u8, Pal>> = pal
                 .root(std::mem::size_of::<BTreeMap<u8, u8, Pal>>())
                 .unwrap();
-            unsafe {
-                (root_ptr.load() as *mut BTreeMap<u8, u8, Pal>)
-                    .copy_from(&map, std::mem::size_of::<BTreeMap<u8, u8, Pal>>())
-            };
+            root_ptr.init(&map, std::mem::size_of::<BTreeMap<u8, u8, Pal>>());
             std::mem::forget(map);
-            let map: &mut BTreeMap<u8, u8, Pal> = unsafe {
-                (root_ptr.load() as *mut BTreeMap<u8, u8, Pal>)
-                    .as_mut()
-                    .unwrap()
-            };
+            let map: &mut BTreeMap<u8, u8, Pal> = root_ptr.load_mut();
             for id in 0..100 {
                 map.insert(id, id);
             }
             for id in (0..100).rev() {
                 assert_eq!(map.get(&id), Some(&id));
             }
-            std::mem::forget(map);
             pal.close();
         }
         {
             let mut pal = Pal::open(file.path()).unwrap();
-            let root_ptr = pal
+            let mut root_ptr = pal
                 .root(std::mem::size_of::<BTreeMap<u8, u8, Pal>>())
                 .unwrap();
-            let map: &mut BTreeMap<u8, u8, Pal> = unsafe {
-                (root_ptr.load() as *mut BTreeMap<u8, u8, Pal>)
-                    .as_mut()
-                    .unwrap()
-            };
+            let map: &mut BTreeMap<u8, u8, Pal> = root_ptr.load_mut();
             for id in (0..100).rev() {
                 assert_eq!(map.get(&id), Some(&id));
             }
-            std::mem::forget(map);
             pal.close();
         }
     }
diff --git a/betree/pmem-hashmap/src/bin/bench_pal.rs b/betree/pmem-hashmap/src/bin/bench_pal.rs
index 144d2531..f7ea3317 100644
--- a/betree/pmem-hashmap/src/bin/bench_pal.rs
+++ b/betree/pmem-hashmap/src/bin/bench_pal.rs
@@ -12,8 +12,8 @@ fn bench_pal_sub(rank: usize, barrier: Arc<std::sync::Barrier>, tx: std::sync::m
         Arc::new(Pal::create(format!("/home/wuensche/pmem/foobar{rank}"), SIZE, 0o666).unwrap());
 
     enum CMD {
-        Read(Vec<PalPtr>),
-        Write(Vec<PalPtr>),
+        Read(Vec<PalPtr<u8>>),
+        Write(Vec<PalPtr<u8>>),
         Wait,
     }
 
diff --git a/betree/src/buffer.rs b/betree/src/buffer.rs
index 45de3fb3..dece75ae 100644
--- a/betree/src/buffer.rs
+++ b/betree/src/buffer.rs
@@ -438,12 +438,12 @@ impl Buf {
     /// be done with "normal" rust allocations. Therefore, only the length of
     /// the ptr is checked, *not* the actual location. These are guaranteed to
     /// lie at 64 byte cache line boundaries.
-    pub fn from_persistent_ptr(ptr: PalPtr, size: u32) -> Self {
+    pub fn from_persistent_ptr(mut ptr: PalPtr<u8>, size: u32) -> Self {
         assert_eq!(size as usize % BLOCK_SIZE, 0);
         let padded_size = Block::round_up_from_bytes(size);
         let aligned_buf = AlignedBuf {
             buf: Arc::new(UnsafeCell::new(AlignedStorage {
-                ptr: ptr.load() as *mut u8,
+                ptr: std::ptr::addr_of_mut!(*ptr.load_mut()),
                 capacity: padded_size,
                 is_persistent: true,
             })),
diff --git a/betree/src/data_management/dmu.rs b/betree/src/data_management/dmu.rs
index b7176698..5de0282c 100644
--- a/betree/src/data_management/dmu.rs
+++ b/betree/src/data_management/dmu.rs
@@ -171,6 +171,19 @@ where
         let obj = CacheValueRef::write(entry);
 
         if let ObjRef::Unmodified(ptr, ..) = replace(or, ObjRef::Modified(mid, pk)) {
+            // Deallocate the old region and remove it from the cache.
+            if let Some(pcache_mtx) = &self.persistent_cache {
+                let pcache = pcache_mtx.read();
+                let res = pcache.get(ptr.offset().clone()).is_ok();
+                drop(pcache);
+                if res {
+                    // FIXME: Offload this lock to a different thread.
+                    // This operation is only tangentially important for the
+                    // operation here and not time critical.
+                    let mut pcache = pcache_mtx.write();
+                    pcache.remove(ptr.offset().clone()).unwrap();
+                }
+            }
             self.copy_on_write(ptr, CopyOnWriteReason::Steal, or.index().clone());
         }
         Ok(Some(obj))
@@ -370,24 +383,34 @@ where
                     return Ok(());
                 }
             }
-
-            warn!("Entry would need to be written to persistent cache but procedure unimplemented!");
-
-            // TODO: Compress and write-out entry
-            // let mut pcache = pcache_mtx.write();
-            // let mut buf = BufWrite::with_capacity(Block(1));
-            // object.value_mut().get_mut().pack(&mut buf)?;
-            // let _ = pcache.remove(offset);
-            // pcache
-            //     .prepare_insert(offset, &vec, None)
-            //     .insert(|maybe_offset, data| {
-            //         // TODO: Write eventually not synced data to disk finally.
-            //         if let Some(offset) = maybe_offset {
-            //             self.pool
-            //                 .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
-            //         }
-            //         Ok(())
-            //     })?;
+            // We need to compress the data here to ensure compatibility
+            // with the other branch going through the write-back
+            // procedure.
+            let compression = &self.default_compression;
+            let compressed_data = {
+                let mut state = compression.new_compression()?;
+                {
+                    object.value().read().pack(&mut state)?;
+                    drop(object);
+                }
+                state.finish()
+            };
+            let away = Arc::clone(pcache_mtx);
+            // Arc to storage pool
+            let pool = self.pool.clone();
+            self.pool.begin_write_offload(offset, move || {
+                let mut pcache = away.write();
+                let _ = pcache.remove(offset);
+                pcache
+                    .prepare_insert(offset, compressed_data, None)
+                    .insert(|maybe_offset, buf| {
+                        if let Some(offset) = maybe_offset {
+                            pool.begin_write(buf, *offset)?;
+                        }
+                        Ok(())
+                    })
+                    .unwrap();
+            })?;
         }
         return Ok(());
     }
@@ -483,23 +506,22 @@ where
         if !skip_write_back {
             self.pool.begin_write(compressed_data, offset)?;
         } else {
-            // Cheap copy due to rc
+            // Cheap copy
             let bar = compressed_data.clone();
             #[cfg(feature = "nvm")]
             if let Some(ref pcache_mtx) = self.persistent_cache {
                 let away = Arc::clone(pcache_mtx);
-                self.pool.begin_foo(offset, move || {
+                // Arc to storage pool
+                let pool = self.pool.clone();
+                self.pool.begin_write_offload(offset, move || {
                     let mut pcache = away.write();
                     let _ = pcache.remove(offset);
                     pcache
                         .prepare_insert(offset, bar, None)
-                        .insert(|maybe_offset, data| {
-                            // TODO: Deduplicate this?
-                            // if let Some(offset) = maybe_offset {
-                            //     self.pool
-                            //         .begin_write(Buf::from_zero_padded(data.to_vec()), *offset)?;
-                            // }
-                            panic!("This should not have happnened in the debug run!");
+                        .insert(|maybe_offset, buf| {
+                            if let Some(offset) = maybe_offset {
+                                pool.begin_write(buf, *offset)?;
+                            }
                             Ok(())
                         })
                         .unwrap();
diff --git a/betree/src/replication/lru.rs b/betree/src/replication/lru.rs
index dc019dc0..574ee073 100644
--- a/betree/src/replication/lru.rs
+++ b/betree/src/replication/lru.rs
@@ -8,21 +8,21 @@ use std::{marker::PhantomData, mem::size_of, ptr::NonNull};
 /// ======
 /// The internals of this method are highly unsafe. Concurrent usage and
 /// removal are urgently discouraged.
-fn fetch<T>(ptr: &PalPtr) -> Result<&mut PlruNode<T>, PMapError> {
-    Ok(unsafe {
-        std::mem::transmute::<&mut [u8; PLRU_NODE_SIZE], _>(
-            core::slice::from_raw_parts_mut(ptr.load() as *mut u8, PLRU_NODE_SIZE)
-                .try_into()
-                .unwrap(),
-        )
-    })
-}
+// fn fetch<T>(ptr: &PalPtr<PlruNode<T>>) -> Result<&mut PlruNode<T>, PMapError> {
+//     Ok(unsafe {
+//         std::mem::transmute::<&mut [u8; PLRU_NODE_SIZE], _>(
+//             core::slice::from_raw_parts_mut(ptr.load_mut(), PLRU_NODE_SIZE)
+//                 .try_into()
+//                 .unwrap(),
+//         )
+//     })
+// }
 
 /// Persistent LRU
 #[repr(C)]
 pub struct Plru<T> {
-    head: Option<PalPtr>,
-    tail: Option<PalPtr>,
+    head: Option<PalPtr<PlruNode<T>>>,
+    tail: Option<PalPtr<PlruNode<T>>>,
     // in Blocks? Return evicted element when full
     capacity: u64,
     count: u64,
@@ -47,7 +47,7 @@ impl<T> Plru<T> {
         }
     }
 
-    pub fn touch(&mut self, node_ptr: &PalPtr) -> Result<(), PMapError> {
+    pub fn touch(&mut self, node_ptr: &mut PalPtr<PlruNode<T>>) -> Result<(), PMapError> {
         if self.head.as_ref() == Some(node_ptr) {
             return Ok(());
         }
@@ -55,10 +55,10 @@ impl<T> Plru<T> {
         self.cut_node_and_stitch(node_ptr)?;
 
         // Fixate new head
-        let old_head_ptr = self.head.as_ref().expect("Invalid State");
-        let old_head: &mut PlruNode<T> = fetch(old_head_ptr).unwrap();
+        let mut old_head_ptr = self.head.as_mut().expect("Invalid State");
+        let old_head: &mut PlruNode<T> = old_head_ptr.load_mut();
         old_head.fwd = Some(node_ptr.clone());
-        let node: &mut PlruNode<T> = fetch(node_ptr).unwrap();
+        let node: &mut PlruNode<T> = node_ptr.load_mut();
         node.back = self.head.clone();
         self.head = Some(node_ptr.clone());
 
@@ -68,12 +68,12 @@ impl<T> Plru<T> {
     /// Add a new entry into the LRU. Will fail if already present.
     pub fn insert(
         &mut self,
-        node_ptr: PalPtr,
+        mut node_ptr: PalPtr<PlruNode<T>>,
         hash: u64,
         size: u64,
         baggage: T,
     ) -> Result<(), PMapError> {
-        let new_node = fetch(&node_ptr).unwrap();
+        let new_node = node_ptr.load_mut();
         new_node.fwd = None;
         new_node.back = self.head.clone();
         new_node.size = size;
@@ -81,7 +81,7 @@ impl<T> Plru<T> {
         new_node.hash = hash;
 
         if let Some(ref mut head_ptr) = self.head.as_mut() {
-            let head: &mut PlruNode<T> = fetch(head_ptr).unwrap();
+            let head: &mut PlruNode<T> = head_ptr.load_mut();
             head.fwd = Some(node_ptr.clone());
             self.head = Some(node_ptr);
         } else {
@@ -98,28 +98,33 @@ impl<T> Plru<T> {
     /// This call does not perform the removal itself.
     pub fn evict(&self, size: u64) -> Result<Option<(u64, &T)>, PMapError> {
         if let (Some(ref tail), true) = (self.tail.as_ref(), self.size + size > self.capacity) {
-            let node = fetch(tail).unwrap();
-            return Ok(Some((node.hash, node.key)));
+            let node = tail.load();
+            return Ok(Some((node.hash, &node.key)));
         }
         Ok(None)
     }
 
-    fn cut_node_and_stitch(&mut self, node_ptr: &PalPtr) -> Result<(), PMapError> {
-        let node: &mut PlruNode<T> = fetch(node_ptr).unwrap();
+    fn cut_node_and_stitch(&mut self, node_ptr: &mut PalPtr<PlruNode<T>>) -> Result<(), PMapError> {
+        let node: &mut PlruNode<T> = node_ptr.load_mut();
         if let Some(ref mut forward_ptr) = node.fwd.as_mut() {
-            let forward: &mut PlruNode<T> = fetch(forward_ptr).unwrap();
+            let forward: &mut PlruNode<T> = forward_ptr.load_mut();
             forward.back = node.back.clone();
         }
-        if self.tail.as_ref() == Some(node_ptr) {
-            self.tail = node.fwd.clone();
-        }
         if let Some(ref mut back_ptr) = node.back.as_mut() {
-            let back: &mut PlruNode<T> = fetch(back_ptr).unwrap();
+            let back: &mut PlruNode<T> = back_ptr.load_mut();
             back.fwd = node.fwd.clone();
         }
+        drop(node);
+        let node = node_ptr.load();
+
         if self.head.as_ref() == Some(node_ptr) {
             self.head = node.back.clone();
         }
+        if self.tail.as_ref() == Some(node_ptr) {
+            self.tail = node.fwd.clone();
+        }
+
+        let node: &mut PlruNode<T> = node_ptr.load_mut();
         node.fwd = None;
         node.back = None;
 
@@ -127,9 +132,9 @@ impl<T> Plru<T> {
     }
 
     /// Remove a node from cache and deallocate.
-    pub fn remove(&mut self, node_ptr: &PalPtr) -> Result<(), PMapError> {
+    pub fn remove(&mut self, node_ptr: &mut PalPtr<PlruNode<T>>) -> Result<(), PMapError> {
         self.cut_node_and_stitch(node_ptr)?;
-        let node: &mut PlruNode<T> = fetch(node_ptr).unwrap();
+        let node: &PlruNode<T> = node_ptr.load();
         self.size -= node.size;
         self.count -= 1;
         Ok(())
@@ -159,8 +164,8 @@ impl<T> Plru<T> {
 /// with extreme caution, and be sure you know what you are doing.
 #[repr(C)]
 pub struct PlruNode<T> {
-    fwd: Option<PalPtr>,
-    back: Option<PalPtr>,
+    fwd: Option<PalPtr<PlruNode<T>>>,
+    back: Option<PalPtr<PlruNode<T>>>,
     size: u64,
     hash: u64,
     key: T,
@@ -173,7 +178,13 @@ impl<T> PlruNode<T> {
         "Size of attached data to LRU entry surpasses size constraint."
     );
 
-    pub fn new(fwd: Option<PalPtr>, back: Option<PalPtr>, size: u64, hash: u64, key: T) -> Self {
+    pub fn new(
+        fwd: Option<PalPtr<PlruNode<T>>>,
+        back: Option<PalPtr<PlruNode<T>>>,
+        size: u64,
+        hash: u64,
+        key: T,
+    ) -> Self {
         // has to remain to ensure that the code path is evaluated by rustc
         let _ = Self::SIZE_CONSTRAINT;
         Self {
@@ -228,28 +239,31 @@ mod tests {
     fn new() {
         let file = TestFile::new();
         let pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap();
-        let root = pal.root(size_of::<Plru<()>>()).unwrap();
-        let plru = root.load() as *mut Plru<()>;
-        unsafe { plru.write_unaligned(Plru::<()>::init(32 * 1024 * 1024)) };
+        let mut root = pal.root(size_of::<Plru<()>>()).unwrap();
+        root.init(
+            &Plru::<()>::init(32 * 1024 * 1024),
+            std::mem::size_of::<Plru<()>>(),
+        );
+        let plru = root.load_mut();
     }
 
     #[test]
     fn insert() {
         let file = TestFile::new();
         let pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap();
-        let root = pal.root(size_of::<Plru<()>>()).unwrap();
-        let plru = root.load() as *mut Plru<()>;
-        unsafe { plru.write_unaligned(Plru::<()>::init(32 * 1024 * 1024)) };
+        let mut root = pal.root(size_of::<Plru<()>>()).unwrap();
+        root.init(
+            &Plru::<()>::init(32 * 1024 * 1024),
+            std::mem::size_of::<Plru<()>>(),
+        );
+        let plru = root.load_mut();
 
         // Insert 3 entries
         for id in 0..3 {
             let node_ptr = pal.allocate(PLRU_NODE_SIZE).unwrap();
-            let node = unsafe { node_ptr.load() as *mut PlruNode<()> };
-            let plru = unsafe { plru.as_mut().unwrap() };
             plru.insert(node_ptr.clone(), id, 312, ()).unwrap();
             assert_eq!(plru.head, Some(node_ptr));
         }
-        let plru = unsafe { plru.as_mut().unwrap() };
         assert_eq!(plru.count, 3);
     }
 
@@ -257,24 +271,24 @@ mod tests {
     fn touch() {
         let file = TestFile::new();
         let pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap();
-        let root = pal.root(size_of::<Plru<()>>()).unwrap();
-        let plru = root.load() as *mut Plru<()>;
-        unsafe { plru.write_unaligned(Plru::<()>::init(32 * 1024 * 1024)) };
+        let mut root = pal.root(size_of::<Plru<()>>()).unwrap();
+        root.init(
+            &Plru::<()>::init(32 * 1024 * 1024),
+            std::mem::size_of::<Plru<()>>(),
+        );
+        let plru = root.load_mut();
 
         // Insert 3 entries
         let mut ptr = vec![];
         for id in 0..3 {
             let node_ptr = pal.allocate(PLRU_NODE_SIZE).unwrap();
             ptr.push(node_ptr.clone());
-            let node = unsafe { node_ptr.load() as *mut PlruNode<()> };
-            let plru = unsafe { plru.as_mut().unwrap() };
             plru.insert(node_ptr.clone(), id, 312, ()).unwrap();
             assert_eq!(plru.head, Some(node_ptr));
         }
-        let plru = unsafe { plru.as_mut().unwrap() };
         assert_eq!(plru.count, 3);
 
-        for ptr in ptr.iter() {
+        for ptr in ptr.iter_mut() {
             plru.touch(ptr);
             assert_eq!(plru.head, Some(ptr).cloned());
         }
@@ -284,22 +298,22 @@ mod tests {
     fn evict() {
         let file = TestFile::new();
         let pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap();
-        let root = pal.root(size_of::<Plru<()>>()).unwrap();
-        let plru = root.load() as *mut Plru<()>;
-        unsafe { plru.write_unaligned(Plru::<()>::init(32 * 1024 * 1024)) };
+        let mut root = pal.root(size_of::<Plru<()>>()).unwrap();
+        root.init(
+            &Plru::<()>::init(32 * 1024 * 1024),
+            std::mem::size_of::<Plru<()>>(),
+        );
+        let plru = root.load_mut();
 
         // Insert 3 entries
         let mut ptr = vec![];
         for id in 0..3 {
             let node_ptr = pal.allocate(PLRU_NODE_SIZE).unwrap();
             ptr.push(node_ptr.clone());
-            let node = unsafe { node_ptr.load() as *mut PlruNode<()> };
-            let plru = unsafe { plru.as_mut().unwrap() };
             plru.insert(node_ptr.clone(), id, 15 * 1024 * 1024, ())
                 .unwrap();
             assert_eq!(plru.head, Some(node_ptr));
         }
-        let plru = unsafe { plru.as_mut().unwrap() };
         assert_eq!(plru.count, 3);
 
         assert_eq!(plru.evict(0).unwrap(), Some((0, &())));
@@ -312,22 +326,22 @@ mod tests {
     fn remove() {
         let file = TestFile::new();
         let pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap();
-        let root = pal.root(size_of::<Plru<()>>()).unwrap();
-        let plru = root.load() as *mut Plru<()>;
-        unsafe { plru.write_unaligned(Plru::<()>::init(32 * 1024 * 1024)) };
+        let mut root = pal.root(size_of::<Plru<()>>()).unwrap();
+        root.init(
+            &Plru::<()>::init(32 * 1024 * 1024),
+            std::mem::size_of::<Plru<()>>(),
+        );
+        let plru = root.load_mut();
 
         // Insert 3 entries
         let mut ptr = vec![];
         for id in 0..3 {
             let node_ptr = pal.allocate(PLRU_NODE_SIZE).unwrap();
             ptr.push(node_ptr.clone());
-            let node = unsafe { node_ptr.load() as *mut PlruNode<()> };
-            let plru = unsafe { plru.as_mut().unwrap() };
             plru.insert(node_ptr.clone(), id, 15 * 1024 * 1024, ())
                 .unwrap();
             assert_eq!(plru.head, Some(node_ptr));
         }
-        let plru = unsafe { plru.as_mut().unwrap() };
         assert_eq!(plru.count, 3);
 
         for ptr in ptr.iter_mut() {
@@ -343,33 +357,32 @@ mod tests {
         let mut ptr = vec![];
         {
             let mut pal = Pal::create(file.path(), 32 * 1024 * 1024, 0o666).unwrap();
-            let root = pal.root(size_of::<Plru<()>>()).unwrap();
-            let plru = root.load() as *mut Plru<()>;
-            unsafe { plru.write_unaligned(Plru::<()>::init(32 * 1024 * 1024)) };
+            let mut root = pal.root(size_of::<Plru<()>>()).unwrap();
+            root.init(
+                &Plru::<()>::init(32 * 1024 * 1024),
+                std::mem::size_of::<Plru<()>>(),
+            );
+            let plru = root.load_mut();
 
             // Insert 3 entries
             for id in 0..3 {
                 let node_ptr = pal.allocate(PLRU_NODE_SIZE).unwrap();
                 ptr.push(node_ptr.clone());
-                let node = unsafe { node_ptr.load() as *mut PlruNode<()> };
-                let plru = unsafe { plru.as_mut().unwrap() };
                 plru.insert(node_ptr.clone(), id, 15 * 1024 * 1024, ())
                     .unwrap();
                 assert_eq!(plru.head, Some(node_ptr));
             }
-            let plru = unsafe { plru.as_mut().unwrap() };
             assert_eq!(plru.count, 3);
             pal.close();
         }
         {
             let mut pal = Pal::open(file.path()).unwrap();
-            let root = pal.root(size_of::<Plru<()>>()).unwrap();
-            let plru = root.load() as *mut Plru<()>;
-            let plru = unsafe { plru.as_mut().unwrap() };
+            let mut root: PalPtr<Plru<()>> = pal.root(size_of::<Plru<()>>()).unwrap();
+            let plru = root.load_mut();
 
             assert_eq!(plru.head, Some(ptr.last().unwrap().clone()));
             assert_eq!(plru.tail, Some(ptr.first().unwrap().clone()));
 
-            for ptr in ptr.iter().rev() {
+            for ptr in ptr.iter_mut().rev() {
                 assert_eq!(plru.head, Some(ptr.clone()));
                 plru.remove(ptr);
             }
diff --git a/betree/src/replication/lru_worker.rs b/betree/src/replication/lru_worker.rs
index 6152a94e..47a66fca 100644
--- a/betree/src/replication/lru_worker.rs
+++ b/betree/src/replication/lru_worker.rs
@@ -1,12 +1,12 @@
 use crossbeam_channel::Receiver;
 use pmem_hashmap::allocator::PalPtr;
 
-use super::{PCacheRoot, Persistent};
+use super::{lru::PlruNode, PCacheRoot, Persistent};
 
 pub enum Msg<T> {
-    Touch(PalPtr),
-    Remove(PalPtr),
-    Insert(PalPtr, u64, u64, T),
+    Touch(PalPtr<PlruNode<T>>),
+    Remove(PalPtr<PlruNode<T>>),
+    Insert(PalPtr<PlruNode<T>>, u64, u64, T),
     Close,
 }
 
@@ -14,13 +14,13 @@ pub fn main<T>(rx: Receiver<Msg<T>>, mut root: Persistent<PCacheRoot<T>>) {
     // TODO: Error handling with return to a valid state in the data section.
     while let Ok(msg) = rx.recv() {
         match msg {
-            Msg::Touch(ptr) => {
+            Msg::Touch(mut ptr) => {
                 let mut lru = root.lru.write();
-                let _ = lru.touch(&ptr);
+                let _ = lru.touch(&mut ptr);
             }
             Msg::Remove(mut ptr) => {
                 let mut lru = root.lru.write();
-                let _ = lru.remove(&ptr);
+                let _ = lru.remove(&mut ptr);
                 ptr.free();
             }
             Msg::Insert(ptr, hash, size, baggage) => {
diff --git a/betree/src/replication/mod.rs b/betree/src/replication/mod.rs
index 50bbcd4b..a27c93c4 100644
--- a/betree/src/replication/mod.rs
+++ b/betree/src/replication/mod.rs
@@ -42,25 +42,28 @@ use zstd_safe::WriteBuf;
 
 mod lru;
 mod lru_worker;
+// mod tree;
 
 use lru::Plru;
 use serde::{Deserialize, Serialize};
 
 use crate::buffer::Buf;
 
+use self::lru::PlruNode;
+
 /// A pointer to a region in persistent memory.
-pub struct Persistent<T>(NonNull<T>);
+pub struct Persistent<T>(PalPtr<T>);
 
 // Pointer to persistent memory can be assumed to be non-thread-local
 unsafe impl<T> Send for Persistent<T> {}
 
 impl<T> Deref for Persistent<T> {
     type Target = T;
     fn deref(&self) -> &Self::Target {
-        unsafe { self.0.as_ref() }
+        unsafe { self.0.load() }
     }
 }
 
 impl<T> DerefMut for Persistent<T> {
     fn deref_mut(&mut self) -> &mut Self::Target {
-        unsafe { self.0.as_mut() }
+        unsafe { self.0.load_mut() }
     }
 }
 
@@ -99,15 +102,15 @@ impl<K, T> Drop for PersistentCache<K, T> {
 }
 
 pub struct PCacheRoot<T> {
-    map: BTreeMap<u64, PCacheMapEntry, Pal>,
+    map: BTreeMap<u64, PCacheMapEntry<T>, Pal>,
     lru: RwLock<Plru<T>>,
 }
 
-#[derive(Debug, Clone)]
-pub struct PCacheMapEntry {
+#[derive(Debug)]
+pub struct PCacheMapEntry<T> {
     size: usize,
-    lru_node: PalPtr,
-    data: PalPtr,
+    lru_node: PalPtr<PlruNode<T>>,
+    data: PalPtr<u8>,
 }
 
 /// Configuration for a persistent cache.
@@ -133,7 +136,7 @@ impl<'a, K, T: Clone> PersistentCacheInsertion<'a, K, T> {
     /// initiated anew.
     pub fn insert<F>(self, f: F) -> Result<(), PMapError>
     where
-        F: Fn(&T, &[u8]) -> Result<(), crate::vdev::Error>,
+        F: Fn(&T, Buf) -> Result<(), crate::vdev::Error>,
     {
         loop {
             let key = {
@@ -146,7 +149,9 @@ impl<'a, K, T: Clone> PersistentCacheInsertion<'a, K, T> {
                 let data = unsafe {
                     core::slice::from_raw_parts(entry.data.load() as *const u8, entry.size)
                 };
-                if f(baggage, data).is_err() {
+
+                let buf = Buf::from_persistent_ptr(entry.data, entry.size as u32);
+                if f(baggage, buf).is_err() {
                     return Err(PMapError::ExternalError("Writeback failed".into()));
                 }
                 key
@@ -210,50 +215,41 @@ impl<K: Hash, T> PersistentCache<K, T> {
     pub fn open<P: Into<PathBuf>>(path: P) -> Result<Self, PMapError> {
         let pal = Pal::open(path.into()).unwrap();
         let root = pal.root(size_of::<PCacheRoot<T>>()).unwrap();
-        assert!(!root.load().is_null());
-        if let Some(root) = NonNull::new(root.load() as *mut PCacheRoot<T>) {
-            let (tx, rx) = crossbeam_channel::unbounded();
-            let root_lru = Persistent(root.clone());
-            let hndl = std::thread::spawn(move || lru_worker::main(rx, root_lru));
-            let root = Persistent(root);
-            Ok(Self {
-                pal,
-                tx,
-                root,
-                hndl: Some(hndl),
-                key_type: PhantomData::default(),
-            })
-        } else {
-            Err(PMapError::DoesNotExist)
-        }
+        let (tx, rx) = crossbeam_channel::unbounded();
+        let root_lru = Persistent(root.clone());
+        let hndl = std::thread::spawn(move || lru_worker::main(rx, root_lru));
+        let root = Persistent(root);
+        Ok(Self {
+            pal,
+            tx,
+            root,
+            hndl: Some(hndl),
+            key_type: PhantomData::default(),
+        })
     }
 
     /// Create a new [PersistentCache] in the specified path. Fails if the underlying resources are not valid.
    pub fn create<P: Into<PathBuf>>(path: P, size: usize) -> Result<Self, PMapError> {
         let mut pal = Pal::create(path.into(), size, 0o666).unwrap();
-        let root = pal.root(size_of::<PCacheRoot<T>>()).unwrap();
-        assert!(!root.load().is_null());
-        if let Some(root) = NonNull::new(root.load() as *mut PCacheRoot<T>) {
-            unsafe {
-                root.as_ptr().write_unaligned(PCacheRoot {
-                    lru: RwLock::new(Plru::init(size as u64)),
-                    map: BTreeMap::new_in(pal.clone()),
-                })
-            };
-            let (tx, rx) = crossbeam_channel::unbounded();
-            let root_lru = Persistent(root.clone());
-            let hndl = std::thread::spawn(move || lru_worker::main(rx, root_lru));
-            let mut root = Persistent(root);
-            Ok(Self {
-                pal,
-                tx,
-                root,
-                hndl: Some(hndl),
-                key_type: PhantomData::default(),
-            })
-        } else {
-            Err(PMapError::DoesNotExist)
-        }
+        let mut root: PalPtr<PCacheRoot<T>> = pal.root(size_of::<PCacheRoot<T>>()).unwrap();
+        root.init(
+            &PCacheRoot {
+                lru: RwLock::new(Plru::init(size as u64)),
+                map: BTreeMap::new_in(pal.clone()),
+            },
+            std::mem::size_of::<PCacheRoot<T>>(),
+        );
+        let (tx, rx) = crossbeam_channel::unbounded();
+        let root_lru = Persistent(root.clone());
+        let hndl = std::thread::spawn(move || lru_worker::main(rx, root_lru));
+        let mut root = Persistent(root);
+        Ok(Self {
+            pal,
+            tx,
+            root,
+            hndl: Some(hndl),
+            key_type: PhantomData::default(),
+        })
    }
 
    /// Fetch an entry from the hashmap.
@@ -261,7 +257,7 @@ impl<K: Hash, T> PersistentCache<K, T> {
         let mut hasher = XxHash64::default();
         key.hash(&mut hasher);
         let hash = hasher.finish();
-        let res = self.root.map.get(&hash).cloned();
+        let res = self.root.map.get(&hash);
         if let Some(entry) = res {
             self.tx.send(lru_worker::Msg::Touch(entry.lru_node));
             // self.root.lru.touch(&entry.lru_node)?;
@@ -279,7 +275,7 @@ impl<K: Hash, T> PersistentCache<K, T> {
         let mut hasher = XxHash64::default();
         key.hash(&mut hasher);
         let hash = hasher.finish();
-        let res = self.root.map.get(&hash).cloned();
+        let res = self.root.map.get(&hash);
         if let Some(entry) = res {
             self.tx.send(lru_worker::Msg::Touch(entry.lru_node));
             // self.root.lru.touch(&entry.lru_node)?;
diff --git a/betree/src/replication/tree.rs b/betree/src/replication/tree.rs
new file mode 100644
index 00000000..a7b2fd91
--- /dev/null
+++ b/betree/src/replication/tree.rs
@@ -0,0 +1,44 @@
+use pmem_hashmap::allocator::PalPtr;
+use parking_lot::RwLock;
+
+/// A basic BTree implementation using PalPtr.
+
+// Order of the BTree.
+const M: usize = 5;
+
+struct Node<K, V> {
+    values: [Option<(K, V)>; M],
+    // Fine granular locking could allow more efficient inserts *while* reading from the tree.
+    child: [RwLock<Child<K, V>>; M - 1],
+}
+
+enum Child<K, V> {
+    Leaf,
+    // The persistent pointer provides the indirection for the recursive type.
+    Node(PalPtr<Node<K, V>>),
+}
+
+impl<K: Ord + Copy, V: Clone> Node<K, V> {
+    pub fn insert(&mut self, key: K, value: V) -> Option<V> {
+        todo!()
+    }
+
+    pub fn get(&self, key: K) -> Option<V> {
+        // Pairs are kept in ascending key order; scan until a match or a greater key.
+        for pos in 0..M {
+            if let Some(pair) = self.values[pos].as_ref() {
+                if pair.0 == key {
+                    return Some(pair.1.clone());
+                }
+                if pair.0 > key {
+                    return match &*self.child.get(pos)?.read() {
+                        Child::Leaf => None,
+                        Child::Node(ptr) => ptr.load().get(key),
+                    };
+                }
+            } else {
+                break;
+            }
+        }
+        None
+    }
+}
diff --git a/betree/src/storage_pool/mod.rs b/betree/src/storage_pool/mod.rs
index 581a4b94..c8c39d0f 100644
--- a/betree/src/storage_pool/mod.rs
+++ b/betree/src/storage_pool/mod.rs
@@ -58,7 +58,7 @@ pub trait StoragePoolLayer: Clone + Send + Sync + 'static {
     /// Issues a write request that might happen in the background.
     fn begin_write(&self, data: Buf, offset: DiskOffset) -> VdevResult<()>;
 
-    fn begin_foo<F>(&self, offset: DiskOffset, f: F) -> VdevResult<()>
+    fn begin_write_offload<F>(&self, offset: DiskOffset, f: F) -> VdevResult<()>
     where
         F: FnOnce() + Send + 'static;
 
diff --git a/betree/src/storage_pool/unit.rs b/betree/src/storage_pool/unit.rs
index 99272f24..4b53f902 100644
--- a/betree/src/storage_pool/unit.rs
+++ b/betree/src/storage_pool/unit.rs
@@ -181,14 +181,17 @@ impl<C: Checksum> StoragePoolLayer for StoragePoolUnit<C> {
         ret
     }
 
-    fn begin_foo<F>(&self, offset: DiskOffset, f: F) -> vdev::Result<()>
+    fn begin_write_offload<F>(&self, offset: DiskOffset, f: F) -> vdev::Result<()>
     where
         F: FnOnce() + Send + 'static,
     {
+        let inner = self.inner.clone();
         let (enqueue_done, wait_for_enqueue) = futures::channel::oneshot::channel();
         let write = self.inner.pool.spawn_with_handle(async move {
             wait_for_enqueue.await.unwrap();
             f();
+
+            inner.write_back_queue.mark_completed(&offset).await;
             Ok(())
         })?;
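
---

Usage sketch (not part of the diff above): how the typed `PalPtr<T>` API from
allocator.rs is meant to be used. `Counter` and `demo` are hypothetical names
introduced for illustration; `allocate`, `init`, `load`, and `load_mut` are the
calls added in this patch, and `init`'s second argument is an element count, as
with `pointer::copy_from`.

    use pmem_hashmap::allocator::{Pal, PalPtr};

    #[repr(C)]
    struct Counter {
        hits: u64,
    }

    fn demo(pal: &Pal) {
        // Allocate persistent memory for one `Counter` and initialize it
        // from a volatile value (count of 1 element).
        let mut ptr: PalPtr<Counter> = pal.allocate(std::mem::size_of::<Counter>()).unwrap();
        ptr.init(&Counter { hits: 0 }, 1);
        // Mutate and read through typed references instead of casting the
        // raw `*mut c_void` that `load` previously returned.
        ptr.load_mut().hits += 1;
        assert_eq!(ptr.load().hits, 1);
    }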