From 46db8333f1e4733c55ea7f830a18adc777fe6460 Mon Sep 17 00:00:00 2001 From: Liu Jiang Date: Sat, 2 Jan 2021 13:53:54 +0800 Subject: [PATCH 1/9] format code with cargo fmt Signed-off-by: Liu Jiang --- src/completion_queue.rs | 10 +- src/cqe.rs | 31 ++-- src/lib.rs | 95 +++++++----- src/probe.rs | 8 +- src/registrar/mod.rs | 111 ++++++++------ src/registrar/registered.rs | 18 ++- src/sqe.rs | 273 +++++++++++++++++++++-------------- src/submission_queue.rs | 40 ++--- tests/accept.rs | 2 +- tests/connect.rs | 4 +- tests/exhaust-queue.rs | 1 - tests/fileset-placeholder.rs | 6 +- tests/fixed-file-write.rs | 2 +- tests/read.rs | 9 +- tests/register-buffers.rs | 24 +-- tests/write.rs | 11 +- 16 files changed, 391 insertions(+), 254 deletions(-) diff --git a/src/completion_queue.rs b/src/completion_queue.rs index 867fb1a..ebf70d9 100644 --- a/src/completion_queue.rs +++ b/src/completion_queue.rs @@ -4,7 +4,7 @@ use std::marker::PhantomData; use std::mem::MaybeUninit; use std::ptr::{self, NonNull}; -use super::{IoUring, CQE, CQEs, CQEsBlocking, resultify}; +use super::{resultify, CQEs, CQEsBlocking, IoUring, CQE}; /// The queue of completed IO events. /// @@ -106,9 +106,11 @@ impl<'ring> CompletionQueue<'ring> { impl fmt::Debug for CompletionQueue<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let fd = unsafe { self.ring.as_ref().ring_fd }; - f.debug_struct(std::any::type_name::()).field("fd", &fd).finish() + f.debug_struct(std::any::type_name::()) + .field("fd", &fd) + .finish() } } -unsafe impl<'ring> Send for CompletionQueue<'ring> { } -unsafe impl<'ring> Sync for CompletionQueue<'ring> { } +unsafe impl<'ring> Send for CompletionQueue<'ring> {} +unsafe impl<'ring> Sync for CompletionQueue<'ring> {} diff --git a/src/cqe.rs b/src/cqe.rs index bd97257..39a4515 100644 --- a/src/cqe.rs +++ b/src/cqe.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use std::mem::MaybeUninit; use std::ptr::{self, NonNull}; -use super::{IoUring, resultify}; +use super::{resultify, IoUring}; /// A completed IO event. #[derive(Debug)] @@ -24,11 +24,16 @@ impl CQE { pub fn from_raw_parts(user_data: u64, res: i32, flags: CompletionFlags) -> CQE { CQE { - user_data, res, flags, + user_data, + res, + flags, } } - pub(crate) fn new(ring: NonNull, cqe: &mut uring_sys::io_uring_cqe) -> CQE { + pub(crate) fn new( + ring: NonNull, + cqe: &mut uring_sys::io_uring_cqe, + ) -> CQE { let user_data = cqe.user_data; let res = cqe.res; let flags = CompletionFlags::from_bits_truncate(cqe.flags); @@ -61,8 +66,8 @@ impl CQE { } } -unsafe impl Send for CQE { } -unsafe impl Sync for CQE { } +unsafe impl Send for CQE {} +unsafe impl Sync for CQE {} /// An iterator of [`CQE`]s from the [`CompletionQueue`](crate::CompletionQueue). /// @@ -75,7 +80,11 @@ pub struct CQEs<'a> { impl<'a> CQEs<'a> { pub(crate) fn new(ring: NonNull) -> CQEs<'a> { - CQEs { ring, ready: 0, marker: PhantomData } + CQEs { + ring, + ready: 0, + marker: PhantomData, + } } #[inline(always)] @@ -114,7 +123,6 @@ impl Iterator for CQEs<'_> { } } - /// An iterator of [`CQE`]s from the [`CompletionQueue`](crate::CompletionQueue). /// /// This iterator will never be exhausted; if there are no `CQE`s ready, it will block until there @@ -128,7 +136,12 @@ pub struct CQEsBlocking<'a> { impl<'a> CQEsBlocking<'a> { pub(crate) fn new(ring: NonNull, wait_for: u32) -> CQEsBlocking<'a> { - CQEsBlocking { ring, ready: 0, wait_for, marker: PhantomData } + CQEsBlocking { + ring, + ready: 0, + wait_for, + marker: PhantomData, + } } #[inline(always)] @@ -176,7 +189,7 @@ impl Iterator for CQEsBlocking<'_> { self.ready = self.ready(); if self.ready == 0 { let ring = self.ring; - return Some(self.wait().map(|cqe| CQE::new(ring, cqe))) + return Some(self.wait().map(|cqe| CQE::new(ring, cqe))); } } diff --git a/src/lib.rs b/src/lib.rs index c9638d5..d298f04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,16 +60,16 @@ use std::ptr::{self, NonNull}; use std::time::Duration; #[doc(inline)] -pub use sqe::{SQE, SQEs}; +pub use cqe::{CQEs, CQEsBlocking, CQE}; #[doc(inline)] -pub use cqe::{CQE, CQEs, CQEsBlocking}; +pub use sqe::{SQEs, SQE}; pub use completion_queue::CompletionQueue; pub use submission_queue::SubmissionQueue; pub use probe::Probe; #[doc(inline)] -pub use registrar::{Registrar, Personality}; +pub use registrar::{Personality, Registrar}; bitflags::bitflags! { /// [`IoUring`] initialization flags for advanced use cases. @@ -180,18 +180,24 @@ impl IoUring { /// Creates a new `IoUring` using a set of `SetupFlags` and `SetupFeatures` for advanced /// use cases. - pub fn new_with_flags(entries: u32, flags: SetupFlags, features: SetupFeatures) -> io::Result { + pub fn new_with_flags( + entries: u32, + flags: SetupFlags, + features: SetupFeatures, + ) -> io::Result { unsafe { let mut params: uring_sys::io_uring_params = mem::zeroed(); params.flags = flags.bits(); params.features = features.bits(); let mut ring = MaybeUninit::uninit(); resultify(uring_sys::io_uring_queue_init_params( - entries as _, - ring.as_mut_ptr(), - &mut params, + entries as _, + ring.as_mut_ptr(), + &mut params, ))?; - Ok(IoUring { ring: ring.assume_init() }) + Ok(IoUring { + ring: ring.assume_init(), + }) } } @@ -212,7 +218,11 @@ impl IoUring { /// Returns the three constituent parts of the `IoUring`. pub fn queues(&mut self) -> (SubmissionQueue<'_>, CompletionQueue<'_>, Registrar<'_>) { - (SubmissionQueue::new(&*self), CompletionQueue::new(&*self), Registrar::new(&*self)) + ( + SubmissionQueue::new(&*self), + CompletionQueue::new(&*self), + Registrar::new(&*self), + ) } pub fn probe(&mut self) -> io::Result { @@ -221,18 +231,14 @@ impl IoUring { /// Returns the next [`SQE`] which can be prepared to submit. pub fn prepare_sqe(&mut self) -> Option> { - unsafe { - submission_queue::prepare_sqe(&mut self.ring) - } + unsafe { submission_queue::prepare_sqe(&mut self.ring) } } /// Returns the next `count` [`SQE`]s which can be prepared to submit as an iterator. /// /// See the [`SQEs`] type for more information about how these multiple SQEs can be used. pub fn prepare_sqes(&mut self, count: u32) -> Option> { - unsafe { - submission_queue::prepare_sqes(&mut self.ring.sq, count) - } + unsafe { submission_queue::prepare_sqes(&mut self.ring.sq, count) } } /// Submit all prepared [`SQE`]s to the kernel. @@ -246,12 +252,13 @@ impl IoUring { self.sq().submit_and_wait(wait_for) } - /// Submit all prepared [`SQE`]s to the kernel and wait until at least `wait_for` events have /// completed or `duration` has passed. - pub fn submit_sqes_and_wait_with_timeout(&mut self, wait_for: u32, duration: Duration) - -> io::Result - { + pub fn submit_sqes_and_wait_with_timeout( + &mut self, + wait_for: u32, + duration: Duration, + ) -> io::Result { self.sq().submit_and_wait_with_timeout(wait_for, duration) } @@ -273,20 +280,20 @@ impl IoUring { /// Block until at least one [`CQE`] is completed. This will consume that CQE. pub fn wait_for_cqe(&mut self) -> io::Result { let ring = NonNull::from(&self.ring); - self.inner_wait_for_cqes(1, ptr::null()).map(|cqe| CQE::new(ring, cqe)) + self.inner_wait_for_cqes(1, ptr::null()) + .map(|cqe| CQE::new(ring, cqe)) } /// Block until a [`CQE`] is ready or timeout. - pub fn wait_for_cqe_with_timeout(&mut self, duration: Duration) - -> io::Result - { + pub fn wait_for_cqe_with_timeout(&mut self, duration: Duration) -> io::Result { let ts = uring_sys::__kernel_timespec { tv_sec: duration.as_secs() as _, - tv_nsec: duration.subsec_nanos() as _ + tv_nsec: duration.subsec_nanos() as _, }; let ring = NonNull::from(&self.ring); - self.inner_wait_for_cqes(1, &ts).map(|cqe| CQE::new(ring, cqe)) + self.inner_wait_for_cqes(1, &ts) + .map(|cqe| CQE::new(ring, cqe)) } /// Returns an iterator of [`CQE`]s which are ready from the kernel. @@ -305,12 +312,15 @@ impl IoUring { /// Wait until `count` [`CQE`]s are ready, without submitting any events. pub fn wait_for_cqes(&mut self, count: u32) -> io::Result<()> { - self.inner_wait_for_cqes(count as _, ptr::null()).map(|_| ()) + self.inner_wait_for_cqes(count as _, ptr::null()) + .map(|_| ()) } - fn inner_wait_for_cqes(&mut self, count: u32, ts: *const uring_sys::__kernel_timespec) - -> io::Result<&mut uring_sys::io_uring_cqe> - { + fn inner_wait_for_cqes( + &mut self, + count: u32, + ts: *const uring_sys::__kernel_timespec, + ) -> io::Result<&mut uring_sys::io_uring_cqe> { unsafe { let mut cqe = MaybeUninit::uninit(); @@ -361,7 +371,9 @@ impl IoUring { impl fmt::Debug for IoUring { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct(std::any::type_name::()).field("fd", &self.ring.ring_fd).finish() + f.debug_struct(std::any::type_name::()) + .field("fd", &self.ring.ring_fd) + .finish() } } @@ -371,13 +383,13 @@ impl Drop for IoUring { } } -unsafe impl Send for IoUring { } -unsafe impl Sync for IoUring { } +unsafe impl Send for IoUring {} +unsafe impl Sync for IoUring {} fn resultify(x: i32) -> io::Result { match x >= 0 { - true => Ok(x as u32), - false => Err(io::Error::from_raw_os_error(-x)), + true => Ok(x as u32), + false => Err(io::Error::from_raw_os_error(-x)), } } @@ -394,17 +406,26 @@ mod tests { let mut calls = 0; let ret = resultify(side_effect(0, &mut calls)); - assert!(match ret { Ok(0) => true, _ => false }); + assert!(match ret { + Ok(0) => true, + _ => false, + }); assert_eq!(calls, 1); calls = 0; let ret = resultify(side_effect(1, &mut calls)); - assert!(match ret { Ok(1) => true, _ => false }); + assert!(match ret { + Ok(1) => true, + _ => false, + }); assert_eq!(calls, 1); calls = 0; let ret = resultify(side_effect(-1, &mut calls)); - assert!(match ret { Err(e) if e.raw_os_error() == Some(1) => true, _ => false }); + assert!(match ret { + Err(e) if e.raw_os_error() == Some(1) => true, + _ => false, + }); assert_eq!(calls, 1); } } diff --git a/src/probe.rs b/src/probe.rs index 319dc16..5735dec 100644 --- a/src/probe.rs +++ b/src/probe.rs @@ -11,14 +11,18 @@ impl Probe { pub fn new() -> io::Result { unsafe { let probe = uring_sys::io_uring_get_probe(); - NonNull::new(probe).ok_or_else(io::Error::last_os_error).map(|probe| Probe { probe }) + NonNull::new(probe) + .ok_or_else(io::Error::last_os_error) + .map(|probe| Probe { probe }) } } pub(crate) fn for_ring(ring: *mut uring_sys::io_uring) -> io::Result { unsafe { let probe = uring_sys::io_uring_get_probe_ring(ring); - NonNull::new(probe).ok_or_else(io::Error::last_os_error).map(|probe| Probe { probe }) + NonNull::new(probe) + .ok_or_else(io::Error::last_os_error) + .map(|probe| Probe { probe }) } } diff --git a/src/registrar/mod.rs b/src/registrar/mod.rs index 1c31fbb..3eab6a4 100644 --- a/src/registrar/mod.rs +++ b/src/registrar/mod.rs @@ -14,10 +14,10 @@ mod registered; use std::fmt; use std::io; use std::marker::PhantomData; -use std::ptr::NonNull; use std::os::unix::io::RawFd; +use std::ptr::NonNull; -use crate::{IoUring, Probe, resultify}; +use crate::{resultify, IoUring, Probe}; pub use registered::*; @@ -57,9 +57,10 @@ impl<'ring> Registrar<'ring> { } } - pub fn register_buffers(&self, buffers: Vec>) - -> io::Result> - { + pub fn register_buffers( + &self, + buffers: Vec>, + ) -> io::Result> { let len = buffers.len(); let addr = buffers.as_ptr() as *const _; resultify(unsafe { @@ -68,13 +69,13 @@ impl<'ring> Registrar<'ring> { Ok(buffers .into_iter() .enumerate() - .map(|(i, buf)| RegisteredBuf::new(i as u32, buf)) - ) + .map(|(i, buf)| RegisteredBuf::new(i as u32, buf))) } - pub fn register_buffers_by_ref<'a>(&self, buffers: &'a [&'a [u8]]) - -> io::Result> + 'a> - { + pub fn register_buffers_by_ref<'a>( + &self, + buffers: &'a [&'a [u8]], + ) -> io::Result> + 'a> { let len = buffers.len(); let addr = buffers.as_ptr() as *const _; resultify(unsafe { @@ -83,13 +84,13 @@ impl<'ring> Registrar<'ring> { Ok(buffers .iter() .enumerate() - .map(|(i, buf)| Registered::new(i as u32, &**buf)) - ) + .map(|(i, buf)| Registered::new(i as u32, &**buf))) } - pub fn register_buffers_by_mut<'a>(&self, buffers: &'a mut [&'a mut [u8]]) - -> io::Result> + 'a> - { + pub fn register_buffers_by_mut<'a>( + &self, + buffers: &'a mut [&'a mut [u8]], + ) -> io::Result> + 'a> { let len = buffers.len(); let addr = buffers.as_ptr() as *const _; resultify(unsafe { @@ -98,22 +99,19 @@ impl<'ring> Registrar<'ring> { Ok(buffers .iter_mut() .enumerate() - .map(|(i, buf)| Registered::new(i as u32, &mut **buf)) - ) + .map(|(i, buf)| Registered::new(i as u32, &mut **buf))) } /// Unregister all currently registered buffers. An explicit call to this method is often unecessary, /// because all buffers will be unregistered automatically when the ring is dropped. pub fn unregister_buffers(&self) -> io::Result<()> { - resultify(unsafe { - uring_sys::io_uring_unregister_buffers(self.ring.as_ptr()) - })?; + resultify(unsafe { uring_sys::io_uring_unregister_buffers(self.ring.as_ptr()) })?; Ok(()) } - /// Register a set of files with the kernel. Registered files handle kernel fileset indexing + /// Register a set of files with the kernel. Registered files handle kernel fileset indexing /// behind the scenes and can often be used in place of raw file descriptors. - /// + /// /// # Errors /// Returns an error if /// * there is a preexisting set of registered files, @@ -134,20 +132,22 @@ impl<'ring> Registrar<'ring> { /// # Ok(()) /// # } /// ``` - pub fn register_files<'a>(&self, files: &'a [RawFd]) -> io::Result + 'a> { + pub fn register_files<'a>( + &self, + files: &'a [RawFd], + ) -> io::Result + 'a> { assert!(files.len() <= u32::MAX as usize); resultify(unsafe { uring_sys::io_uring_register_files( - self.ring.as_ptr(), - files.as_ptr() as *const _, - files.len() as _ + self.ring.as_ptr(), + files.as_ptr() as *const _, + files.len() as _, ) })?; Ok(files .iter() .enumerate() - .map(|(i, &fd)| RegisteredFd::new(i as u32, fd)) - ) + .map(|(i, &fd)| RegisteredFd::new(i as u32, fd))) } /// Update the currently registered kernel fileset. It is usually more efficient to reserve space @@ -157,11 +157,15 @@ impl<'ring> Registrar<'ring> { /// Returns an error if /// * there isn't a registered fileset, /// * the `files` slice was empty, - /// * `offset` is out of bounds, + /// * `offset` is out of bounds, /// * the `files` slice was too large, /// * the inner [`io_uring_register_files_update`](uring_sys::io_uring_register_files_update) call /// failed for another reason - pub fn update_registered_files<'a>(&mut self, offset: usize, files: &'a [RawFd]) -> io::Result + 'a> { + pub fn update_registered_files<'a>( + &mut self, + offset: usize, + files: &'a [RawFd], + ) -> io::Result + 'a> { assert!(files.len() + offset <= u32::MAX as usize); resultify(unsafe { uring_sys::io_uring_register_files_update( @@ -174,8 +178,7 @@ impl<'ring> Registrar<'ring> { Ok(files .iter() .enumerate() - .map(move |(i, &fd)| RegisteredFd::new((i + offset) as u32, fd)) - ) + .map(move |(i, &fd)| RegisteredFd::new((i + offset) as u32, fd))) } /// Unregister all currently registered files. An explicit call to this method is often unecessary, @@ -211,9 +214,7 @@ impl<'ring> Registrar<'ring> { } pub fn register_eventfd(&self, eventfd: RawFd) -> io::Result<()> { - resultify(unsafe { - uring_sys::io_uring_register_eventfd(self.ring.as_ptr(), eventfd) - })?; + resultify(unsafe { uring_sys::io_uring_register_eventfd(self.ring.as_ptr(), eventfd) })?; Ok(()) } @@ -225,14 +226,13 @@ impl<'ring> Registrar<'ring> { } pub fn unregister_eventfd(&self) -> io::Result<()> { - resultify(unsafe { - uring_sys::io_uring_unregister_eventfd(self.ring.as_ptr()) - })?; + resultify(unsafe { uring_sys::io_uring_unregister_eventfd(self.ring.as_ptr()) })?; Ok(()) } pub fn register_personality(&self) -> io::Result { - let id = resultify(unsafe { uring_sys::io_uring_register_personality(self.ring.as_ptr()) })?; + let id = + resultify(unsafe { uring_sys::io_uring_register_personality(self.ring.as_ptr()) })?; debug_assert!(id < u16::MAX as u32); Ok(Personality { id: id as u16 }) } @@ -252,12 +252,14 @@ impl<'ring> Registrar<'ring> { impl fmt::Debug for Registrar<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let fd = unsafe { self.ring.as_ref().ring_fd }; - f.debug_struct(std::any::type_name::()).field("fd", &fd).finish() + f.debug_struct(std::any::type_name::()) + .field("fd", &fd) + .finish() } } -unsafe impl<'ring> Send for Registrar<'ring> { } -unsafe impl<'ring> Sync for Registrar<'ring> { } +unsafe impl<'ring> Send for Registrar<'ring> {} +unsafe impl<'ring> Sync for Registrar<'ring> {} #[derive(Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Clone, Copy)] pub struct Personality { @@ -318,7 +320,10 @@ mod tests { let raw_fds = [1, 2]; let ring = IoUring::new(1).unwrap(); let _ = ring.registrar().register_files(&raw_fds).unwrap(); - let _ = ring.registrar().update_registered_files(2, &raw_fds).unwrap(); + let _ = ring + .registrar() + .update_registered_files(2, &raw_fds) + .unwrap(); } #[test] @@ -326,7 +331,10 @@ mod tests { fn slice_len_out_of_bounds_update() { let ring = IoUring::new(1).unwrap(); let _ = ring.registrar().register_files(&[1, 1]).unwrap(); - let _ = ring.registrar().update_registered_files(0, &[1, 1, 1]).unwrap(); + let _ = ring + .registrar() + .update_registered_files(0, &[1, 1, 1]) + .unwrap(); } #[test] @@ -334,10 +342,16 @@ mod tests { let ring = IoUring::new(1).unwrap(); let file = std::fs::File::create("tmp.txt").unwrap(); - let _ = ring.registrar().register_files(&[file.as_raw_fd()]).unwrap(); + let _ = ring + .registrar() + .register_files(&[file.as_raw_fd()]) + .unwrap(); let new_file = std::fs::File::create("new_tmp.txt").unwrap(); - let _ = ring.registrar().update_registered_files(0, &[new_file.as_raw_fd()]).unwrap(); + let _ = ring + .registrar() + .update_registered_files(0, &[new_file.as_raw_fd()]) + .unwrap(); let _ = std::fs::remove_file("tmp.txt"); let _ = std::fs::remove_file("new_tmp.txt"); @@ -349,7 +363,10 @@ mod tests { let _ = ring.registrar().register_files(&[-1, -1, -1]).unwrap(); let file = std::fs::File::create("tmp.txt").unwrap(); - let _ = ring.registrar().update_registered_files(0, &[file.as_raw_fd()]).unwrap(); + let _ = ring + .registrar() + .update_registered_files(0, &[file.as_raw_fd()]) + .unwrap(); let _ = std::fs::remove_file("tmp.txt"); } } diff --git a/src/registrar/registered.rs b/src/registrar/registered.rs index fab0a1f..b3b7c02 100644 --- a/src/registrar/registered.rs +++ b/src/registrar/registered.rs @@ -15,10 +15,10 @@ pub const PLACEHOLDER_FD: RawFd = -1; /// /// Submission event prep methods on `RegisteredFd` will ensure that the submission event's /// `SubmissionFlags::FIXED_FILE` flag is properly set. -pub type RegisteredFd = Registered; -pub type RegisteredBuf = Registered>; -pub type RegisteredBufRef<'a> = Registered<&'a [u8]>; -pub type RegisteredBufMut<'a> = Registered<&'a mut [u8]>; +pub type RegisteredFd = Registered; +pub type RegisteredBuf = Registered>; +pub type RegisteredBufRef<'a> = Registered<&'a [u8]>; +pub type RegisteredBufMut<'a> = Registered<&'a mut [u8]>; /// An object registered with an io-uring instance through a [`Registrar`](crate::Registrar). #[derive(Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)] @@ -188,7 +188,7 @@ impl UringFd for RawFd { *self } - fn update_sqe(&self, _: &mut SQE<'_>) { } + fn update_sqe(&self, _: &mut SQE<'_>) {} } impl UringFd for RegisteredFd { @@ -197,7 +197,9 @@ impl UringFd for RegisteredFd { } fn update_sqe(&self, sqe: &mut SQE<'_>) { - unsafe { sqe.raw_mut().fd = self.index as RawFd; } + unsafe { + sqe.raw_mut().fd = self.index as RawFd; + } sqe.set_fixed_file(); } } @@ -220,7 +222,7 @@ impl UringReadBuf for RegisteredBufMut<'_> { self.data.as_mut_ptr() as _, self.data.len() as _, offset as _, - self.index() as _ + self.index() as _, ); fd.update_sqe(sqe); } @@ -286,7 +288,7 @@ impl UringWriteBuf for RegisteredBufRef<'_> { self.data.as_ptr() as _, self.data.len() as _, offset as _, - self.index() as _ + self.index() as _, ); fd.update_sqe(sqe); } diff --git a/src/sqe.rs b/src/sqe.rs index 7b6f70a..9a54271 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -1,6 +1,6 @@ +use std::ffi::CStr; use std::io; use std::mem; -use std::ffi::CStr; use std::ops::{Deref, DerefMut}; use std::os::unix::io::RawFd; use std::ptr; @@ -8,12 +8,12 @@ use std::slice; use crate::registrar::{UringFd, UringReadBuf, UringWriteBuf}; -pub use nix::fcntl::{OFlag, FallocateFlags, PosixFadviseAdvice}; +pub use nix::fcntl::{FallocateFlags, OFlag, PosixFadviseAdvice}; pub use nix::poll::PollFlags; -pub use nix::sys::epoll::{EpollOp, EpollEvent}; +pub use nix::sys::epoll::{EpollEvent, EpollOp}; pub use nix::sys::mman::MmapAdvise; +pub use nix::sys::socket::{MsgFlags, SockAddr, SockFlag}; pub use nix::sys::stat::Mode; -pub use nix::sys::socket::{SockAddr, SockFlag, MsgFlags}; use crate::Personality; @@ -50,7 +50,7 @@ impl<'a> SQE<'a> { /// `SQE` may impose additional safety invariants which you must adhere to /// when setting the user_data for a submission queue event, which it may rely on when /// processing the corresponding completion queue event. For example, the library - /// [ringbahn][ringbahn] + /// [ringbahn][ringbahn] /// /// # Example /// @@ -108,12 +108,7 @@ impl<'a> SQE<'a> { /// Both the file descriptor and the buffer can be pre-registered. See the /// [`registrar][crate::registrar] module for more information. #[inline] - pub unsafe fn prep_read( - &mut self, - fd: impl UringFd, - buf: impl UringReadBuf, - offset: u64, - ) { + pub unsafe fn prep_read(&mut self, fd: impl UringFd, buf: impl UringReadBuf, offset: u64) { buf.prep_read(fd, self, offset); } @@ -142,12 +137,14 @@ impl<'a> SQE<'a> { ) { let len = buf.len(); let addr = buf.as_mut_ptr(); - uring_sys::io_uring_prep_read_fixed(self.sqe, - fd.as_raw_fd(), - addr as _, - len as _, - offset as _, - buf_index as _); + uring_sys::io_uring_prep_read_fixed( + self.sqe, + fd.as_raw_fd(), + addr as _, + len as _, + offset as _, + buf_index as _, + ); fd.update_sqe(self); } @@ -156,12 +153,7 @@ impl<'a> SQE<'a> { /// Both the file descriptor and the buffer can be pre-registered. See the /// [`registrar][crate::registrar] module for more information. #[inline] - pub unsafe fn prep_write( - &mut self, - fd: impl UringFd, - buf: impl UringWriteBuf, - offset: u64, - ) { + pub unsafe fn prep_write(&mut self, fd: impl UringFd, buf: impl UringWriteBuf, offset: u64) { buf.prep_write(fd, self, offset) } @@ -175,11 +167,7 @@ impl<'a> SQE<'a> { ) { let len = bufs.len(); let addr = bufs.as_ptr(); - uring_sys::io_uring_prep_writev(self.sqe, - fd.as_raw_fd(), - addr as _, - len as _, - offset as _); + uring_sys::io_uring_prep_writev(self.sqe, fd.as_raw_fd(), addr as _, len as _, offset as _); fd.update_sqe(self); } @@ -194,12 +182,14 @@ impl<'a> SQE<'a> { ) { let len = buf.len(); let addr = buf.as_ptr(); - uring_sys::io_uring_prep_write_fixed(self.sqe, - fd.as_raw_fd(), - addr as _, - len as _, - offset as _, - buf_index as _); + uring_sys::io_uring_prep_write_fixed( + self.sqe, + fd.as_raw_fd(), + addr as _, + len as _, + offset as _, + buf_index as _, + ); fd.update_sqe(self); } @@ -221,7 +211,15 @@ impl<'a> SQE<'a> { count: u32, flags: SpliceFlags, ) { - uring_sys::io_uring_prep_splice(self.sqe, fd_in, off_in, fd_out, off_out, count, flags.bits()); + uring_sys::io_uring_prep_splice( + self.sqe, + fd_in, + off_in, + fd_out, + off_out, + count, + flags.bits(), + ); } /// Prepare a recv event on a file descriptor. @@ -243,26 +241,43 @@ impl<'a> SQE<'a> { } /// Prepare a recvmsg event on a file descriptor. - pub unsafe fn prep_recvmsg(&mut self, fd: impl UringFd, msg: *mut libc::msghdr, flags: MsgFlags) { + pub unsafe fn prep_recvmsg( + &mut self, + fd: impl UringFd, + msg: *mut libc::msghdr, + flags: MsgFlags, + ) { uring_sys::io_uring_prep_recvmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _); fd.update_sqe(self); } /// Prepare a sendmsg event on a file descriptor. - pub unsafe fn prep_sendmsg(&mut self, fd: impl UringFd, msg: *mut libc::msghdr, flags: MsgFlags) { + pub unsafe fn prep_sendmsg( + &mut self, + fd: impl UringFd, + msg: *mut libc::msghdr, + flags: MsgFlags, + ) { uring_sys::io_uring_prep_sendmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _); fd.update_sqe(self); } /// Prepare a fallocate event. #[inline] - pub unsafe fn prep_fallocate(&mut self, fd: impl UringFd, - offset: u64, size: u64, - flags: FallocateFlags) { - uring_sys::io_uring_prep_fallocate(self.sqe, fd.as_raw_fd(), - flags.bits() as _, - offset as _, - size as _); + pub unsafe fn prep_fallocate( + &mut self, + fd: impl UringFd, + offset: u64, + size: u64, + flags: FallocateFlags, + ) { + uring_sys::io_uring_prep_fallocate( + self.sqe, + fd.as_raw_fd(), + flags.bits() as _, + offset as _, + size as _, + ); fd.update_sqe(self); } @@ -276,21 +291,26 @@ impl<'a> SQE<'a> { mask: StatxMode, buf: &mut libc::statx, ) { - uring_sys::io_uring_prep_statx(self.sqe, dirfd.as_raw_fd(), path.as_ptr() as _, - flags.bits() as _, mask.bits() as _, - buf as _); + uring_sys::io_uring_prep_statx( + self.sqe, + dirfd.as_raw_fd(), + path.as_ptr() as _, + flags.bits() as _, + mask.bits() as _, + buf as _, + ); } /// Prepare an openat event. #[inline] - pub unsafe fn prep_openat( - &mut self, - fd: impl UringFd, - path: &CStr, - flags: OFlag, - mode: Mode, - ) { - uring_sys::io_uring_prep_openat(self.sqe, fd.as_raw_fd(), path.as_ptr() as _, flags.bits(), mode.bits()); + pub unsafe fn prep_openat(&mut self, fd: impl UringFd, path: &CStr, flags: OFlag, mode: Mode) { + uring_sys::io_uring_prep_openat( + self.sqe, + fd.as_raw_fd(), + path.as_ptr() as _, + flags.bits(), + mode.bits(), + ); } // TODO openat2 @@ -301,7 +321,6 @@ impl<'a> SQE<'a> { uring_sys::io_uring_prep_close(self.sqe, fd.as_raw_fd()); } - /// Prepare a timeout event. /// /// ``` @@ -324,11 +343,18 @@ impl<'a> SQE<'a> { /// # } ///``` #[inline] - pub unsafe fn prep_timeout(&mut self, ts: &uring_sys::__kernel_timespec, events: u32, flags: TimeoutFlags) { - uring_sys::io_uring_prep_timeout(self.sqe, - ts as *const _ as *mut _, - events as _, - flags.bits() as _); + pub unsafe fn prep_timeout( + &mut self, + ts: &uring_sys::__kernel_timespec, + events: u32, + flags: TimeoutFlags, + ) { + uring_sys::io_uring_prep_timeout( + self.sqe, + ts as *const _ as *mut _, + events as _, + flags.bits() as _, + ); } #[inline] @@ -360,25 +386,39 @@ impl<'a> SQE<'a> { } #[inline] - pub unsafe fn prep_accept(&mut self, fd: impl UringFd, accept: Option<&mut SockAddrStorage>, flags: SockFlag) { + pub unsafe fn prep_accept( + &mut self, + fd: impl UringFd, + accept: Option<&mut SockAddrStorage>, + flags: SockFlag, + ) { let (addr, len) = match accept { - Some(accept) => (accept.storage.as_mut_ptr() as *mut _, &mut accept.len as *mut _ as *mut _), - None => (std::ptr::null_mut(), std::ptr::null_mut()) + Some(accept) => ( + accept.storage.as_mut_ptr() as *mut _, + &mut accept.len as *mut _ as *mut _, + ), + None => (std::ptr::null_mut(), std::ptr::null_mut()), }; uring_sys::io_uring_prep_accept(self.sqe, fd.as_raw_fd(), addr, len, flags.bits()); fd.update_sqe(self); } #[inline] - pub unsafe fn prep_fadvise(&mut self, fd: impl UringFd, off: u64, len: u64, advice: PosixFadviseAdvice) { + pub unsafe fn prep_fadvise( + &mut self, + fd: impl UringFd, + off: u64, + len: u64, + advice: PosixFadviseAdvice, + ) { use PosixFadviseAdvice::*; let advice = match advice { - POSIX_FADV_NORMAL => libc::POSIX_FADV_NORMAL, - POSIX_FADV_SEQUENTIAL => libc::POSIX_FADV_SEQUENTIAL, - POSIX_FADV_RANDOM => libc::POSIX_FADV_RANDOM, - POSIX_FADV_NOREUSE => libc::POSIX_FADV_NOREUSE, - POSIX_FADV_WILLNEED => libc::POSIX_FADV_WILLNEED, - POSIX_FADV_DONTNEED => libc::POSIX_FADV_DONTNEED, + POSIX_FADV_NORMAL => libc::POSIX_FADV_NORMAL, + POSIX_FADV_SEQUENTIAL => libc::POSIX_FADV_SEQUENTIAL, + POSIX_FADV_RANDOM => libc::POSIX_FADV_RANDOM, + POSIX_FADV_NOREUSE => libc::POSIX_FADV_NOREUSE, + POSIX_FADV_WILLNEED => libc::POSIX_FADV_WILLNEED, + POSIX_FADV_DONTNEED => libc::POSIX_FADV_DONTNEED, }; uring_sys::io_uring_prep_fadvise(self.sqe, fd.as_raw_fd(), off as _, len as _, advice); fd.update_sqe(self); @@ -388,33 +428,44 @@ impl<'a> SQE<'a> { pub unsafe fn prep_madvise(&mut self, data: &mut [u8], advice: MmapAdvise) { use MmapAdvise::*; let advice = match advice { - MADV_NORMAL => libc::MADV_NORMAL, - MADV_RANDOM => libc::MADV_RANDOM, - MADV_SEQUENTIAL => libc::MADV_SEQUENTIAL, - MADV_WILLNEED => libc::MADV_WILLNEED, - MADV_DONTNEED => libc::MADV_DONTNEED, - MADV_REMOVE => libc::MADV_REMOVE, - MADV_DONTFORK => libc::MADV_DONTFORK, - MADV_DOFORK => libc::MADV_DOFORK, - MADV_HWPOISON => libc::MADV_HWPOISON, - MADV_MERGEABLE => libc::MADV_MERGEABLE, - MADV_UNMERGEABLE => libc::MADV_UNMERGEABLE, - MADV_SOFT_OFFLINE => libc::MADV_SOFT_OFFLINE, - MADV_HUGEPAGE => libc::MADV_HUGEPAGE, - MADV_NOHUGEPAGE => libc::MADV_NOHUGEPAGE, - MADV_DONTDUMP => libc::MADV_DONTDUMP, - MADV_DODUMP => libc::MADV_DODUMP, - MADV_FREE => libc::MADV_FREE, + MADV_NORMAL => libc::MADV_NORMAL, + MADV_RANDOM => libc::MADV_RANDOM, + MADV_SEQUENTIAL => libc::MADV_SEQUENTIAL, + MADV_WILLNEED => libc::MADV_WILLNEED, + MADV_DONTNEED => libc::MADV_DONTNEED, + MADV_REMOVE => libc::MADV_REMOVE, + MADV_DONTFORK => libc::MADV_DONTFORK, + MADV_DOFORK => libc::MADV_DOFORK, + MADV_HWPOISON => libc::MADV_HWPOISON, + MADV_MERGEABLE => libc::MADV_MERGEABLE, + MADV_UNMERGEABLE => libc::MADV_UNMERGEABLE, + MADV_SOFT_OFFLINE => libc::MADV_SOFT_OFFLINE, + MADV_HUGEPAGE => libc::MADV_HUGEPAGE, + MADV_NOHUGEPAGE => libc::MADV_NOHUGEPAGE, + MADV_DONTDUMP => libc::MADV_DONTDUMP, + MADV_DODUMP => libc::MADV_DODUMP, + MADV_FREE => libc::MADV_FREE, }; - uring_sys::io_uring_prep_madvise(self.sqe, data.as_mut_ptr() as *mut _, data.len() as _, advice); + uring_sys::io_uring_prep_madvise( + self.sqe, + data.as_mut_ptr() as *mut _, + data.len() as _, + advice, + ); } #[inline] - pub unsafe fn prep_epoll_ctl(&mut self, epoll_fd: RawFd, op: EpollOp, fd: RawFd, event: Option<&mut EpollEvent>) { + pub unsafe fn prep_epoll_ctl( + &mut self, + epoll_fd: RawFd, + op: EpollOp, + fd: RawFd, + event: Option<&mut EpollEvent>, + ) { let op = match op { - EpollOp::EpollCtlAdd => libc::EPOLL_CTL_ADD, - EpollOp::EpollCtlDel => libc::EPOLL_CTL_DEL, - EpollOp::EpollCtlMod => libc::EPOLL_CTL_MOD, + EpollOp::EpollCtlAdd => libc::EPOLL_CTL_ADD, + EpollOp::EpollCtlDel => libc::EPOLL_CTL_DEL, + EpollOp::EpollCtlMod => libc::EPOLL_CTL_MOD, }; let event = event.map_or(ptr::null_mut(), |event| event as *mut EpollEvent as *mut _); uring_sys::io_uring_prep_epoll_ctl(self.sqe, epoll_fd, fd, op, event); @@ -427,7 +478,8 @@ impl<'a> SQE<'a> { uring_sys::io_uring_prep_files_update(self.sqe, addr, len, offset as _); } - pub unsafe fn prep_provide_buffers(&mut self, + pub unsafe fn prep_provide_buffers( + &mut self, buffers: &mut [u8], count: u32, group: BufferGroupId, @@ -435,7 +487,14 @@ impl<'a> SQE<'a> { ) { let addr = buffers.as_mut_ptr() as *mut libc::c_void; let len = buffers.len() as u32 / count; - uring_sys::io_uring_prep_provide_buffers(self.sqe, addr, len as _, count as _, group.id as _, index as _); + uring_sys::io_uring_prep_provide_buffers( + self.sqe, + addr, + len as _, + count as _, + group.id as _, + index as _, + ); } pub unsafe fn prep_remove_buffers(&mut self, count: u32, id: BufferGroupId) { @@ -447,7 +506,6 @@ impl<'a> SQE<'a> { uring_sys::io_uring_prep_cancel(self.sqe, user_data as _, flags); } - /// Prepare a no-op event. /// ``` /// # use iou::{IoUring, sqe::SubmissionFlags}; @@ -520,8 +578,8 @@ impl<'a> SQE<'a> { } } -unsafe impl<'a> Send for SQE<'a> { } -unsafe impl<'a> Sync for SQE<'a> { } +unsafe impl<'a> Send for SQE<'a> {} +unsafe impl<'a> Sync for SQE<'a> {} #[derive(Debug)] pub struct SockAddrStorage { @@ -533,10 +591,7 @@ impl SockAddrStorage { pub fn uninit() -> Self { let storage = mem::MaybeUninit::uninit(); let len = mem::size_of::(); - SockAddrStorage { - storage, - len - } + SockAddrStorage { storage, len } } pub unsafe fn as_socket_addr(&self) -> io::Result { @@ -545,7 +600,7 @@ impl SockAddrStorage { let err_no = e.as_errno(); match err_no { Some(err_no) => io::Error::from_raw_os_error(err_no as _), - None => io::Error::new(io::ErrorKind::Other, "Unknown error") + None => io::Error::new(io::ErrorKind::Other, "Unknown error"), } }) } @@ -638,7 +693,9 @@ impl<'ring> SQEs<'ring> { /// additional [`SQE`]s will return `None`. pub fn single(&mut self) -> Option> { let mut next = None; - while let Some(sqe) = self.consume() { next = Some(sqe) } + while let Some(sqe) = self.consume() { + next = Some(sqe) + } next } @@ -695,7 +752,9 @@ impl<'ring> Iterator for HardLinked<'ring, '_> { fn next(&mut self) -> Option { let is_final = self.sqes.remaining() == 1; - self.sqes.consume().map(|sqe| HardLinkedSQE { sqe, is_final }) + self.sqes + .consume() + .map(|sqe| HardLinkedSQE { sqe, is_final }) } } @@ -742,7 +801,9 @@ impl<'ring> Iterator for SoftLinked<'ring, '_> { fn next(&mut self) -> Option { let is_final = self.sqes.remaining() == 1; - self.sqes.consume().map(|sqe| SoftLinkedSQE { sqe, is_final }) + self.sqes + .consume() + .map(|sqe| SoftLinkedSQE { sqe, is_final }) } } diff --git a/src/submission_queue.rs b/src/submission_queue.rs index 7a72a72..702f0e6 100644 --- a/src/submission_queue.rs +++ b/src/submission_queue.rs @@ -1,12 +1,12 @@ use std::fmt; use std::io; -use std::ptr::NonNull; use std::marker::PhantomData; +use std::ptr::NonNull; use std::slice; -use std::time::Duration; use std::sync::atomic::{self, Ordering}; +use std::time::Duration; -use super::{IoUring, SQE, SQEs, resultify}; +use super::{resultify, IoUring, SQEs, SQE}; /// The queue of pending IO events. /// @@ -77,9 +77,7 @@ impl<'ring> SubmissionQueue<'ring> { /// # } /// pub fn prepare_sqe<'a>(&'a mut self) -> Option> { - unsafe { - prepare_sqe(self.ring.as_mut()) - } + unsafe { prepare_sqe(self.ring.as_mut()) } } pub fn prepare_sqes<'a>(&'a mut self, count: u32) -> Option> { @@ -100,12 +98,14 @@ impl<'ring> SubmissionQueue<'ring> { resultify(unsafe { uring_sys::io_uring_submit_and_wait(self.ring.as_ptr(), wait_for as _) }) } - pub fn submit_and_wait_with_timeout(&mut self, wait_for: u32, duration: Duration) - -> io::Result - { + pub fn submit_and_wait_with_timeout( + &mut self, + wait_for: u32, + duration: Duration, + ) -> io::Result { let ts = uring_sys::__kernel_timespec { tv_sec: duration.as_secs() as _, - tv_nsec: duration.subsec_nanos() as _ + tv_nsec: duration.subsec_nanos() as _, }; loop { @@ -114,7 +114,10 @@ impl<'ring> SubmissionQueue<'ring> { unsafe { sqe.prep_timeout(&ts, 0, crate::sqe::TimeoutFlags::empty()); sqe.set_user_data(uring_sys::LIBURING_UDATA_TIMEOUT); - return resultify(uring_sys::io_uring_submit_and_wait(self.ring.as_ptr(), wait_for as _)) + return resultify(uring_sys::io_uring_submit_and_wait( + self.ring.as_ptr(), + wait_for as _, + )); } } @@ -134,12 +137,14 @@ impl<'ring> SubmissionQueue<'ring> { impl fmt::Debug for SubmissionQueue<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let fd = unsafe { self.ring.as_ref().ring_fd }; - f.debug_struct(std::any::type_name::()).field("fd", &fd).finish() + f.debug_struct(std::any::type_name::()) + .field("fd", &fd) + .finish() } } -unsafe impl<'ring> Send for SubmissionQueue<'ring> { } -unsafe impl<'ring> Sync for SubmissionQueue<'ring> { } +unsafe impl<'ring> Send for SubmissionQueue<'ring> {} +unsafe impl<'ring> Sync for SubmissionQueue<'ring> {} pub(crate) unsafe fn prepare_sqe<'a>(ring: &mut uring_sys::io_uring) -> Option> { let sqe = uring_sys::io_uring_get_sqe(ring); @@ -152,9 +157,10 @@ pub(crate) unsafe fn prepare_sqe<'a>(ring: &mut uring_sys::io_uring) -> Option(sq: &mut uring_sys::io_uring_sq, count: u32) - -> Option> -{ +pub(crate) unsafe fn prepare_sqes<'a>( + sq: &mut uring_sys::io_uring_sq, + count: u32, +) -> Option> { atomic::fence(Ordering::Acquire); let head: u32 = *sq.khead; diff --git a/tests/accept.rs b/tests/accept.rs index 34e0ee5..17e4f44 100644 --- a/tests/accept.rs +++ b/tests/accept.rs @@ -1,10 +1,10 @@ +use iou::sqe::SockAddr; use nix::sys::socket::InetAddr; use std::{ io::{self, Read, Write}, net::{TcpListener, TcpStream}, os::unix::io::{AsRawFd, FromRawFd}, }; -use iou::sqe::SockAddr; const MESSAGE: &'static [u8] = b"Hello World"; diff --git a/tests/connect.rs b/tests/connect.rs index 3a1a41f..23876c5 100644 --- a/tests/connect.rs +++ b/tests/connect.rs @@ -1,4 +1,4 @@ -use nix::sys::socket::{AddressFamily, SockProtocol, SockType, InetAddr, SockFlag}; +use nix::sys::socket::{AddressFamily, InetAddr, SockFlag, SockProtocol, SockType}; use std::{io, net::TcpListener}; #[test] @@ -14,7 +14,7 @@ fn connect() -> io::Result<()> { SockFlag::SOCK_NONBLOCK, SockProtocol::Tcp, ) - .map_err(|_| io::Error::new(io::ErrorKind::Other, "failed to create socket"))?; + .map_err(|_| io::Error::new(io::ErrorKind::Other, "failed to create socket"))?; let mut ring = iou::IoUring::new(1)?; let mut sqe = ring.prepare_sqe().expect("failed to get sqe"); diff --git a/tests/exhaust-queue.rs b/tests/exhaust-queue.rs index d9a7137..646e875 100644 --- a/tests/exhaust-queue.rs +++ b/tests/exhaust-queue.rs @@ -35,7 +35,6 @@ fn exhaust_queue_with_prepare_sqes() { for counter in base..counter { let cqe = io_uring.peek_for_cqe().unwrap(); assert_eq!(cqe.user_data(), counter); - } } } diff --git a/tests/fileset-placeholder.rs b/tests/fileset-placeholder.rs index c5eb641..7b96fc5 100644 --- a/tests/fileset-placeholder.rs +++ b/tests/fileset-placeholder.rs @@ -1,4 +1,4 @@ -use iou::{IoUring, registrar::RegisteredFd}; +use iou::{registrar::RegisteredFd, IoUring}; use std::fs::File; use std::io::{IoSlice, Read}; use std::os::unix::io::AsRawFd; @@ -20,7 +20,9 @@ fn main() -> std::io::Result<()> { // update a random fileset entry with a valid file let file = std::fs::File::create(&path)?; - let reg_file = registrar.update_registered_files(713, &[file.as_raw_fd()])?.collect::>()[0]; + let reg_file = registrar + .update_registered_files(713, &[file.as_raw_fd()])? + .collect::>()[0]; assert!(!reg_file.is_placeholder()); let bufs = &[IoSlice::new(&TEXT)]; diff --git a/tests/fixed-file-write.rs b/tests/fixed-file-write.rs index c047813..7d947d2 100644 --- a/tests/fixed-file-write.rs +++ b/tests/fixed-file-write.rs @@ -1,4 +1,4 @@ -use iou::{IoUring, registrar::RegisteredFd, Registrar}; +use iou::{registrar::RegisteredFd, IoUring, Registrar}; use std::fs::{self, File}; use std::io::{IoSlice, Read}; use std::os::unix::io::AsRawFd; diff --git a/tests/read.rs b/tests/read.rs index 9542531..0162b2a 100644 --- a/tests/read.rs +++ b/tests/read.rs @@ -79,7 +79,8 @@ fn read_test() -> io::Result<()> { fn read_registered_buf() -> io::Result<()> { let mut io_uring = iou::IoUring::new(32)?; let bufs = vec![Box::new([0u8; 4096]) as Box<[u8]>]; - let mut buf: iou::registrar::RegisteredBuf = io_uring.registrar().register_buffers(bufs)?.next().unwrap(); + let mut buf: iou::registrar::RegisteredBuf = + io_uring.registrar().register_buffers(bufs)?.next().unwrap(); let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); path.push("props"); @@ -119,7 +120,11 @@ fn read_registered_fd_and_buf() -> io::Result<()> { let file = File::open(&path)?; let mut buf: RegisteredBuf = io_uring.registrar().register_buffers(bufs)?.next().unwrap(); - let fd: RegisteredFd = io_uring.registrar().register_files(&[file.as_raw_fd()])?.next().unwrap(); + let fd: RegisteredFd = io_uring + .registrar() + .register_files(&[file.as_raw_fd()])? + .next() + .unwrap(); unsafe { let mut sq = io_uring.sq(); diff --git a/tests/register-buffers.rs b/tests/register-buffers.rs index f06c806..fea12ed 100644 --- a/tests/register-buffers.rs +++ b/tests/register-buffers.rs @@ -3,9 +3,11 @@ fn register_buffers_by_val() { let buf1 = vec![0; 1024].into_boxed_slice(); let buf2 = vec![0; 1024].into_boxed_slice(); let ring = iou::IoUring::new(8).unwrap(); - let bufs: Vec<_> = ring.registrar() - .register_buffers(vec![buf1, buf2]) - .unwrap().collect(); + let bufs: Vec<_> = ring + .registrar() + .register_buffers(vec![buf1, buf2]) + .unwrap() + .collect(); assert_eq!(bufs.len(), 2); assert_eq!(bufs[0].index(), 0); assert_eq!(bufs[1].index(), 1); @@ -17,9 +19,11 @@ fn register_buffers_by_ref() { let buf2 = vec![0; 1024]; let ring = iou::IoUring::new(8).unwrap(); let bufs = &[&buf1[..], &buf2[..]]; - let bufs: Vec<_> = ring.registrar() - .register_buffers_by_ref(bufs) - .unwrap().collect(); + let bufs: Vec<_> = ring + .registrar() + .register_buffers_by_ref(bufs) + .unwrap() + .collect(); assert_eq!(bufs.len(), 2); assert_eq!(bufs[0].index(), 0); assert_eq!(bufs[1].index(), 1); @@ -31,9 +35,11 @@ fn register_buffers_by_mut() { let mut buf2 = vec![0; 1024]; let ring = iou::IoUring::new(8).unwrap(); let bufs = &mut [&mut buf1[..], &mut buf2[..]]; - let bufs: Vec<_> = ring.registrar() - .register_buffers_by_mut(bufs) - .unwrap().collect(); + let bufs: Vec<_> = ring + .registrar() + .register_buffers_by_mut(bufs) + .unwrap() + .collect(); assert_eq!(bufs.len(), 2); assert_eq!(bufs[0].index(), 0); assert_eq!(bufs[1].index(), 1); diff --git a/tests/write.rs b/tests/write.rs index 8e093a0..f3f4609 100644 --- a/tests/write.rs +++ b/tests/write.rs @@ -1,7 +1,7 @@ use std::fs::{self, File}; use std::io::{self, Read}; -use std::path::PathBuf; use std::os::unix::io::AsRawFd; +use std::path::PathBuf; const TEXT: &[u8] = b"I really wanna stop But I just gotta taste for it @@ -23,12 +23,11 @@ fn vectored_write_test() -> io::Result<()> { path.push("vectored.tmp"); let _ = fs::remove_file(&path); - + let n = { let mut io_uring = iou::IoUring::new(32)?; let bufs = [io::IoSlice::new(TEXT)]; - let file = File::create(&path)?; unsafe { let mut sq = io_uring.sq(); @@ -61,7 +60,7 @@ fn write_test() -> io::Result<()> { path.push("text.tmp"); let _ = fs::remove_file(&path); - + let n = { let mut io_uring = iou::IoUring::new(32)?; @@ -89,7 +88,6 @@ fn write_test() -> io::Result<()> { Ok(()) } - #[test] fn write_registered_buf() -> io::Result<()> { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); @@ -100,7 +98,8 @@ fn write_registered_buf() -> io::Result<()> { let mut io_uring = iou::IoUring::new(32)?; let bufs = vec![Box::new([0u8; 4096]) as Box<[u8]>]; - let mut buf: iou::registrar::RegisteredBuf = io_uring.registrar().register_buffers(bufs)?.next().unwrap(); + let mut buf: iou::registrar::RegisteredBuf = + io_uring.registrar().register_buffers(bufs)?.next().unwrap(); buf.as_mut().slice_to_mut(TEXT.len()).copy_from_slice(TEXT); From 0a1ec22f43d05afeafd518545c6be3305014a1e2 Mon Sep 17 00:00:00 2001 From: Liu Jiang Date: Sat, 2 Jan 2021 13:58:23 +0800 Subject: [PATCH 2/9] use io_uring_wait_cqe_nr() instead of io_uring_wait_cqes() Replace io_uring_wait_cqes() with the simpler io_uring_wait_cqe_nr(). Signed-off-by: Liu Jiang --- src/completion_queue.rs | 6 ++---- src/cqe.rs | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/completion_queue.rs b/src/completion_queue.rs index ebf70d9..b7e670e 100644 --- a/src/completion_queue.rs +++ b/src/completion_queue.rs @@ -2,7 +2,7 @@ use std::fmt; use std::io; use std::marker::PhantomData; use std::mem::MaybeUninit; -use std::ptr::{self, NonNull}; +use std::ptr::NonNull; use super::{resultify, CQEs, CQEsBlocking, IoUring, CQE}; @@ -61,12 +61,10 @@ impl<'ring> CompletionQueue<'ring> { unsafe { let mut cqe = MaybeUninit::uninit(); - resultify(uring_sys::io_uring_wait_cqes( + resultify(uring_sys::io_uring_wait_cqe_nr( self.ring.as_ptr(), cqe.as_mut_ptr(), count as _, - ptr::null(), - ptr::null(), ))?; Ok(&mut *cqe.assume_init()) diff --git a/src/cqe.rs b/src/cqe.rs index 39a4515..25c9a38 100644 --- a/src/cqe.rs +++ b/src/cqe.rs @@ -1,7 +1,7 @@ use std::io; use std::marker::PhantomData; use std::mem::MaybeUninit; -use std::ptr::{self, NonNull}; +use std::ptr::NonNull; use super::{resultify, IoUring}; @@ -168,12 +168,10 @@ impl<'a> CQEsBlocking<'a> { unsafe { let mut cqe = MaybeUninit::uninit(); - resultify(uring_sys::io_uring_wait_cqes( + resultify(uring_sys::io_uring_wait_cqe_nr( self.ring.as_ptr(), cqe.as_mut_ptr(), self.wait_for as _, - ptr::null(), - ptr::null(), ))?; Ok(&mut *cqe.assume_init()) From a70ba43cbf5cd639ddfa1877f2f076bb1bc03855 Mon Sep 17 00:00:00 2001 From: Liu Jiang Date: Sat, 2 Jan 2021 14:04:31 +0800 Subject: [PATCH 3/9] check for unsupported flags in IoUring::new_with_flags() When creating a new IoUring instance, some feature flags needs additional configuraiton data, such as SQ_AFF needing a cpu id and ATTACH_WQ needing a reference io_uring fd. An advanced Builder is needed to support these flags. Signed-off-by: Liu Jiang --- src/lib.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index d298f04..c4b09ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -185,6 +185,13 @@ impl IoUring { flags: SetupFlags, features: SetupFeatures, ) -> io::Result { + // TODO: add Builder to support SQ_AFF and ATTACH_WQ, which needs set more fields in the + // uring_sys::io_uring_params struct. + + if flags & (SetupFlags::SQ_AFF | SetupFlags::ATTACH_WQ) != SetupFlags::empty() { + return Err(io::Error::from_raw_os_error(libc::EINVAL)); + } + unsafe { let mut params: uring_sys::io_uring_params = mem::zeroed(); params.flags = flags.bits(); From 52da72b1ae68b1d10c06bbe11ecf75e8854f43b3 Mon Sep 17 00:00:00 2001 From: Liu Jiang Date: Sat, 2 Jan 2021 14:09:40 +0800 Subject: [PATCH 4/9] fix possible overfloating in prepare_cqes() Safe Wrapping should be used instead of u32. Also validate the input parameter `count` for safety. Signed-off-by: Liu Jiang --- src/submission_queue.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/submission_queue.rs b/src/submission_queue.rs index 702f0e6..73460ad 100644 --- a/src/submission_queue.rs +++ b/src/submission_queue.rs @@ -7,6 +7,7 @@ use std::sync::atomic::{self, Ordering}; use std::time::Duration; use super::{resultify, IoUring, SQEs, SQE}; +use bitflags::_core::num::Wrapping; /// The queue of pending IO events. /// @@ -163,13 +164,20 @@ pub(crate) unsafe fn prepare_sqes<'a>( ) -> Option> { atomic::fence(Ordering::Acquire); - let head: u32 = *sq.khead; - let next: u32 = sq.sqe_tail + count; + let cap = Wrapping(*sq.kring_entries as u32); + let count = Wrapping(count); + // Protect "next - head <= cap" from over-floating caused by `count` + if count > cap { + return None; + } + + let head = Wrapping(*sq.khead as u32); + let next = Wrapping(sq.sqe_tail as u32) + count; - if next - head <= *sq.kring_entries { + if next - head <= cap { let sqe = sq.sqes.offset((sq.sqe_tail & *sq.kring_mask) as isize); - sq.sqe_tail = next; - Some(SQEs::new(slice::from_raw_parts_mut(sqe, count as usize))) + sq.sqe_tail = next.0; + Some(SQEs::new(slice::from_raw_parts_mut(sqe, count.0 as usize))) } else { None } From 99c4d97cd65576d5c7c3e337926fed62a5c669f8 Mon Sep 17 00:00:00 2001 From: Liu Jiang Date: Sat, 2 Jan 2021 14:13:28 +0800 Subject: [PATCH 5/9] ensure all descriptor are consumed Both HardLinked and SoftLinked provides terminate() method to mark the end of IO chain, so there may be left over descriptors on the CQEs. Ensure all possible left over descriptors are consumed when dropping. Signed-off-by: Liu Jiang --- src/sqe.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/sqe.rs b/src/sqe.rs index 9a54271..c3f0056 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -758,6 +758,17 @@ impl<'ring> Iterator for HardLinked<'ring, '_> { } } +impl<'ring> Drop for HardLinked<'ring, '_> { + fn drop(&mut self) { + // Ensure that all left descriptors are properly consumed. + for mut sqe in &mut self.sqes { + unsafe { + sqe.prep_nop(); + } + } + } +} + pub struct HardLinkedSQE<'ring> { sqe: SQE<'ring>, is_final: bool, @@ -807,6 +818,17 @@ impl<'ring> Iterator for SoftLinked<'ring, '_> { } } +impl<'ring> Drop for SoftLinked<'ring, '_> { + fn drop(&mut self) { + // Ensure that all left descriptors are properly consumed. + for mut sqe in &mut self.sqes { + unsafe { + sqe.prep_nop(); + } + } + } +} + pub struct SoftLinkedSQE<'ring> { sqe: SQE<'ring>, is_final: bool, From 2de63c3cf7a636549ffc93ddcbde3b441a62dc52 Mon Sep 17 00:00:00 2001 From: Liu Jiang Date: Sat, 2 Jan 2021 13:53:13 +0800 Subject: [PATCH 6/9] zero all descriptor returned by SQEs Ensure all descriptors returned by SQEs are zeroed for safety. Signed-off-by: Liu Jiang --- src/sqe.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/sqe.rs b/src/sqe.rs index c3f0056..42f57d0 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -722,7 +722,10 @@ impl<'ring> SQEs<'ring> { fn consume(&mut self) -> Option> { self.sqes.next().map(|sqe| { - unsafe { uring_sys::io_uring_prep_nop(sqe) } + unsafe { + *sqe = mem::zeroed(); + uring_sys::io_uring_prep_nop(sqe); + } SQE { sqe } }) } From 04a82300d25e8ef9db218a9700a15d18497097b4 Mon Sep 17 00:00:00 2001 From: Liu Jiang Date: Sat, 2 Jan 2021 14:20:15 +0800 Subject: [PATCH 7/9] avoid unnecessary pre_nop() operations Now all allocated event descriptor are zeroed, there's no need to call pre_nop() for every descriptor any more. Signed-off-by: Liu Jiang --- src/sqe.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/sqe.rs b/src/sqe.rs index 42f57d0..d79cdc1 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -693,7 +693,8 @@ impl<'ring> SQEs<'ring> { /// additional [`SQE`]s will return `None`. pub fn single(&mut self) -> Option> { let mut next = None; - while let Some(sqe) = self.consume() { + while let Some(mut sqe) = self.consume() { + unsafe { sqe.prep_nop() }; next = Some(sqe) } next @@ -722,10 +723,7 @@ impl<'ring> SQEs<'ring> { fn consume(&mut self) -> Option> { self.sqes.next().map(|sqe| { - unsafe { - *sqe = mem::zeroed(); - uring_sys::io_uring_prep_nop(sqe); - } + unsafe { *sqe = mem::zeroed() }; SQE { sqe } }) } From 7ec7cd66d00d616e287fa16de876b9de7d0c5d02 Mon Sep 17 00:00:00 2001 From: Liu Jiang Date: Sat, 2 Jan 2021 14:06:36 +0800 Subject: [PATCH 8/9] improve documentation Signed-off-by: Liu Jiang --- src/completion_queue.rs | 3 +++ src/lib.rs | 41 ++++++++++++++++++++++++++++++++++++++++- src/probe.rs | 1 + src/registrar/mod.rs | 1 + src/sqe.rs | 31 +++++++++++++++++++++++++++++-- src/submission_queue.rs | 33 +++++++++++++++++++++++++++++++-- 6 files changed, 105 insertions(+), 5 deletions(-) diff --git a/src/completion_queue.rs b/src/completion_queue.rs index b7e670e..aacf7ed 100644 --- a/src/completion_queue.rs +++ b/src/completion_queue.rs @@ -87,14 +87,17 @@ impl<'ring> CompletionQueue<'ring> { CQEsBlocking::new(self.ring, wait_for) } + /// Returns how many descriptors are ready for processing on the completion queue. pub fn ready(&self) -> u32 { unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) } } + /// Returns true if the eventfd notification is currently enabled. pub fn eventfd_enabled(&self) -> bool { unsafe { uring_sys::io_uring_cq_eventfd_enabled(self.ring.as_ptr()) } } + /// Toggle eventfd notification on or off, if an eventfd is registered with the ring. pub fn eventfd_toggle(&mut self, enabled: bool) -> io::Result<()> { resultify(unsafe { uring_sys::io_uring_cq_eventfd_toggle(self.ring.as_ptr(), enabled) })?; Ok(()) diff --git a/src/lib.rs b/src/lib.rs index c4b09ee..77ddc34 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -103,9 +103,16 @@ bitflags::bitflags! { /// Force the kernel thread created with `SQPOLL` to be bound to the CPU used by the /// `SubmissionQueue`. Requires `SQPOLL` set. const SQ_AFF = 1 << 2; /* sq_thread_cpu is valid */ - + /// Create the completion queue with struct io_uring_params.cq_entries entries. + /// The value must be greater than entries, and may be rounded up to the next power-of-two. const CQSIZE = 1 << 3; + /// Clamp the values for SQ or CQ ring size to the max values instead of returning -EINVAL. const CLAMP = 1 << 4; + /// Share the asynchronous backend (kernel work thread) with an existing io_uring instance. + /// + /// If ATTACH_WQ is set, io_uring_params::wq_fd should be a valid io_uring fd, io-wq of + /// which will be shared with the newly created io_uring instance. If the flag is set + /// but it can't share io-wq, it fails. const ATTACH_WQ = 1 << 5; } } @@ -232,6 +239,7 @@ impl IoUring { ) } + /// Returns a probe structure to detect supported IO operations. pub fn probe(&mut self) -> io::Result { Probe::for_ring(&mut self.ring) } @@ -255,12 +263,33 @@ impl IoUring { /// Submit all prepared [`SQE`]s to the kernel and wait until at least `wait_for` events have /// completed. + /// + /// # Return value + /// - the number of submitted events, it's safe to reuse SQE entries in the ring. This is true + /// even if the actual IO submission had to be punted to async context, which means that the + /// SQE may in fact not have been submitted yet. + /// - an [`io::Error`](std::io::Result) variant if this function encounters any IO errors. pub fn submit_sqes_and_wait(&mut self, wait_for: u32) -> io::Result { self.sq().submit_and_wait(wait_for) } /// Submit all prepared [`SQE`]s to the kernel and wait until at least `wait_for` events have /// completed or `duration` has passed. + /// + /// # Return value + /// - the number of submitted events, it's safe to reuse SQE entries in the ring. This is true + /// even if the actual IO submission had to be punted to async context, which means that the + /// SQE may in fact not have been submitted yet. + /// - an [`io::Error`](std::io::Result) variant if this function encounters any IO errors. + /// + /// # Note + /// Due to the way timeout is implemented, there are two possible flaws: + /// - the timeout is unreliable. When all submission queue is full, it fallbacks to submit() + /// silently. + /// - the returned value may be bigger than expectation. There may be one extra descriptor + /// consumed by the timeout mechanism. The user data of descriptor consumed by timeout is + /// set to [`LIBURING_UDATA_TIMEOUT`](uring_sys::LIBURING_UDATA_TIMEOUT)(u64::MAX), so this + /// special value is reserved. pub fn submit_sqes_and_wait_with_timeout( &mut self, wait_for: u32, @@ -292,6 +321,10 @@ impl IoUring { } /// Block until a [`CQE`] is ready or timeout. + /// + /// # Safety + /// The timeout is implemented by adding an IORING_OP_TIMEOUT event to the submission queue, + /// so it touches both the submission and completion queue and not multi-thread safe. pub fn wait_for_cqe_with_timeout(&mut self, duration: Duration) -> io::Result { let ts = uring_sys::__kernel_timespec { tv_sec: duration.as_secs() as _, @@ -351,26 +384,32 @@ impl IoUring { &mut self.ring } + /// Returns how many descriptors are ready for processing on the completion queue. pub fn cq_ready(&mut self) -> u32 { self.cq().ready() } + /// Returns the numbers of ready event descriptors on the submission queue. pub fn sq_ready(&mut self) -> u32 { self.sq().ready() } + /// Returns the numbers of available event descriptors on the submission queue. pub fn sq_space_left(&mut self) -> u32 { self.sq().space_left() } + /// Returns true if the eventfd notification is currently enabled. pub fn cq_eventfd_enabled(&mut self) -> bool { self.cq().eventfd_enabled() } + /// Toggle eventfd notification on or off, if an eventfd is registered with the ring. pub fn cq_eventfd_toggle(&mut self, enabled: bool) -> io::Result<()> { self.cq().eventfd_toggle(enabled) } + /// Returns the RawFd for the io_uring handle. pub fn raw_fd(&self) -> RawFd { self.ring.ring_fd } diff --git a/src/probe.rs b/src/probe.rs index 5735dec..1931434 100644 --- a/src/probe.rs +++ b/src/probe.rs @@ -26,6 +26,7 @@ impl Probe { } } + /// Check whether an operation is supported by this kernel version's io-uring interface. pub fn supports(&self, op: uring_sys::IoRingOp) -> bool { unsafe { uring_sys::io_uring_opcode_supported(self.probe.as_ptr(), op as _) != 0 } } diff --git a/src/registrar/mod.rs b/src/registrar/mod.rs index 3eab6a4..266980f 100644 --- a/src/registrar/mod.rs +++ b/src/registrar/mod.rs @@ -262,6 +262,7 @@ unsafe impl<'ring> Send for Registrar<'ring> {} unsafe impl<'ring> Sync for Registrar<'ring> {} #[derive(Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Clone, Copy)] +/// An identity for a registered credential. pub struct Personality { pub(crate) id: u16, } diff --git a/src/sqe.rs b/src/sqe.rs index d79cdc1..e2f044a 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -618,14 +618,19 @@ bitflags::bitflags! { const FIXED_FILE = 1 << 0; /* use fixed fileset */ /// Submit this event only after completing all ongoing submission events. const IO_DRAIN = 1 << 1; /* issue after inflight IO */ - /// Force the next submission event to wait until this event has completed sucessfully. + /// Force the next submission event to wait until this event has completed successfully. /// /// An event's link only applies to the next event, but link chains can be /// arbitrarily long. const IO_LINK = 1 << 2; /* next IO depends on this one */ - + /// Force the next submission event to wait until this event has completed. + /// + /// An event's link only applies to the next event, but link chains can be arbitrarily long. + /// The next submission event will be executed no matter current event succeeds or fails. const IO_HARDLINK = 1 << 3; + /// Execute the event in asynchronous mode without trying non-blocking mode first. const ASYNC = 1 << 4; + const BUFFER_SELECT = 1 << 5; } } @@ -738,6 +743,9 @@ impl<'ring> Iterator for SQEs<'ring> { } /// An Iterator of [`SQE`]s which will be hard linked together. +/// +/// All HardLinked objects must be dropped before submitting the submission queue to ensure +/// correctly handling of the IO chain. pub struct HardLinked<'ring, 'a> { sqes: &'a mut SQEs<'ring>, } @@ -770,6 +778,10 @@ impl<'ring> Drop for HardLinked<'ring, '_> { } } +/// Represent a non-tail event descriptor on an hardly linked IO chain. +/// +/// All HardLinkedSQE objects must be dropped before submitting the submission queue to ensure +/// correctly handling of the IO_HARDLINK flag. pub struct HardLinkedSQE<'ring> { sqe: SQE<'ring>, is_final: bool, @@ -789,6 +801,10 @@ impl<'ring> DerefMut for HardLinkedSQE<'ring> { } } +// TODO: any better way to set the IO_HARDLINK flag? +// If submit() is called before dropping the HardLinkedSQE object, it may caused race windows +// under which the kernel observes malformed IO chains. This type of race window will be very hard +// to root cause. impl<'ring> Drop for HardLinkedSQE<'ring> { fn drop(&mut self) { if !self.is_final { @@ -798,6 +814,9 @@ impl<'ring> Drop for HardLinkedSQE<'ring> { } /// An Iterator of [`SQE`]s which will be soft linked together. +/// +/// All SoftLinked objects must be dropped before submitting the submission queue to ensure +/// correctly handling of the IO chain. pub struct SoftLinked<'ring, 'a> { sqes: &'a mut SQEs<'ring>, } @@ -830,6 +849,10 @@ impl<'ring> Drop for SoftLinked<'ring, '_> { } } +/// Represent a non-tail event descriptor on an softly linked IO chain. +/// +/// All SoftLinkedSQE objects must be dropped before submitting the submission queue to ensure +/// correctly handling of the IO_HARDLINK flag. pub struct SoftLinkedSQE<'ring> { sqe: SQE<'ring>, is_final: bool, @@ -849,6 +872,10 @@ impl<'ring> DerefMut for SoftLinkedSQE<'ring> { } } +// TODO: any better way to set the IO_LINK flag? +// If submit() is called before dropping the SoftLinkedSQE object, it may caused race windows +// under which the kernel observes malformed IO chains. This type of race window will be very hard +// to root cause. impl<'ring> Drop for SoftLinkedSQE<'ring> { fn drop(&mut self) { if !self.is_final { diff --git a/src/submission_queue.rs b/src/submission_queue.rs index 73460ad..e620d24 100644 --- a/src/submission_queue.rs +++ b/src/submission_queue.rs @@ -90,15 +90,43 @@ impl<'ring> SubmissionQueue<'ring> { /// Submit all events in the queue. Returns the number of submitted events. /// - /// If this function encounters any IO errors an [`io::Error`](std::io::Result) variant is returned. + /// # Return value + /// - the number of submitted events, it's safe to reuse SQE entries in the ring. This is true + /// even if the actual IO submission had to be punted to async context, which means that the + /// SQE may in fact not have been submitted yet. + /// - an [`io::Error`](std::io::Result) variant if this function encounters any IO errors. pub fn submit(&mut self) -> io::Result { resultify(unsafe { uring_sys::io_uring_submit(self.ring.as_ptr()) }) } + /// Submit all events in the queue and wait for `wait_for` event completions before returning. + /// + /// # Return value + /// - the number of submitted events, it's safe to reuse SQE entries in the ring. This is true + /// even if the actual IO submission had to be punted to async context, which means that the + /// SQE may in fact not have been submitted yet. + /// - an [`io::Error`](std::io::Result) variant if this function encounters any IO errors. pub fn submit_and_wait(&mut self, wait_for: u32) -> io::Result { resultify(unsafe { uring_sys::io_uring_submit_and_wait(self.ring.as_ptr(), wait_for as _) }) } + /// Submit all events in the queue and wait for `wait_for` event completions before returning, + /// timeout after waiting for `duration`. + /// + /// # Return value + /// - the number of submitted events, it's safe to reuse SQE entries in the ring. This is true + /// even if the actual IO submission had to be punted to async context, which means that the + /// SQE may in fact not have been submitted yet. + /// - an [`io::Error`](std::io::Result) variant if this function encounters any IO errors. + /// + /// # Note + /// Due to the way timeout is implemented, there are two possible flaws: + /// - the timeout is unreliable. When all submission queue is full, it fallbacks to submit() + /// silently. + /// - the returned value may be bigger than expectation. There may be one extra descriptor + /// consumed by the timeout mechanism. The user data of descriptor consumed by timeout is + /// set to [`LIBURING_UDATA_TIMEOUT`](uring_sys::LIBURING_UDATA_TIMEOUT)(u64::MAX), so this + /// special value is reserved. pub fn submit_and_wait_with_timeout( &mut self, wait_for: u32, @@ -111,7 +139,6 @@ impl<'ring> SubmissionQueue<'ring> { loop { if let Some(mut sqe) = self.prepare_sqe() { - sqe.clear(); unsafe { sqe.prep_timeout(&ts, 0, crate::sqe::TimeoutFlags::empty()); sqe.set_user_data(uring_sys::LIBURING_UDATA_TIMEOUT); @@ -126,10 +153,12 @@ impl<'ring> SubmissionQueue<'ring> { } } + /// Returns the numbers of ready event descriptors on the submission queue. pub fn ready(&self) -> u32 { unsafe { uring_sys::io_uring_sq_ready(self.ring.as_ptr()) as u32 } } + /// Returns the numbers of available event descriptors on the submission queue. pub fn space_left(&self) -> u32 { unsafe { uring_sys::io_uring_sq_space_left(self.ring.as_ptr()) as u32 } } From 153a0eb544dbd1b382a2b1254b290744a3afa97f Mon Sep 17 00:00:00 2001 From: Liu Jiang Date: Sat, 2 Jan 2021 16:29:05 +0800 Subject: [PATCH 9/9] fix some clippy warnings Signed-off-by: Liu Jiang --- src/lib.rs | 7 ++++--- src/submission_queue.rs | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 77ddc34..2793ca3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -433,9 +433,10 @@ unsafe impl Send for IoUring {} unsafe impl Sync for IoUring {} fn resultify(x: i32) -> io::Result { - match x >= 0 { - true => Ok(x as u32), - false => Err(io::Error::from_raw_os_error(-x)), + if x >= 0 { + Ok(x as u32) + } else { + Err(io::Error::from_raw_os_error(-x)) } } diff --git a/src/submission_queue.rs b/src/submission_queue.rs index e620d24..3bb4e0b 100644 --- a/src/submission_queue.rs +++ b/src/submission_queue.rs @@ -77,11 +77,11 @@ impl<'ring> SubmissionQueue<'ring> { /// # Ok(()) /// # } /// - pub fn prepare_sqe<'a>(&'a mut self) -> Option> { + pub fn prepare_sqe(&mut self) -> Option { unsafe { prepare_sqe(self.ring.as_mut()) } } - pub fn prepare_sqes<'a>(&'a mut self, count: u32) -> Option> { + pub fn prepare_sqes(&mut self, count: u32) -> Option { unsafe { let sq: &mut uring_sys::io_uring_sq = &mut (*self.ring.as_ptr()).sq; prepare_sqes(sq, count)