Skip to content

Commit

Permalink
Clean up pmdk (#45)
Browse files Browse the repository at this point in the history
* pmdk: clean up

This commit removes some permission errors which existed in the creation of new
pools and reworks the tests and attached binary. This is squashed from a
discarded branch.

* pmdk: clippy lints

* vdev: fix usage of PMem

* fix: clippy lints
  • Loading branch information
jwuensche authored Jan 15, 2024
1 parent be50513 commit df1ed2f
Show file tree
Hide file tree
Showing 6 changed files with 331 additions and 234 deletions.
9 changes: 7 additions & 2 deletions betree/pmdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ license = "MIT OR Apache-2.0"
authors = ["Sajad Karim <sajad.karim@ovgu.de>", "Johannes Wünsche <johannes@spacesnek.rocks>"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html


[build-dependencies]
bindgen = "0.58.1"
bindgen = "0.65"

[dependencies]
core_affinity = "0.8.0"

[dev-dependencies]
tempfile = "3.6.0"
100 changes: 100 additions & 0 deletions betree/pmdk/src/bin/bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::sync::Arc;

use pmdk::PMem;

const BUFFER_SIZE: usize = 4 * 1024;
const SIZE: usize = 64 * 1024 * 1024 * 1024;
const ITER: usize = SIZE / BUFFER_SIZE;
const JOBS: usize = 8;
const OPS_PER_JOB: usize = ITER / JOBS;
const REM_OPS: usize = ITER % JOBS;
enum Command {
Read,
Write,
Wait,
}

#[allow(clippy::absurd_extreme_comparisons)]
fn basic_read_write_test(path: &str) -> Result<(), std::io::Error> {
let pmem = Arc::new(match PMem::create(path, SIZE) {
Ok(value) => value,
Err(_) => PMem::open(path)?,
});

let threads: Vec<_> = (0..JOBS)
.map(|id| {
let p = Arc::clone(&pmem);
let (tx, rx) = std::sync::mpsc::sync_channel::<Command>(0);
(
tx,
std::thread::spawn(move || {
assert!(core_affinity::set_for_current(core_affinity::CoreId { id }));
let mut buf = vec![0u8; BUFFER_SIZE];
while let Ok(msg) = rx.recv() {
match msg {
Command::Read => {
for it in 0..OPS_PER_JOB {
p.read((it * BUFFER_SIZE) + (id * BUFFER_SIZE), &mut buf)
}
if id < REM_OPS {
p.read(
JOBS * OPS_PER_JOB * BUFFER_SIZE + (id * BUFFER_SIZE),
&mut buf,
)
}
}
Command::Write => unsafe {
for it in 0..OPS_PER_JOB {
p.write((it * BUFFER_SIZE) + (id * BUFFER_SIZE), &buf)
}
if id < REM_OPS {
p.write(
JOBS * OPS_PER_JOB * BUFFER_SIZE + (id * BUFFER_SIZE),
&buf,
)
}
},
Command::Wait => {}
}
}
}),
)
})
.collect();

// Write
let start = std::time::Instant::now();
for job in threads.iter() {
job.0.send(Command::Write).unwrap();
}

for job in threads.iter() {
job.0.send(Command::Wait).unwrap();
}

println!(
"Write: Achieved {} GiB/s",
SIZE as f32 / 1024f32 / 1024f32 / 1024f32 / start.elapsed().as_secs_f32()
);

// Read
let start = std::time::Instant::now();
for id in 0..JOBS {
threads[id % JOBS].0.send(Command::Read).unwrap();
}
for id in 0..JOBS {
threads[id % JOBS].0.send(Command::Wait).unwrap();
}

println!(
"Read: Achieved {} GiB/s",
SIZE as f32 / 1024f32 / 1024f32 / 1024f32 / start.elapsed().as_secs_f32()
);

Ok(())
}

fn main() -> Result<(), std::io::Error> {
basic_read_write_test("PATH_TO_YOUR_PMEM")?;
Ok(())
}
238 changes: 174 additions & 64 deletions betree/pmdk/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,98 +1,208 @@
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
// The u128 types of rust and c are due to some bugs in llvm incompatible, which
// is the case for some values in the used libraries. But we don't actively use
// them now, and they spam the build log, so we deactivate the warning for now.
#![allow(improper_ctypes)]

include!(concat!(env!("OUT_DIR"), "/bindings.rs"));

use std::os::raw::c_void;
use std::{
ffi::{c_void, CString},
mem::forget,
path::PathBuf,
ptr::NonNull,
};

#[derive(Debug)]
pub struct PMem {
pub ptr: *mut c_void
ptr: NonNull<c_void>,
actually_pmem: bool,
len: usize,
}

impl Drop for PMem {
fn drop(&mut self) {
self.close()
}
}

unsafe impl Send for PMem {}
unsafe impl Sync for PMem {}

#[allow(clippy::len_without_is_empty)]
impl PMem {
pub fn create(filepath : &str, len: u64, mapped_len : &mut u64, is_pmem : &mut i32) -> Result<Self, std::io::Error> {
let mut ptr = unsafe {
pmem_map_file(filepath.as_ptr() as *const i8,
len,
(PMEM_FILE_CREATE|PMEM_FILE_EXCL) as i32,
0666,
mapped_len,
is_pmem)
/// Create a new persistent memory pool. By default a file is created which
/// is readable and writable by all users.
pub fn create<P: Into<PathBuf>>(filepath: P, len: usize) -> Result<Self, std::io::Error> {
let mut mapped_len = 0;
let mut is_pmem = 0;
let ptr = unsafe {
pmem_map_file(
CString::new(filepath.into().to_string_lossy().into_owned())?.into_raw(),
len,
(PMEM_FILE_CREATE | PMEM_FILE_EXCL) as i32,
0o666,
&mut mapped_len,
&mut is_pmem,
)
};

if ptr.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::Other,
format!("Failed to create memory pool. filepath: {}", filepath)));
}

Ok(PMem {
ptr : ptr
})
Self::new(ptr, mapped_len, is_pmem)
}

pub fn open(filepath: &str, mapped_len: &mut u64, is_pmem: &mut i32) -> Result<Self, std::io::Error> {
let mut ptr = unsafe {
pmem_map_file(filepath.as_ptr() as *const i8,
0, // Opening an existing file requires no flag(s).
0, // No length as no flag is provided.
0666,
mapped_len,
is_pmem)
/// Open an existing persistent memory pool.
pub fn open<P: Into<PathBuf>>(filepath: P) -> Result<Self, std::io::Error> {
let mut mapped_len = 0;
let mut is_pmem = 0;
let ptr = unsafe {
pmem_map_file(
CString::new(filepath.into().to_string_lossy().into_owned())?.into_raw(),
0, // Opening an existing file requires no flag(s).
0, // No length as no flag is provided.
0o666,
&mut mapped_len,
&mut is_pmem,
)
};
Self::new(ptr, mapped_len, is_pmem)
}

if ptr.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::Other,
format!("Failed to open the memory pool. filepath: {}", filepath)));
}
fn new(ptr: *mut c_void, len: usize, is_pmem: i32) -> Result<Self, std::io::Error> {
NonNull::new(ptr)
.map(|valid| PMem {
ptr: valid,
actually_pmem: is_pmem != 0,
len,
})
.ok_or_else(|| {
let err = unsafe { CString::from_raw(pmem_errormsg() as *mut i8) };
let err_msg = format!(
"Failed to create memory pool. filepath: {}",
err.to_string_lossy()
);
forget(err);
std::io::Error::new(std::io::ErrorKind::Other, err_msg)
})
}

Ok(PMem {
ptr: ptr
})
/// Read a range of bytes from the specified offset.
pub fn read(&self, offset: usize, data: &mut [u8]) {
let _ = unsafe {
pmem_memcpy(
data.as_ptr() as *mut c_void,
self.ptr.as_ptr().add(offset),
data.len(),
PMEM_F_MEM_NOFLUSH,
)
};
}

pub fn read(&self, offset: usize, data: &mut [u8], len: u64) -> Result<(), std::io::Error>{
if self.ptr.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::Other,
format!("File handle is missing for the PMEM file.")));
}
/// Write a range of bytes to the specified offset.
///
/// By default, we always perform a persisting write here, equivalent to a
/// direct & sync in traditional interfaces.
///
/// # Safety
/// It is possible to issue multiple write requests to the same area at the
/// same time. What happens then is undefined and might lead to
/// inconsistencies.
pub unsafe fn write(&self, offset: usize, data: &[u8]) {
let _ = pmem_memcpy(
self.ptr.as_ptr().add(offset),
data.as_ptr() as *mut c_void,
data.len(),
PMEM_F_MEM_NONTEMPORAL,
);
}

