diff --git a/betree/pmdk/Cargo.toml b/betree/pmdk/Cargo.toml index 3830685b..1f3d4dbe 100644 --- a/betree/pmdk/Cargo.toml +++ b/betree/pmdk/Cargo.toml @@ -7,6 +7,11 @@ license = "MIT OR Apache-2.0" authors = ["Sajad Karim ", "Johannes Wünsche "] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [build-dependencies] -bindgen = "0.58.1" +bindgen = "0.65" + +[dependencies] +core_affinity = "0.8.0" + +[dev-dependencies] +tempfile = "3.6.0" diff --git a/betree/pmdk/src/bin/bench.rs b/betree/pmdk/src/bin/bench.rs new file mode 100644 index 00000000..2d3ace99 --- /dev/null +++ b/betree/pmdk/src/bin/bench.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use pmdk::PMem; + +const BUFFER_SIZE: usize = 4 * 1024; +const SIZE: usize = 64 * 1024 * 1024 * 1024; +const ITER: usize = SIZE / BUFFER_SIZE; +const JOBS: usize = 8; +const OPS_PER_JOB: usize = ITER / JOBS; +const REM_OPS: usize = ITER % JOBS; +enum Command { + Read, + Write, + Wait, +} + +#[allow(clippy::absurd_extreme_comparisons)] +fn basic_read_write_test(path: &str) -> Result<(), std::io::Error> { + let pmem = Arc::new(match PMem::create(path, SIZE) { + Ok(value) => value, + Err(_) => PMem::open(path)?, + }); + + let threads: Vec<_> = (0..JOBS) + .map(|id| { + let p = Arc::clone(&pmem); + let (tx, rx) = std::sync::mpsc::sync_channel::(0); + ( + tx, + std::thread::spawn(move || { + assert!(core_affinity::set_for_current(core_affinity::CoreId { id })); + let mut buf = vec![0u8; BUFFER_SIZE]; + while let Ok(msg) = rx.recv() { + match msg { + Command::Read => { + for it in 0..OPS_PER_JOB { + p.read((it * BUFFER_SIZE) + (id * BUFFER_SIZE), &mut buf) + } + if id < REM_OPS { + p.read( + JOBS * OPS_PER_JOB * BUFFER_SIZE + (id * BUFFER_SIZE), + &mut buf, + ) + } + } + Command::Write => unsafe { + for it in 0..OPS_PER_JOB { + p.write((it * BUFFER_SIZE) + (id * BUFFER_SIZE), &buf) + } + if id < REM_OPS { + p.write( + JOBS * OPS_PER_JOB * BUFFER_SIZE + (id * BUFFER_SIZE), + &buf, + ) + } + }, + Command::Wait => {} + } + } + }), + ) + }) + .collect(); + + // Write + let start = std::time::Instant::now(); + for job in threads.iter() { + job.0.send(Command::Write).unwrap(); + } + + for job in threads.iter() { + job.0.send(Command::Wait).unwrap(); + } + + println!( + "Write: Achieved {} GiB/s", + SIZE as f32 / 1024f32 / 1024f32 / 1024f32 / start.elapsed().as_secs_f32() + ); + + // Read + let start = std::time::Instant::now(); + for id in 0..JOBS { + threads[id % JOBS].0.send(Command::Read).unwrap(); + } + for id in 0..JOBS { + threads[id % JOBS].0.send(Command::Wait).unwrap(); + } + + println!( + "Read: Achieved {} GiB/s", + SIZE as f32 / 1024f32 / 1024f32 / 1024f32 / start.elapsed().as_secs_f32() + ); + + Ok(()) +} + +fn main() -> Result<(), std::io::Error> { + basic_read_write_test("PATH_TO_YOUR_PMEM")?; + Ok(()) +} diff --git a/betree/pmdk/src/lib.rs b/betree/pmdk/src/lib.rs index 3edf2197..5d4d8421 100644 --- a/betree/pmdk/src/lib.rs +++ b/betree/pmdk/src/lib.rs @@ -1,98 +1,208 @@ #![allow(non_upper_case_globals)] #![allow(non_camel_case_types)] #![allow(non_snake_case)] +// The u128 types of rust and c are due to some bugs in llvm incompatible, which +// is the case for some values in the used libraries. But we don't actively use +// them now, and they spam the build log, so we deactivate the warning for now. +#![allow(improper_ctypes)] include!(concat!(env!("OUT_DIR"), "/bindings.rs")); - -use std::os::raw::c_void; +use std::{ + ffi::{c_void, CString}, + mem::forget, + path::PathBuf, + ptr::NonNull, +}; #[derive(Debug)] pub struct PMem { - pub ptr: *mut c_void + ptr: NonNull, + actually_pmem: bool, + len: usize, +} + +impl Drop for PMem { + fn drop(&mut self) { + self.close() + } } - + unsafe impl Send for PMem {} unsafe impl Sync for PMem {} +#[allow(clippy::len_without_is_empty)] impl PMem { - pub fn create(filepath : &str, len: u64, mapped_len : &mut u64, is_pmem : &mut i32) -> Result { - let mut ptr = unsafe { - pmem_map_file(filepath.as_ptr() as *const i8, - len, - (PMEM_FILE_CREATE|PMEM_FILE_EXCL) as i32, - 0666, - mapped_len, - is_pmem) + /// Create a new persistent memory pool. By default a file is created which + /// is readable and writable by all users. + pub fn create>(filepath: P, len: usize) -> Result { + let mut mapped_len = 0; + let mut is_pmem = 0; + let ptr = unsafe { + pmem_map_file( + CString::new(filepath.into().to_string_lossy().into_owned())?.into_raw(), + len, + (PMEM_FILE_CREATE | PMEM_FILE_EXCL) as i32, + 0o666, + &mut mapped_len, + &mut is_pmem, + ) }; - - if ptr.is_null() { - return Err(std::io::Error::new(std::io::ErrorKind::Other, - format!("Failed to create memory pool. filepath: {}", filepath))); - } - - Ok(PMem { - ptr : ptr - }) + Self::new(ptr, mapped_len, is_pmem) } - pub fn open(filepath: &str, mapped_len: &mut u64, is_pmem: &mut i32) -> Result { - let mut ptr = unsafe { - pmem_map_file(filepath.as_ptr() as *const i8, - 0, // Opening an existing file requires no flag(s). - 0, // No length as no flag is provided. - 0666, - mapped_len, - is_pmem) + /// Open an existing persistent memory pool. + pub fn open>(filepath: P) -> Result { + let mut mapped_len = 0; + let mut is_pmem = 0; + let ptr = unsafe { + pmem_map_file( + CString::new(filepath.into().to_string_lossy().into_owned())?.into_raw(), + 0, // Opening an existing file requires no flag(s). + 0, // No length as no flag is provided. + 0o666, + &mut mapped_len, + &mut is_pmem, + ) }; + Self::new(ptr, mapped_len, is_pmem) + } - if ptr.is_null() { - return Err(std::io::Error::new(std::io::ErrorKind::Other, - format!("Failed to open the memory pool. filepath: {}", filepath))); - } + fn new(ptr: *mut c_void, len: usize, is_pmem: i32) -> Result { + NonNull::new(ptr) + .map(|valid| PMem { + ptr: valid, + actually_pmem: is_pmem != 0, + len, + }) + .ok_or_else(|| { + let err = unsafe { CString::from_raw(pmem_errormsg() as *mut i8) }; + let err_msg = format!( + "Failed to create memory pool. filepath: {}", + err.to_string_lossy() + ); + forget(err); + std::io::Error::new(std::io::ErrorKind::Other, err_msg) + }) + } - Ok(PMem { - ptr: ptr - }) + /// Read a range of bytes from the specified offset. + pub fn read(&self, offset: usize, data: &mut [u8]) { + let _ = unsafe { + pmem_memcpy( + data.as_ptr() as *mut c_void, + self.ptr.as_ptr().add(offset), + data.len(), + PMEM_F_MEM_NOFLUSH, + ) + }; } - pub fn read(&self, offset: usize, data: &mut [u8], len: u64) -> Result<(), std::io::Error>{ - if self.ptr.is_null() { - return Err(std::io::Error::new(std::io::ErrorKind::Other, - format!("File handle is missing for the PMEM file."))); - } + /// Write a range of bytes to the specified offset. + /// + /// By default, we always perform a persisting write here, equivalent to a + /// direct & sync in traditional interfaces. + /// + /// # Safety + /// It is possible to issue multiple write requests to the same area at the + /// same time. What happens then is undefined and might lead to + /// inconsistencies. + pub unsafe fn write(&self, offset: usize, data: &[u8]) { + let _ = pmem_memcpy( + self.ptr.as_ptr().add(offset), + data.as_ptr() as *mut c_void, + data.len(), + PMEM_F_MEM_NONTEMPORAL, + ); + } - let ptr = unsafe { - pmem_memcpy(data.as_ptr() as *mut c_void, self.ptr.add(offset), len, PMEM_F_MEM_NOFLUSH /*| PMEM_F_MEM_TEMPORAL*/) - }; - - if ptr.is_null() { - return Err(std::io::Error::new(std::io::ErrorKind::Other, - format!("Failed to read data from PMEM file. Offset: {}, Size: {}", offset, len))); - }; + /// Returns whether or not the underlying storage is either fsdax or devdax. + pub fn is_pmem(&self) -> bool { + self.actually_pmem + } - Ok(()) + /// The total length of the memory pool in bytes. + pub fn len(&self) -> usize { + self.len } - pub unsafe fn write(&self, offset: usize, data: &[u8], len: usize) -> Result<(), std::io::Error>{ - if self.ptr.is_null() { - return Err(std::io::Error::new(std::io::ErrorKind::Other, - format!("File handle is missing for the PMEM file."))); + fn close(&mut self) { + unsafe { + // TODO: Read out error correctly. Atleast let the output know that something went wrong. + pmem_unmap(self.ptr.as_ptr(), self.len); } + } +} - let ptr = pmem_memcpy_persist( self.ptr.add(offset), data.as_ptr() as *mut c_void, len as u64); - - if self.ptr.is_null() { - return Err(std::io::Error::new(std::io::ErrorKind::Other, - format!("Failed to write data to PMEM file. Offset: {}, Size: {}", offset, len))) - }; +#[cfg(test)] +mod tests { + use super::*; + use std::{path::PathBuf, process::Command}; + use tempfile::Builder; + + struct TestFile(PathBuf); + + impl TestFile { + pub fn new() -> Self { + TestFile( + Builder::new() + .tempfile() + .expect("Could not get tmpfile") + .path() + .to_path_buf(), + ) + } - Ok(()) + pub fn path(&self) -> &PathBuf { + &self.0 + } + } + impl Drop for TestFile { + fn drop(&mut self) { + if !Command::new("rm") + .arg(self.0.to_str().expect("Could not pass tmpfile")) + .output() + .expect("Could not delete") + .status + .success() + { + eprintln!("Could not delete tmpfile"); + } + } } - pub fn close(&self, mapped_len: &u64) { + #[test] + fn basic_io_session() { + let file = TestFile::new(); + let pmem = PMem::create(file.path(), 8 * 1024 * 1024).unwrap(); + let buf = vec![42u8; 4 * 1024 * 1024]; unsafe { - pmem_unmap(self.ptr, *mapped_len); + pmem.write(0, &buf); } + let mut rbuf = vec![0u8; buf.len()]; + pmem.read(0, &mut rbuf); + assert_eq!(rbuf, buf); } -} + #[test] + fn basic_io_persist() { + let file = TestFile::new(); + let buf = vec![42u8; 4 * 1024 * 1024]; + let offseted = [43u8]; + { + let pmem = PMem::create(file.path(), 8 * 1024 * 1024).unwrap(); + unsafe { + pmem.write(0, &buf); + pmem.write(buf.len(), &offseted); + } + } + { + let pmem = PMem::open(file.path()).unwrap(); + let mut rbuf = vec![42u8; buf.len()]; + pmem.read(0, &mut rbuf); + let mut single = [0u8]; + pmem.read(buf.len(), &mut single); + assert_eq!(rbuf, buf); + assert_eq!(single, offseted); + } + } +} diff --git a/betree/pmdk/src/main.rs b/betree/pmdk/src/main.rs deleted file mode 100644 index c5580fae..00000000 --- a/betree/pmdk/src/main.rs +++ /dev/null @@ -1,86 +0,0 @@ -use pmdk::PMem; - -const BUFFER_SIZE: usize = 4096; -const DEST_FILEPATH: &str = "/pmem0/pmempool0\0"; -const TEXT: &str = " Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec dictum, massa sit amet tempus blandit, mi purus suscipit arcu, a egestas erat orci et ipsum. Phasellus vel urna non urna cursus imperdiet. Aliquam turpis ex, maximus id tortor eget, tincidunt feugiat metus. Ut ultrices auctor massa, quis convallis lectus vulputate et. Maecenas at mi orci. Donec id leo vitae risus tempus imperdiet ut a elit. Mauris quis dolor urna. Mauris dictum enim vel turpis aliquam tincidunt. Pellentesque et eros ac quam lobortis hendrerit non ut nulla. Quisque maximus magna tristique risus lacinia, et facilisis erat molestie. - -Morbi eget sapien accumsan, rhoncus metus in, interdum libero. Nam gravida mi et viverra porttitor. Sed malesuada odio semper sapien bibendum ornare. Curabitur scelerisque lacinia ex, a rhoncus magna viverra eu. Maecenas sed libero vel ex dictum congue at sed nulla. Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam erat volutpat. Proin condimentum augue eu nulla consequat efficitur. Vivamus sodales pretium erat, id iaculis risus pellentesque sit amet. Integer tempus porta diam ac facilisis. Duis ex eros, mattis nec ultrices vel, varius vel lectus. Proin varius sapien est, nec euismod ex varius nec. Quisque in sem sit amet metus scelerisque ornare at a nisi. Maecenas ac scelerisque metus. In ut velit placerat, fringilla eros non, semper risus. Cras sed ante maximus, vestibulum nunc nec, rutrum leo. \0"; -const TEXT2: &str = "hello world!"; - -fn basic_read_write_test() -{ - unsafe { - let mut src_filehandle : i32; - let mut buf = vec![0; BUFFER_SIZE]; - - let mut is_pmem : i32 = 0; - let mut mapped_len : u64 = 0; - - let mut pmem = match PMem::create(&DEST_FILEPATH, 64*1024*1024*1024, &mut mapped_len, &mut is_pmem) { - Ok(value) => value, - Err(e) => match PMem::open(&DEST_FILEPATH, &mut mapped_len, &mut is_pmem) { - Ok(value) => value, - Err(e) => panic!("\n Failed to create or open pmem file handle.") - } - }; - - - // Writing the long text (TEXT1) - let mut text_array = [0u8; BUFFER_SIZE]; - TEXT.bytes().zip(text_array.iter_mut()).for_each(|(b, ptr)| *ptr = b); - pmem.write(0, &text_array, TEXT.chars().count()); - - // Writing the short text (TEXT2) - TEXT2.bytes().zip(text_array.iter_mut()).for_each(|(b, ptr)| *ptr = b); - pmem.write(TEXT.chars().count(), &text_array, TEXT2.chars().count()); - - // Reading the long text (TEXT1) - let mut buffer = vec![0; TEXT.chars().count()]; - pmem.read(0, &mut buffer, TEXT.chars().count() as u64); - - // Reading the short text (TEXT2) - let mut buffer2 = vec![0; TEXT2.chars().count()]; - pmem.read(TEXT.chars().count(), &mut buffer2, TEXT2.chars().count() as u64); - - // Writing the long text (TEXT1) starting offset 1000 - TEXT.bytes().zip(text_array.iter_mut()).for_each(|(b, ptr)| *ptr = b); - pmem.write(1000, &text_array, TEXT.chars().count()); - - // Reading the recently text - let mut buffer3 = vec![0; TEXT.chars().count()]; - pmem.read(1000, &mut buffer3, TEXT.chars().count() as u64); - - pmem.close(&mapped_len); - - // Comparing the read text with the actual one - let read_string = match std::str::from_utf8(&buffer) { - Ok(string) => string, - Err(e) => panic!("Invalid UTF-8 sequence: {}", e), - }; - - assert_eq!(TEXT, read_string); - - let read_string2 = match std::str::from_utf8(&buffer2) { - Ok(string) => string, - Err(e) => panic!("Invalid UTF-8 sequence: {}", e), - }; - - assert_eq!(TEXT2, read_string2); - - let read_string3 = match std::str::from_utf8(&buffer3) { - Ok(string) => string, - Err(e) => panic!("Invalid UTF-8 sequence: {}", e), - }; - - assert_eq!(TEXT, read_string3); - - println!("Successfully written and read text to/from PMDK!"); - } -} - - -fn main() { - basic_read_write_test(); -} - - diff --git a/betree/src/storage_pool/configuration.rs b/betree/src/storage_pool/configuration.rs index 351e4a68..67de854f 100644 --- a/betree/src/storage_pool/configuration.rs +++ b/betree/src/storage_pool/configuration.rs @@ -121,10 +121,13 @@ pub enum Vdev { #[serde(untagged, deny_unknown_fields, rename_all = "lowercase")] pub enum LeafVdev { #[cfg(feature = "nvm")] + /// Backed by NVM with `pmdk`. PMemFile { + /// Path to the underlying file. path: PathBuf, - len: usize - }, + /// Size of backing file in bytes. + len: usize, + }, /// Backed by a file or disk. File(PathBuf), /// Customisable file vdev. @@ -226,10 +229,12 @@ impl TierConfiguration { LeafVdev::File(path) => write!(s, "{} ", path.display()).unwrap(), LeafVdev::FileWithOpts { path, direct } => { write!(s, "{} (direct: {:?}) ", path.display(), direct).unwrap() - }, + } LeafVdev::Memory { mem } => write!(s, "memory({mem}) ").unwrap(), #[cfg(feature = "nvm")] - LeafVdev::PMemFile{path, len} => write!(s, "{} {}", path.display(), len).unwrap(), + LeafVdev::PMemFile { path, len } => { + write!(s, "{} {}", path.display(), len).unwrap() + } } } } @@ -318,38 +323,42 @@ impl LeafVdev { LeafVdev::File(path) => unreachable!(), LeafVdev::FileWithOpts { .. } => unreachable!(), LeafVdev::Memory { .. } => unreachable!(), - LeafVdev::PMemFile {path, len} => (path, len), + LeafVdev::PMemFile { path, len } => (path, len), }; - let mut is_pmem : i32 = 0; - let mut mapped_len : u64 = 0; let mut file = match path.to_str() { - Some(filepath_str) => match pmdk::PMem::open(format!("{}\0",filepath_str).as_str(), &mut mapped_len, &mut is_pmem) { - Ok(handle) => handle, - Err(e) => match pmdk::PMem::create(format!("{}\0",filepath_str).as_str(), *len as u64, &mut mapped_len, &mut is_pmem) { + Some(filepath_str) => { + match pmdk::PMem::open(format!("{}\0", filepath_str).as_str()) { Ok(handle) => handle, - Err(e) => { - return Err(io::Error::new(io::ErrorKind::Other, + Err(e) => match pmdk::PMem::create( + format!("{}\0", filepath_str).as_str(), + *len, + ) { + Ok(handle) => handle, + Err(e) => { + return Err(io::Error::new(io::ErrorKind::Other, format!("Failed to create or open handle for pmem file. Path: {}", filepath_str))); - } + } + }, } - }, + } None => { - return Err(io::Error::new(io::ErrorKind::Other, - format!("Invalid file path: {:?}", path))); + return Err(io::Error::new( + io::ErrorKind::Other, + format!("Invalid file path: {:?}", path), + )); } }; - if (mapped_len != *len as u64) { - return Err(io::Error::new(io::ErrorKind::Other, - format!("The file already exists with a differnt length. Provided length: {}, File's length: {}", - len, mapped_len))); + if file.len() != *len { + return Err(io::Error::new(io::ErrorKind::Other, + format!("The file already exists with a different length. Provided length: {}, File's length: {}", + len, file.len()))); } Ok(Leaf::PMemFile(vdev::PMemFile::new( file, path.to_string_lossy().into_owned(), - mapped_len )?)) } } @@ -413,10 +422,9 @@ impl LeafVdev { writeln!(f, "{:indent$}memory({})", "", mem, indent = indent) } #[cfg(feature = "nvm")] - LeafVdev::PMemFile {path, len} => { + LeafVdev::PMemFile { path, len: _ } => { writeln!(f, "{:indent$}{}", "", path.display(), indent = indent) } - } } } diff --git a/betree/src/vdev/pmemfile.rs b/betree/src/vdev/pmemfile.rs index a53a580d..ec8e578e 100644 --- a/betree/src/vdev/pmemfile.rs +++ b/betree/src/vdev/pmemfile.rs @@ -1,22 +1,21 @@ -use pmdk; use super::{ errors::*, AtomicStatistics, Block, Result, ScrubResult, Statistics, Vdev, VdevLeafRead, VdevLeafWrite, VdevRead, }; -use crate::{buffer::Buf, buffer::BufWrite, checksum::Checksum}; +use crate::{buffer::Buf, checksum::Checksum}; use async_trait::async_trait; use libc::{c_ulong, ioctl}; +use pmdk; use std::{ fs, - io::{self, Write}, - os::unix::{ - fs::{FileExt, FileTypeExt}, + io, + os::unix:: io::AsRawFd, - }, + sync::atomic::Ordering, }; -/// `LeafVdev` that is backed by a file. +/// `LeafVdev` which is backed by NVM and uses `pmdk`. #[derive(Debug)] pub struct PMemFile { file: pmdk::PMem, @@ -27,8 +26,8 @@ pub struct PMemFile { impl PMemFile { /// Creates a new `PMEMFile`. - pub fn new(file: pmdk::PMem, id: String, len: u64) -> io::Result { - let size = Block::from_bytes(len); + pub fn new(file: pmdk::PMem, id: String) -> io::Result { + let size = Block::from_bytes(file.len() as u64); Ok(PMemFile { file, id, @@ -62,14 +61,7 @@ impl VdevRead for PMemFile { self.stats.read.fetch_add(size.as_u64(), Ordering::Relaxed); let buf = { let mut buf = Buf::zeroed(size).into_full_mut(); - - if let Err(e) = self.file.read(offset.to_bytes() as usize, buf.as_mut(), size.to_bytes() as u64) { - self.stats - .failed_reads - .fetch_add(size.as_u64(), Ordering::Relaxed); - bail!(e) - } - + self.file.read(offset.to_bytes() as usize, buf.as_mut()); buf.into_full_buf() }; @@ -101,16 +93,9 @@ impl VdevRead for PMemFile { async fn read_raw(&self, size: Block, offset: Block) -> Result> { self.stats.read.fetch_add(size.as_u64(), Ordering::Relaxed); let mut buf = Buf::zeroed(size).into_full_mut(); - - match self.file.read(offset.to_bytes() as usize, buf.as_mut(), size.to_bytes() as u64) { - Ok(()) => Ok(vec![buf.into_full_buf()]), - Err(e) => { - self.stats - .failed_reads - .fetch_add(size.as_u64(), Ordering::Relaxed); - bail!(e) - } - } + + self.file.read(offset.to_bytes() as usize, buf.as_mut()); + Ok(vec![buf.into_full_buf()]) } } @@ -147,16 +132,9 @@ impl VdevLeafRead for PMemFile { async fn read_raw + Send>(&self, mut buf: T, offset: Block) -> Result { let size = Block::from_bytes(buf.as_mut().len() as u32); self.stats.read.fetch_add(size.as_u64(), Ordering::Relaxed); - - match self.file.read(offset.to_bytes() as usize, buf.as_mut(), size.to_bytes() as u64) { - Ok(()) => Ok(buf), - Err(e) => { - self.stats - .failed_reads - .fetch_add(size.as_u64(), Ordering::Relaxed); - bail!(e) - } - } + + self.file.read(offset.to_bytes() as usize, buf.as_mut()); + Ok(buf) } fn checksum_error_occurred(&self, size: Block) { @@ -166,40 +144,22 @@ impl VdevLeafRead for PMemFile { } } -static mut cntr : u32 = 0; - #[async_trait] impl VdevLeafWrite for PMemFile { async fn write_raw + Send>( &self, data: W, offset: Block, - is_repair: bool, - ) -> Result<()> { + _is_repair: bool, + ) -> Result<()> { let block_cnt = Block::from_bytes(data.as_ref().len() as u64).as_u64(); self.stats.written.fetch_add(block_cnt, Ordering::Relaxed); - unsafe { - match self.file.write(offset.to_bytes() as usize, data.as_ref(), data.as_ref().len()) - .map_err(|_| VdevError::Write(self.id.clone())) { - Ok(()) => { - if is_repair { - self.stats.repaired.fetch_add(block_cnt, Ordering::Relaxed); - } - Ok(()) - } - Err(e) => { - self.stats - .failed_writes - .fetch_add(block_cnt, Ordering::Relaxed); - Err(e) - } - } - } + unsafe { self.file.write(offset.to_bytes() as usize, data.as_ref()) }; + Ok(()) } fn flush(&self) -> Result<()> { - //Ok(self.file.sync_data()?) Ok(()) } }