let ptr = unsafe {
pmem_memcpy(data.as_ptr() as *mut c_void, self.ptr.add(offset), len, PMEM_F_MEM_NOFLUSH /*| PMEM_F_MEM_TEMPORAL*/)
};

if ptr.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::Other,
format!("Failed to read data from PMEM file. Offset: {}, Size: {}", offset, len)));
};
/// Returns whether or not the underlying storage is either fsdax or devdax.
pub fn is_pmem(&self) -> bool {
self.actually_pmem
}

Ok(())
/// The total length of the memory pool in bytes.
pub fn len(&self) -> usize {
self.len
}

pub unsafe fn write(&self, offset: usize, data: &[u8], len: usize) -> Result<(), std::io::Error>{
if self.ptr.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::Other,
format!("File handle is missing for the PMEM file.")));
fn close(&mut self) {
unsafe {
// TODO: Read out error correctly. Atleast let the output know that something went wrong.
pmem_unmap(self.ptr.as_ptr(), self.len);
}
}
}

let ptr = pmem_memcpy_persist( self.ptr.add(offset), data.as_ptr() as *mut c_void, len as u64);

if self.ptr.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::Other,
format!("Failed to write data to PMEM file. Offset: {}, Size: {}", offset, len)))
};
#[cfg(test)]
mod tests {
use super::*;
use std::{path::PathBuf, process::Command};
use tempfile::Builder;

struct TestFile(PathBuf);

impl TestFile {
pub fn new() -> Self {
TestFile(
Builder::new()
.tempfile()
.expect("Could not get tmpfile")
.path()
.to_path_buf(),
)
}

Ok(())
pub fn path(&self) -> &PathBuf {
&self.0
}
}
impl Drop for TestFile {
fn drop(&mut self) {
if !Command::new("rm")
.arg(self.0.to_str().expect("Could not pass tmpfile"))
.output()
.expect("Could not delete")
.status
.success()
{
eprintln!("Could not delete tmpfile");
}
}
}

pub fn close(&self, mapped_len: &u64) {
#[test]
fn basic_io_session() {
let file = TestFile::new();
let pmem = PMem::create(file.path(), 8 * 1024 * 1024).unwrap();
let buf = vec![42u8; 4 * 1024 * 1024];
unsafe {
pmem_unmap(self.ptr, *mapped_len);
pmem.write(0, &buf);
}
let mut rbuf = vec![0u8; buf.len()];
pmem.read(0, &mut rbuf);
assert_eq!(rbuf, buf);
}
}

#[test]
fn basic_io_persist() {
let file = TestFile::new();
let buf = vec![42u8; 4 * 1024 * 1024];
let offseted = [43u8];
{
let pmem = PMem::create(file.path(), 8 * 1024 * 1024).unwrap();
unsafe {
pmem.write(0, &buf);
pmem.write(buf.len(), &offseted);
}
}
{
let pmem = PMem::open(file.path()).unwrap();
let mut rbuf = vec![42u8; buf.len()];
pmem.read(0, &mut rbuf);
let mut single = [0u8];
pmem.read(buf.len(), &mut single);
assert_eq!(rbuf, buf);
assert_eq!(single, offseted);
}
}
}
Loading

0 comments on commit df1ed2f

Please sign in to comment.