From 2894f9aa4b0bf70dd1379a0625b850ee2d3df678 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Fri, 13 Dec 2019 20:56:07 +0300 Subject: [PATCH 01/13] sqe poll methods --- Cargo.toml | 4 ++- src/lib.rs | 2 +- src/sqe.rs | 23 +++++++++++++++ tests/poll.rs | 79 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 106 insertions(+), 2 deletions(-) create mode 100644 tests/poll.rs diff --git a/Cargo.toml b/Cargo.toml index 8524e08..72ec6be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,8 @@ [package] name = "iou" -version = "0.1.0" +version = "0.2.0" description = "io_uring bindings" +documentation = "https://docs.rs/iou" repository = "https://github.com/withoutboats/iou" keywords = ["io_uring", "liburing", "async", "futures"] license = "MIT OR Apache-2.0" @@ -10,4 +11,5 @@ edition = "2018" [dependencies] bitflags = "1.2.0" +libc = "0.2.66" uring-sys = "1.0.0-beta" diff --git a/src/lib.rs b/src/lib.rs index 7098410..9a93fd2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,7 @@ use std::mem::MaybeUninit; use std::ptr::{self, NonNull}; use std::time::Duration; -pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags}; +pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags, PollMask}; pub use cqe::{CompletionQueue, CompletionQueueEvent}; pub use registrar::Registrar; diff --git a/src/sqe.rs b/src/sqe.rs index dc7c627..8dc95d9 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -272,6 +272,16 @@ impl<'a> SubmissionQueueEvent<'a> { flags.bits() as _); } + #[inline] + pub unsafe fn prep_poll_add(&mut self, fd: RawFd, poll_mask: PollMask) { + uring_sys::io_uring_prep_poll_add(self.sqe, fd, poll_mask.bits()) + } + + #[inline] + pub unsafe fn prep_poll_remove(&mut self, user_data: u64) { + uring_sys::io_uring_prep_poll_remove(self.sqe, user_data as _) + } + /// Prepare a no-op event. /// ``` /// # use iou::{IoUring, SubmissionFlags}; @@ -376,3 +386,16 @@ bitflags::bitflags! { const TIMEOUT_ABS = 1 << 0; } } + +bitflags::bitflags! { + pub struct PollMask: libc::c_short { + const POLLIN = libc::POLLIN; + const POLLPRI = libc::POLLPRI; + const POLLOUT = libc::POLLOUT; + const POLLERR = libc::POLLERR; + const POLLHUP = libc::POLLHUP; + const POLLNVAL = libc::POLLNVAL; + const POLLRDNORM = libc::POLLRDNORM; + const POLLRDBAND = libc::POLLRDBAND; + } +} diff --git a/tests/poll.rs b/tests/poll.rs new file mode 100644 index 0000000..9df5303 --- /dev/null +++ b/tests/poll.rs @@ -0,0 +1,79 @@ +#![feature(test)] +extern crate libc; +extern crate test; + +use std::{io, os::unix::io::RawFd}; + +pub fn pipe() -> io::Result<(RawFd, RawFd)> { + unsafe { + let mut fds = core::mem::MaybeUninit::<[libc::c_int; 2]>::uninit(); + + let res = libc::pipe(fds.as_mut_ptr() as *mut libc::c_int); + + if res < 0 { + Err(io::Error::from_raw_os_error(-res)) + } else { + Ok((fds.assume_init()[0], fds.assume_init()[1])) + } + } +} + +#[test] +fn test_poll_add() -> io::Result<()> { + let mut ring = iou::IoUring::new(2)?; + let (read, write) = pipe()?; + + unsafe { + let mut sqe = ring.next_sqe().expect("no sqe"); + sqe.prep_poll_add(read, iou::PollMask::POLLIN); + sqe.set_user_data(0xDEADBEEF); + ring.submit_sqes()?; + } + + let res = unsafe { + let buf = b"hello"; + libc::write( + write, + buf.as_ptr() as *const libc::c_void, + buf.len() as libc::size_t, + ) + }; + + if res < 0 { + return Err(io::Error::from_raw_os_error(-res as _)); + } + + let cqe = ring.wait_for_cqe()?; + assert_eq!(cqe.user_data(), 0xDEADBEEF); + let mask = unsafe { iou::PollMask::from_bits_unchecked(cqe.result()? as _) }; + assert!(mask.contains(iou::PollMask::POLLIN)); + unsafe { + libc::close(write); + libc::close(read); + } + Ok(()) +} + +#[test] +fn test_poll_remove() -> io::Result<()> { + let mut ring = iou::IoUring::new(2)?; + let (read, write) = pipe()?; + + unsafe { + let mut sqe = ring.next_sqe().expect("no sqe"); + sqe.prep_poll_add(read, iou::PollMask::POLLIN); + sqe.set_user_data(0xDEADBEEF); + ring.submit_sqes()?; + + let mut sqe = ring.next_sqe().expect("no sqe"); + sqe.prep_poll_remove(0xDEADBEEF); + ring.submit_sqes()?; + for _ in 0..2 { + let cqe = ring.wait_for_cqe()?; + let _ = cqe.result()?; + } + libc::close(write); + libc::close(read); + Ok(()) + } +} From d11ce3a6646b4357ff1231fd06d794fa26641ba4 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Mon, 23 Dec 2019 23:15:02 +0300 Subject: [PATCH 02/13] sqe "poll" methods --- Cargo.toml | 5 ++++- src/lib.rs | 4 +++- src/sqe.rs | 18 +++--------------- tests/poll.rs | 8 ++++---- 4 files changed, 14 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 72ec6be..3e62cc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,5 +11,8 @@ edition = "2018" [dependencies] bitflags = "1.2.0" -libc = "0.2.66" +nix = "0.16.0" uring-sys = "1.0.0-beta" + +[dev-dependencies] +libc = "0.2.66" diff --git a/src/lib.rs b/src/lib.rs index 9a93fd2..8b7a386 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,10 +55,12 @@ use std::mem::MaybeUninit; use std::ptr::{self, NonNull}; use std::time::Duration; -pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags, PollMask}; +pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags}; pub use cqe::{CompletionQueue, CompletionQueueEvent}; pub use registrar::Registrar; +pub use nix::poll::PollFlags; + bitflags::bitflags! { /// `IoUring` initialization flags for advanced use cases. /// diff --git a/src/sqe.rs b/src/sqe.rs index 8dc95d9..18152e2 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -6,6 +6,7 @@ use std::marker::PhantomData; use std::time::Duration; use super::IoUring; +use crate::{PollFlags}; /// The queue of pending IO events. /// @@ -273,8 +274,8 @@ impl<'a> SubmissionQueueEvent<'a> { } #[inline] - pub unsafe fn prep_poll_add(&mut self, fd: RawFd, poll_mask: PollMask) { - uring_sys::io_uring_prep_poll_add(self.sqe, fd, poll_mask.bits()) + pub unsafe fn prep_poll_add(&mut self, fd: RawFd, poll_flags: PollFlags) { + uring_sys::io_uring_prep_poll_add(self.sqe, fd, poll_flags.bits()) } #[inline] @@ -386,16 +387,3 @@ bitflags::bitflags! { const TIMEOUT_ABS = 1 << 0; } } - -bitflags::bitflags! { - pub struct PollMask: libc::c_short { - const POLLIN = libc::POLLIN; - const POLLPRI = libc::POLLPRI; - const POLLOUT = libc::POLLOUT; - const POLLERR = libc::POLLERR; - const POLLHUP = libc::POLLHUP; - const POLLNVAL = libc::POLLNVAL; - const POLLRDNORM = libc::POLLRDNORM; - const POLLRDBAND = libc::POLLRDBAND; - } -} diff --git a/tests/poll.rs b/tests/poll.rs index 9df5303..171ebeb 100644 --- a/tests/poll.rs +++ b/tests/poll.rs @@ -25,7 +25,7 @@ fn test_poll_add() -> io::Result<()> { unsafe { let mut sqe = ring.next_sqe().expect("no sqe"); - sqe.prep_poll_add(read, iou::PollMask::POLLIN); + sqe.prep_poll_add(read, iou::PollFlags::POLLIN); sqe.set_user_data(0xDEADBEEF); ring.submit_sqes()?; } @@ -45,8 +45,8 @@ fn test_poll_add() -> io::Result<()> { let cqe = ring.wait_for_cqe()?; assert_eq!(cqe.user_data(), 0xDEADBEEF); - let mask = unsafe { iou::PollMask::from_bits_unchecked(cqe.result()? as _) }; - assert!(mask.contains(iou::PollMask::POLLIN)); + let mask = unsafe { iou::PollFlags::from_bits_unchecked(cqe.result()? as _) }; + assert!(mask.contains(iou::PollFlags::POLLIN)); unsafe { libc::close(write); libc::close(read); @@ -61,7 +61,7 @@ fn test_poll_remove() -> io::Result<()> { unsafe { let mut sqe = ring.next_sqe().expect("no sqe"); - sqe.prep_poll_add(read, iou::PollMask::POLLIN); + sqe.prep_poll_add(read, iou::PollFlags::POLLIN); sqe.set_user_data(0xDEADBEEF); ring.submit_sqes()?; From 29c56e47436f3a7e71c407a73931bfa03bd8f70b Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 24 Dec 2019 13:58:01 +0300 Subject: [PATCH 03/13] work --- Cargo.toml | 3 +- src/lib.rs | 2 +- src/sqe.rs | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 99 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3e62cc0..693547e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,8 @@ edition = "2018" [dependencies] bitflags = "1.2.0" nix = "0.16.0" +libc = "0.2.66" uring-sys = "1.0.0-beta" [dev-dependencies] -libc = "0.2.66" + diff --git a/src/lib.rs b/src/lib.rs index 8b7a386..e8cfb85 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,7 @@ pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags pub use cqe::{CompletionQueue, CompletionQueueEvent}; pub use registrar::Registrar; -pub use nix::poll::PollFlags; +pub use nix::{poll::PollFlags, sys::socket::{SockAddr, SockFlag, MsgFlags}}; bitflags::bitflags! { /// `IoUring` initialization flags for advanced use cases. diff --git a/src/sqe.rs b/src/sqe.rs index 18152e2..df4ab8d 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -4,9 +4,11 @@ use std::os::unix::io::RawFd; use std::ptr::{self, NonNull}; use std::marker::PhantomData; use std::time::Duration; +use std::io::{IoSlice, IoSliceMut}; use super::IoUring; -use crate::{PollFlags}; +use super::{PollFlags, SockAddr, SockFlag, MsgFlags}; +use nix::sys::socket; /// The queue of pending IO events. /// @@ -283,6 +285,27 @@ impl<'a> SubmissionQueueEvent<'a> { uring_sys::io_uring_prep_poll_remove(self.sqe, user_data as _) } + #[inline] + pub unsafe fn prep_connect(&mut self, fd: RawFd, socket_addr: &SockAddr) { + let (addr, len) = socket_addr.as_ffi_pair(); + uring_sys::io_uring_prep_connect(self.sqe, fd, addr as *const _ as *mut _, len); + } + + #[inline] + pub unsafe fn prep_accept(&mut self, fd: RawFd, flags: SockFlag) { + uring_sys::io_uring_prep_accept(self.sqe, fd, std::ptr::null_mut(), std::ptr::null_mut(), flags.bits()) + } + + #[inline] + pub unsafe fn prep_sendmsg<'b>(&mut self, fd: RawFd, msg: &'b SendMsg<'b>, flags: MsgFlags) { + uring_sys::io_uring_prep_sendmsg(self.sqe, fd, &msg.msghdr, flags.bits() as _) + } + + #[inline] + pub unsafe fn prep_recvmsg<'b>(&mut self, fd: RawFd, msg: &'b mut RecvMsg<'b>, flags: MsgFlags) { + uring_sys::io_uring_prep_recvmsg(self.sqe, fd, &mut msg.msghdr, flags.bits() as _) + } + /// Prepare a no-op event. /// ``` /// # use iou::{IoUring, SubmissionFlags}; @@ -358,6 +381,78 @@ impl<'a> SubmissionQueueEvent<'a> { unsafe impl<'a> Send for SubmissionQueueEvent<'a> { } unsafe impl<'a> Sync for SubmissionQueueEvent<'a> { } +pub struct SendMsg<'a> { + msghdr: libc::msghdr, + _m: PhantomData<&'a libc::msghdr>, +} + +impl<'a> SendMsg<'a> { + pub fn new(buffers: &'a [IoSlice<'a>], addr: Option<&'a SockAddr>) -> Self { + let (name, namelen) = match addr { + Some(addr) => { + let (x, y) = unsafe { addr.as_ffi_pair() }; + (x as *const _, y) + }, + None => (ptr::null(), 0), + }; + + let msghdr = unsafe { + let mut msghdr = mem::MaybeUninit::::zeroed(); + let p = msghdr.as_mut_ptr(); + (*p).msg_name = name as *mut _; + (*p).msg_namelen = namelen; + (*p).msg_iov = buffers.as_ptr() as *mut _; + (*p).msg_iovlen = buffers.len() as _; + (*p).msg_flags = 0; + msghdr.assume_init() + }; + + SendMsg { + msghdr, + _m: PhantomData + } + } +} + +pub struct RecvMsg<'a> { + msghdr: libc::msghdr, + _m: PhantomData<&'a mut libc::msghdr> +} + +impl <'a> RecvMsg<'a> { + pub fn new(buffers: &mut [IoSliceMut], mut cmsg_buffer: Option<&'a mut Vec>) -> Self { + let mut address = mem::MaybeUninit::uninit(); + let (msg_control, msg_controllen) = cmsg_buffer.as_mut() + .map(|v| (v.as_mut_ptr(), v.capacity())) + .unwrap_or((ptr::null_mut(), 0)); + + let msghdr = { + unsafe { + let mut msghdr = mem::MaybeUninit::::zeroed(); + let p = msghdr.as_mut_ptr(); + (*p).msg_name = address.as_mut_ptr() as *mut _; + (*p).msg_namelen = mem::size_of::() as libc::socklen_t; + (*p).msg_iov = buffers.as_ptr() as *mut _; + (*p).msg_iovlen = buffers.len() as _; + (*p).msg_control = msg_control as *mut _; + (*p).msg_controllen = msg_controllen as _; + (*p).msg_flags = 0; + msghdr.assume_init() + } + }; + + RecvMsg { + msghdr, + _m: PhantomData + } + } + + pub fn as_msghdr(&self) -> *const libc::msghdr { + &self.msghdr + } +} + + bitflags::bitflags! { /// [`SubmissionQueueEvent`](SubmissionQueueEvent) configuration flags. /// From 5e0cf19cd54aa1e61fc2eef871ece0f542ca68b9 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 24 Dec 2019 22:24:41 +0300 Subject: [PATCH 04/13] work --- src/lib.rs | 2 +- src/sqe.rs | 96 ++++++------------------------------------------- tests/socket.rs | 61 +++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 86 deletions(-) create mode 100644 tests/socket.rs diff --git a/src/lib.rs b/src/lib.rs index e8cfb85..2a9b461 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,7 @@ pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags pub use cqe::{CompletionQueue, CompletionQueueEvent}; pub use registrar::Registrar; -pub use nix::{poll::PollFlags, sys::socket::{SockAddr, SockFlag, MsgFlags}}; +pub use nix::{poll::PollFlags, sys::socket::{SockAddr, SockFlag}}; bitflags::bitflags! { /// `IoUring` initialization flags for advanced use cases. diff --git a/src/sqe.rs b/src/sqe.rs index df4ab8d..a6c3ca8 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -4,11 +4,9 @@ use std::os::unix::io::RawFd; use std::ptr::{self, NonNull}; use std::marker::PhantomData; use std::time::Duration; -use std::io::{IoSlice, IoSliceMut}; use super::IoUring; -use super::{PollFlags, SockAddr, SockFlag, MsgFlags}; -use nix::sys::socket; +use super::{PollFlags, SockAddr, SockFlag}; /// The queue of pending IO events. /// @@ -275,6 +273,16 @@ impl<'a> SubmissionQueueEvent<'a> { flags.bits() as _); } + #[inline] + pub unsafe fn prep_timeout_remove(&mut self, user_data: u64) { + uring_sys::io_uring_prep_timeout_remove(self.sqe, user_data as _, 0); + } + + #[inline] + pub unsafe fn prep_link_timeout(&mut self, ts: &uring_sys::__kernel_timespec) { + uring_sys::io_uring_prep_link_timeout(self.sqe, ts as *const _ as *mut _, 0); + } + #[inline] pub unsafe fn prep_poll_add(&mut self, fd: RawFd, poll_flags: PollFlags) { uring_sys::io_uring_prep_poll_add(self.sqe, fd, poll_flags.bits()) @@ -296,16 +304,6 @@ impl<'a> SubmissionQueueEvent<'a> { uring_sys::io_uring_prep_accept(self.sqe, fd, std::ptr::null_mut(), std::ptr::null_mut(), flags.bits()) } - #[inline] - pub unsafe fn prep_sendmsg<'b>(&mut self, fd: RawFd, msg: &'b SendMsg<'b>, flags: MsgFlags) { - uring_sys::io_uring_prep_sendmsg(self.sqe, fd, &msg.msghdr, flags.bits() as _) - } - - #[inline] - pub unsafe fn prep_recvmsg<'b>(&mut self, fd: RawFd, msg: &'b mut RecvMsg<'b>, flags: MsgFlags) { - uring_sys::io_uring_prep_recvmsg(self.sqe, fd, &mut msg.msghdr, flags.bits() as _) - } - /// Prepare a no-op event. /// ``` /// # use iou::{IoUring, SubmissionFlags}; @@ -381,78 +379,6 @@ impl<'a> SubmissionQueueEvent<'a> { unsafe impl<'a> Send for SubmissionQueueEvent<'a> { } unsafe impl<'a> Sync for SubmissionQueueEvent<'a> { } -pub struct SendMsg<'a> { - msghdr: libc::msghdr, - _m: PhantomData<&'a libc::msghdr>, -} - -impl<'a> SendMsg<'a> { - pub fn new(buffers: &'a [IoSlice<'a>], addr: Option<&'a SockAddr>) -> Self { - let (name, namelen) = match addr { - Some(addr) => { - let (x, y) = unsafe { addr.as_ffi_pair() }; - (x as *const _, y) - }, - None => (ptr::null(), 0), - }; - - let msghdr = unsafe { - let mut msghdr = mem::MaybeUninit::::zeroed(); - let p = msghdr.as_mut_ptr(); - (*p).msg_name = name as *mut _; - (*p).msg_namelen = namelen; - (*p).msg_iov = buffers.as_ptr() as *mut _; - (*p).msg_iovlen = buffers.len() as _; - (*p).msg_flags = 0; - msghdr.assume_init() - }; - - SendMsg { - msghdr, - _m: PhantomData - } - } -} - -pub struct RecvMsg<'a> { - msghdr: libc::msghdr, - _m: PhantomData<&'a mut libc::msghdr> -} - -impl <'a> RecvMsg<'a> { - pub fn new(buffers: &mut [IoSliceMut], mut cmsg_buffer: Option<&'a mut Vec>) -> Self { - let mut address = mem::MaybeUninit::uninit(); - let (msg_control, msg_controllen) = cmsg_buffer.as_mut() - .map(|v| (v.as_mut_ptr(), v.capacity())) - .unwrap_or((ptr::null_mut(), 0)); - - let msghdr = { - unsafe { - let mut msghdr = mem::MaybeUninit::::zeroed(); - let p = msghdr.as_mut_ptr(); - (*p).msg_name = address.as_mut_ptr() as *mut _; - (*p).msg_namelen = mem::size_of::() as libc::socklen_t; - (*p).msg_iov = buffers.as_ptr() as *mut _; - (*p).msg_iovlen = buffers.len() as _; - (*p).msg_control = msg_control as *mut _; - (*p).msg_controllen = msg_controllen as _; - (*p).msg_flags = 0; - msghdr.assume_init() - } - }; - - RecvMsg { - msghdr, - _m: PhantomData - } - } - - pub fn as_msghdr(&self) -> *const libc::msghdr { - &self.msghdr - } -} - - bitflags::bitflags! { /// [`SubmissionQueueEvent`](SubmissionQueueEvent) configuration flags. /// diff --git a/tests/socket.rs b/tests/socket.rs new file mode 100644 index 0000000..dbf355f --- /dev/null +++ b/tests/socket.rs @@ -0,0 +1,61 @@ +use nix::sys::socket::{AddressFamily, SockProtocol, SockType, InetAddr, SockFlag}; +use std::{ + io::{self, Read, Write}, + net::{TcpListener, TcpStream}, + os::unix::io::{AsRawFd, FromRawFd}, +}; + +const MESSAGE: &'static [u8] = "Hello World".as_bytes(); + +#[test] +fn accept() -> io::Result<()> { + let mut ring = iou::IoUring::new(1)?; + + let listener = TcpListener::bind(("0.0.0.0", 0))?; + listener.set_nonblocking(true)?; + + let mut stream = TcpStream::connect(listener.local_addr()?)?; + stream.write_all(MESSAGE)?; + + let fd = listener.as_raw_fd(); + let mut sq = ring.sq(); + let mut sqe = sq.next_sqe().expect("failed to get sqe"); + unsafe { + sqe.prep_accept(fd, iou::SockFlag::empty()); + sq.submit()?; + } + let cqe = ring.wait_for_cqe()?; + let accept_fd = cqe.result()?; + let mut accept_buf = [0; MESSAGE.len()]; + let mut stream = unsafe { TcpStream::from_raw_fd(accept_fd as _) }; + stream.read_exact(&mut accept_buf)?; + assert_eq!(accept_buf, MESSAGE); + Ok(()) +} + +#[test] +fn connect() -> io::Result<()> { + let listener = TcpListener::bind(("0.0.0.0", 0))?; + listener.set_nonblocking(true)?; + let listener_addr = iou::SockAddr::new_inet(InetAddr::from_std(&listener.local_addr()?)); + + let socket = nix::sys::socket::socket( + AddressFamily::Inet, + SockType::Stream, + SockFlag::SOCK_NONBLOCK, + SockProtocol::Tcp, + ) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "failed to create socket"))?; + + let mut ring = iou::IoUring::new(1)?; + let mut sqe = ring.next_sqe().expect("failed to get sqe"); + unsafe { + sqe.prep_connect(socket, &listener_addr); + sqe.set_user_data(42); + ring.submit_sqes()?; + } + let cqe = ring.wait_for_cqe()?; + let _res = cqe.result()?; + assert_eq!(cqe.user_data(), 42); + Ok(()) +} From 4cc833da7ef82c3707397eb6e07a3b182adb3630 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 24 Dec 2019 22:39:04 +0300 Subject: [PATCH 05/13] work --- tests/poll.rs | 48 +++++++----------------------------------------- tests/socket.rs | 2 +- 2 files changed, 8 insertions(+), 42 deletions(-) diff --git a/tests/poll.rs b/tests/poll.rs index 171ebeb..da8fb0e 100644 --- a/tests/poll.rs +++ b/tests/poll.rs @@ -1,67 +1,35 @@ -#![feature(test)] extern crate libc; -extern crate test; - -use std::{io, os::unix::io::RawFd}; - -pub fn pipe() -> io::Result<(RawFd, RawFd)> { - unsafe { - let mut fds = core::mem::MaybeUninit::<[libc::c_int; 2]>::uninit(); - - let res = libc::pipe(fds.as_mut_ptr() as *mut libc::c_int); - - if res < 0 { - Err(io::Error::from_raw_os_error(-res)) - } else { - Ok((fds.assume_init()[0], fds.assume_init()[1])) - } - } -} +use std::{io::{self, Write}, os::unix::{io::AsRawFd, net}}; #[test] fn test_poll_add() -> io::Result<()> { let mut ring = iou::IoUring::new(2)?; - let (read, write) = pipe()?; - + let (read, mut write) = net::UnixStream::pair()?; unsafe { let mut sqe = ring.next_sqe().expect("no sqe"); - sqe.prep_poll_add(read, iou::PollFlags::POLLIN); + sqe.prep_poll_add(read.as_raw_fd(), iou::PollFlags::POLLIN); sqe.set_user_data(0xDEADBEEF); ring.submit_sqes()?; } - let res = unsafe { - let buf = b"hello"; - libc::write( - write, - buf.as_ptr() as *const libc::c_void, - buf.len() as libc::size_t, - ) - }; - - if res < 0 { - return Err(io::Error::from_raw_os_error(-res as _)); - } + write.write(&[1])?; let cqe = ring.wait_for_cqe()?; assert_eq!(cqe.user_data(), 0xDEADBEEF); let mask = unsafe { iou::PollFlags::from_bits_unchecked(cqe.result()? as _) }; assert!(mask.contains(iou::PollFlags::POLLIN)); - unsafe { - libc::close(write); - libc::close(read); - } + Ok(()) } #[test] fn test_poll_remove() -> io::Result<()> { let mut ring = iou::IoUring::new(2)?; - let (read, write) = pipe()?; + let (read, _write) = net::UnixStream::pair()?; unsafe { let mut sqe = ring.next_sqe().expect("no sqe"); - sqe.prep_poll_add(read, iou::PollFlags::POLLIN); + sqe.prep_poll_add(read.as_raw_fd(), iou::PollFlags::POLLIN); sqe.set_user_data(0xDEADBEEF); ring.submit_sqes()?; @@ -72,8 +40,6 @@ fn test_poll_remove() -> io::Result<()> { let cqe = ring.wait_for_cqe()?; let _ = cqe.result()?; } - libc::close(write); - libc::close(read); Ok(()) } } diff --git a/tests/socket.rs b/tests/socket.rs index dbf355f..e161e49 100644 --- a/tests/socket.rs +++ b/tests/socket.rs @@ -5,7 +5,7 @@ use std::{ os::unix::io::{AsRawFd, FromRawFd}, }; -const MESSAGE: &'static [u8] = "Hello World".as_bytes(); +const MESSAGE: &'static [u8] = b"Hello World"; #[test] fn accept() -> io::Result<()> { From 09097dca6f4ab9ab28c5efc9817ea3f0c3d6f028 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 24 Dec 2019 22:45:30 +0300 Subject: [PATCH 06/13] work --- tests/poll.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/poll.rs b/tests/poll.rs index da8fb0e..b6dba54 100644 --- a/tests/poll.rs +++ b/tests/poll.rs @@ -6,7 +6,7 @@ fn test_poll_add() -> io::Result<()> { let mut ring = iou::IoUring::new(2)?; let (read, mut write) = net::UnixStream::pair()?; unsafe { - let mut sqe = ring.next_sqe().expect("no sqe"); + let mut sqe = ring.next_sqe().expect("failed to get sqe"); sqe.prep_poll_add(read.as_raw_fd(), iou::PollFlags::POLLIN); sqe.set_user_data(0xDEADBEEF); ring.submit_sqes()?; @@ -28,12 +28,12 @@ fn test_poll_remove() -> io::Result<()> { let (read, _write) = net::UnixStream::pair()?; unsafe { - let mut sqe = ring.next_sqe().expect("no sqe"); + let mut sqe = ring.next_sqe().expect("failed to get sqe"); sqe.prep_poll_add(read.as_raw_fd(), iou::PollFlags::POLLIN); sqe.set_user_data(0xDEADBEEF); ring.submit_sqes()?; - let mut sqe = ring.next_sqe().expect("no sqe"); + let mut sqe = ring.next_sqe().expect("failed to get sqe"); sqe.prep_poll_remove(0xDEADBEEF); ring.submit_sqes()?; for _ in 0..2 { From bc7b2ad0fa8a10b463d3462933dfb137786522e6 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Tue, 24 Dec 2019 23:20:32 +0300 Subject: [PATCH 07/13] work --- tests/poll.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/poll.rs b/tests/poll.rs index b6dba54..6a320ed 100644 --- a/tests/poll.rs +++ b/tests/poll.rs @@ -1,10 +1,10 @@ -extern crate libc; -use std::{io::{self, Write}, os::unix::{io::AsRawFd, net}}; +use std::{io::{self, Write, Read}, os::unix::{io::AsRawFd, net}}; +const MESSAGE: &'static [u8] = b"Hello World"; #[test] fn test_poll_add() -> io::Result<()> { let mut ring = iou::IoUring::new(2)?; - let (read, mut write) = net::UnixStream::pair()?; + let (mut read, mut write) = net::UnixStream::pair()?; unsafe { let mut sqe = ring.next_sqe().expect("failed to get sqe"); sqe.prep_poll_add(read.as_raw_fd(), iou::PollFlags::POLLIN); @@ -12,13 +12,15 @@ fn test_poll_add() -> io::Result<()> { ring.submit_sqes()?; } - write.write(&[1])?; + write.write(MESSAGE)?; let cqe = ring.wait_for_cqe()?; assert_eq!(cqe.user_data(), 0xDEADBEEF); let mask = unsafe { iou::PollFlags::from_bits_unchecked(cqe.result()? as _) }; assert!(mask.contains(iou::PollFlags::POLLIN)); - + let mut buf = [0; MESSAGE.len()]; + read.read(&mut buf); + assert_eq!(buf, MESSAGE); Ok(()) } From ca2762dc623369b4ff47cf4761bf3371e0e2a2ed Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 25 Dec 2019 00:00:14 +0300 Subject: [PATCH 08/13] work --- Cargo.toml | 3 --- tests/poll.rs | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 693547e..221232b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,5 @@ edition = "2018" [dependencies] bitflags = "1.2.0" nix = "0.16.0" -libc = "0.2.66" uring-sys = "1.0.0-beta" -[dev-dependencies] - diff --git a/tests/poll.rs b/tests/poll.rs index 6a320ed..c798fd0 100644 --- a/tests/poll.rs +++ b/tests/poll.rs @@ -19,7 +19,7 @@ fn test_poll_add() -> io::Result<()> { let mask = unsafe { iou::PollFlags::from_bits_unchecked(cqe.result()? as _) }; assert!(mask.contains(iou::PollFlags::POLLIN)); let mut buf = [0; MESSAGE.len()]; - read.read(&mut buf); + read.read(&mut buf)?; assert_eq!(buf, MESSAGE); Ok(()) } From 235f4b816e1fb1f29431068f1d0d36c8a2d02580 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 25 Dec 2019 16:17:34 +0300 Subject: [PATCH 09/13] progress --- src/lib.rs | 2 +- src/sqe.rs | 35 +++++++++++++++++++++++++++++++++-- tests/socket.rs | 34 +++++++++++++++++++++++++++++++++- 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2a9b461..2bdda13 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,7 @@ use std::mem::MaybeUninit; use std::ptr::{self, NonNull}; use std::time::Duration; -pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags}; +pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags, Accept}; pub use cqe::{CompletionQueue, CompletionQueueEvent}; pub use registrar::Registrar; diff --git a/src/sqe.rs b/src/sqe.rs index a6c3ca8..f1a5754 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -300,8 +300,12 @@ impl<'a> SubmissionQueueEvent<'a> { } #[inline] - pub unsafe fn prep_accept(&mut self, fd: RawFd, flags: SockFlag) { - uring_sys::io_uring_prep_accept(self.sqe, fd, std::ptr::null_mut(), std::ptr::null_mut(), flags.bits()) + pub unsafe fn prep_accept(&mut self, fd: RawFd, accept: Option<&mut Accept>, flags: SockFlag) { + let (addr, len) = match accept { + Some(accept) => (accept.storage.as_mut_ptr() as *mut _, &mut accept.len as *mut _ as *mut _), + None => (std::ptr::null_mut(), std::ptr::null_mut()) + }; + uring_sys::io_uring_prep_accept(self.sqe, fd, addr, len, flags.bits()) } /// Prepare a no-op event. @@ -379,6 +383,33 @@ impl<'a> SubmissionQueueEvent<'a> { unsafe impl<'a> Send for SubmissionQueueEvent<'a> { } unsafe impl<'a> Sync for SubmissionQueueEvent<'a> { } +pub struct Accept { + storage: mem::MaybeUninit, + len: usize, +} + +impl Accept { + pub fn uninit() -> Self { + let storage = mem::MaybeUninit::uninit(); + let len = mem::size_of::(); + Accept { + storage, + len + } + } + + pub unsafe fn as_socket_addr(&self) -> io::Result { + let storage = &*self.storage.as_ptr(); + nix::sys::socket::sockaddr_storage_to_addr(storage, self.len).map_err(|e| { + let err_no = e.as_errno(); + match err_no { + Some(err_no) => io::Error::from_raw_os_error(err_no as _), + None => io::Error::new(io::ErrorKind::Other, "Unknown error") + } + }) + } +} + bitflags::bitflags! { /// [`SubmissionQueueEvent`](SubmissionQueueEvent) configuration flags. /// diff --git a/tests/socket.rs b/tests/socket.rs index e161e49..02f9c69 100644 --- a/tests/socket.rs +++ b/tests/socket.rs @@ -4,6 +4,7 @@ use std::{ net::{TcpListener, TcpStream}, os::unix::io::{AsRawFd, FromRawFd}, }; +use iou::SockAddr; const MESSAGE: &'static [u8] = b"Hello World"; @@ -21,7 +22,7 @@ fn accept() -> io::Result<()> { let mut sq = ring.sq(); let mut sqe = sq.next_sqe().expect("failed to get sqe"); unsafe { - sqe.prep_accept(fd, iou::SockFlag::empty()); + sqe.prep_accept(fd, None, iou::SockFlag::empty()); sq.submit()?; } let cqe = ring.wait_for_cqe()?; @@ -33,6 +34,37 @@ fn accept() -> io::Result<()> { Ok(()) } +#[test] +fn accept_with_info() -> io::Result<()> { + let mut ring = iou::IoUring::new(1)?; + + let listener = TcpListener::bind(("0.0.0.0", 0))?; + listener.set_nonblocking(true)?; + + let mut connection_stream = TcpStream::connect(listener.local_addr()?)?; + connection_stream.write_all(MESSAGE)?; + + let fd = listener.as_raw_fd(); + let mut sq = ring.sq(); + let mut sqe = sq.next_sqe().expect("failed to get sqe"); + let mut accept = iou::Accept::uninit(); + unsafe { + sqe.prep_accept(fd, Some(&mut accept), iou::SockFlag::empty()); + sq.submit()?; + } + let cqe = ring.wait_for_cqe()?; + let accept_fd = cqe.result()?; + let mut accept_buf = [0; MESSAGE.len()]; + let mut accepted_stream = unsafe { TcpStream::from_raw_fd(accept_fd as _) }; + accepted_stream.read_exact(&mut accept_buf)?; + assert_eq!(accept_buf, MESSAGE); + + let addr = unsafe { accept.as_socket_addr()? }; + let connection_addr = SockAddr::Inet(InetAddr::from_std(&connection_stream.local_addr()?)); + assert_eq!(addr, connection_addr); + Ok(()) +} + #[test] fn connect() -> io::Result<()> { let listener = TcpListener::bind(("0.0.0.0", 0))?; From 8309ad99e4291416b743e933264174c294515e78 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 25 Dec 2019 16:20:13 +0300 Subject: [PATCH 10/13] progress --- src/lib.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 2bdda13..f5789f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,18 @@ pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags pub use cqe::{CompletionQueue, CompletionQueueEvent}; pub use registrar::Registrar; -pub use nix::{poll::PollFlags, sys::socket::{SockAddr, SockFlag}}; +pub use nix::{poll::PollFlags, + sys::socket::{ + SockAddr, + SockFlag, + InetAddr, + UnixAddr, + NetlinkAddr, + AlgAddr, + LinkAddr, + VsockAddr + } +}; bitflags::bitflags! { /// `IoUring` initialization flags for advanced use cases. From 71715cf36127a494dcc94ed6cdcb75b24838806f Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 25 Dec 2019 16:28:43 +0300 Subject: [PATCH 11/13] progress --- src/lib.rs | 2 +- src/sqe.rs | 8 +++---- tests/{socket.rs => accept.rs} | 39 ++++++---------------------------- tests/connect.rs | 29 +++++++++++++++++++++++++ 4 files changed, 40 insertions(+), 38 deletions(-) rename tests/{socket.rs => accept.rs} (61%) create mode 100644 tests/connect.rs diff --git a/src/lib.rs b/src/lib.rs index f5789f6..dafede9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,7 @@ use std::mem::MaybeUninit; use std::ptr::{self, NonNull}; use std::time::Duration; -pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags, Accept}; +pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags, AcceptParams}; pub use cqe::{CompletionQueue, CompletionQueueEvent}; pub use registrar::Registrar; diff --git a/src/sqe.rs b/src/sqe.rs index f1a5754..b2aafa0 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -300,7 +300,7 @@ impl<'a> SubmissionQueueEvent<'a> { } #[inline] - pub unsafe fn prep_accept(&mut self, fd: RawFd, accept: Option<&mut Accept>, flags: SockFlag) { + pub unsafe fn prep_accept(&mut self, fd: RawFd, accept: Option<&mut AcceptParams>, flags: SockFlag) { let (addr, len) = match accept { Some(accept) => (accept.storage.as_mut_ptr() as *mut _, &mut accept.len as *mut _ as *mut _), None => (std::ptr::null_mut(), std::ptr::null_mut()) @@ -383,16 +383,16 @@ impl<'a> SubmissionQueueEvent<'a> { unsafe impl<'a> Send for SubmissionQueueEvent<'a> { } unsafe impl<'a> Sync for SubmissionQueueEvent<'a> { } -pub struct Accept { +pub struct AcceptParams { storage: mem::MaybeUninit, len: usize, } -impl Accept { +impl AcceptParams { pub fn uninit() -> Self { let storage = mem::MaybeUninit::uninit(); let len = mem::size_of::(); - Accept { + AcceptParams { storage, len } diff --git a/tests/socket.rs b/tests/accept.rs similarity index 61% rename from tests/socket.rs rename to tests/accept.rs index 02f9c69..f49f28a 100644 --- a/tests/socket.rs +++ b/tests/accept.rs @@ -1,4 +1,4 @@ -use nix::sys::socket::{AddressFamily, SockProtocol, SockType, InetAddr, SockFlag}; +use nix::sys::socket::InetAddr; use std::{ io::{self, Read, Write}, net::{TcpListener, TcpStream}, @@ -35,7 +35,7 @@ fn accept() -> io::Result<()> { } #[test] -fn accept_with_info() -> io::Result<()> { +fn accept_with_params() -> io::Result<()> { let mut ring = iou::IoUring::new(1)?; let listener = TcpListener::bind(("0.0.0.0", 0))?; @@ -47,9 +47,9 @@ fn accept_with_info() -> io::Result<()> { let fd = listener.as_raw_fd(); let mut sq = ring.sq(); let mut sqe = sq.next_sqe().expect("failed to get sqe"); - let mut accept = iou::Accept::uninit(); + let mut accept_params = iou::AcceptParams::uninit(); unsafe { - sqe.prep_accept(fd, Some(&mut accept), iou::SockFlag::empty()); + sqe.prep_accept(fd, Some(&mut accept_params), iou::SockFlag::empty()); sq.submit()?; } let cqe = ring.wait_for_cqe()?; @@ -59,35 +59,8 @@ fn accept_with_info() -> io::Result<()> { accepted_stream.read_exact(&mut accept_buf)?; assert_eq!(accept_buf, MESSAGE); - let addr = unsafe { accept.as_socket_addr()? }; + let addr = unsafe { accept_params.as_socket_addr()? }; let connection_addr = SockAddr::Inet(InetAddr::from_std(&connection_stream.local_addr()?)); assert_eq!(addr, connection_addr); Ok(()) -} - -#[test] -fn connect() -> io::Result<()> { - let listener = TcpListener::bind(("0.0.0.0", 0))?; - listener.set_nonblocking(true)?; - let listener_addr = iou::SockAddr::new_inet(InetAddr::from_std(&listener.local_addr()?)); - - let socket = nix::sys::socket::socket( - AddressFamily::Inet, - SockType::Stream, - SockFlag::SOCK_NONBLOCK, - SockProtocol::Tcp, - ) - .map_err(|_| io::Error::new(io::ErrorKind::Other, "failed to create socket"))?; - - let mut ring = iou::IoUring::new(1)?; - let mut sqe = ring.next_sqe().expect("failed to get sqe"); - unsafe { - sqe.prep_connect(socket, &listener_addr); - sqe.set_user_data(42); - ring.submit_sqes()?; - } - let cqe = ring.wait_for_cqe()?; - let _res = cqe.result()?; - assert_eq!(cqe.user_data(), 42); - Ok(()) -} +} \ No newline at end of file diff --git a/tests/connect.rs b/tests/connect.rs new file mode 100644 index 0000000..a23ea61 --- /dev/null +++ b/tests/connect.rs @@ -0,0 +1,29 @@ +use nix::sys::socket::{AddressFamily, SockProtocol, SockType, InetAddr, SockFlag}; +use std::{io, net::TcpListener}; + +#[test] +fn connect() -> io::Result<()> { + let listener = TcpListener::bind(("0.0.0.0", 0))?; + listener.set_nonblocking(true)?; + let listener_addr = iou::SockAddr::new_inet(InetAddr::from_std(&listener.local_addr()?)); + + let socket = nix::sys::socket::socket( + AddressFamily::Inet, + SockType::Stream, + SockFlag::SOCK_NONBLOCK, + SockProtocol::Tcp, + ) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "failed to create socket"))?; + + let mut ring = iou::IoUring::new(1)?; + let mut sqe = ring.next_sqe().expect("failed to get sqe"); + unsafe { + sqe.prep_connect(socket, &listener_addr); + sqe.set_user_data(42); + ring.submit_sqes()?; + } + let cqe = ring.wait_for_cqe()?; + let _res = cqe.result()?; + assert_eq!(cqe.user_data(), 42); + Ok(()) +} From a9ea0ec4cb13b7f83c66a3e7e1b2fe78f615a556 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Thu, 26 Dec 2019 14:49:32 +0300 Subject: [PATCH 12/13] progress --- Cargo.toml | 3 +++ tests/accept.rs | 2 ++ tests/connect.rs | 1 + tests/poll.rs | 25 ++++++++++++++++++++++--- 4 files changed, 28 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 221232b..8278994 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,6 @@ bitflags = "1.2.0" nix = "0.16.0" uring-sys = "1.0.0-beta" +[dev-dependencies] +semver = "0.9.0" + diff --git a/tests/accept.rs b/tests/accept.rs index f49f28a..e9a8ead 100644 --- a/tests/accept.rs +++ b/tests/accept.rs @@ -9,6 +9,7 @@ use iou::SockAddr; const MESSAGE: &'static [u8] = b"Hello World"; #[test] +#[ignore] // kernel 5.5 needed for accept fn accept() -> io::Result<()> { let mut ring = iou::IoUring::new(1)?; @@ -35,6 +36,7 @@ fn accept() -> io::Result<()> { } #[test] +#[ignore] // kernel 5.5 needed for accept fn accept_with_params() -> io::Result<()> { let mut ring = iou::IoUring::new(1)?; diff --git a/tests/connect.rs b/tests/connect.rs index a23ea61..522c800 100644 --- a/tests/connect.rs +++ b/tests/connect.rs @@ -2,6 +2,7 @@ use nix::sys::socket::{AddressFamily, SockProtocol, SockType, InetAddr, SockFlag use std::{io, net::TcpListener}; #[test] +#[ignore] // kernel 5.5 needed for connect fn connect() -> io::Result<()> { let listener = TcpListener::bind(("0.0.0.0", 0))?; listener.set_nonblocking(true)?; diff --git a/tests/poll.rs b/tests/poll.rs index c798fd0..2d53893 100644 --- a/tests/poll.rs +++ b/tests/poll.rs @@ -1,4 +1,7 @@ -use std::{io::{self, Write, Read}, os::unix::{io::AsRawFd, net}}; +use std::{ + io::{self, Read, Write}, + os::unix::{io::AsRawFd, net}, +}; const MESSAGE: &'static [u8] = b"Hello World"; #[test] @@ -28,7 +31,8 @@ fn test_poll_add() -> io::Result<()> { fn test_poll_remove() -> io::Result<()> { let mut ring = iou::IoUring::new(2)?; let (read, _write) = net::UnixStream::pair()?; - + let uname = nix::sys::utsname::uname(); + let version = semver::Version::parse(uname.release()); unsafe { let mut sqe = ring.next_sqe().expect("failed to get sqe"); sqe.prep_poll_add(read.as_raw_fd(), iou::PollFlags::POLLIN); @@ -37,10 +41,25 @@ fn test_poll_remove() -> io::Result<()> { let mut sqe = ring.next_sqe().expect("failed to get sqe"); sqe.prep_poll_remove(0xDEADBEEF); + sqe.set_user_data(42); ring.submit_sqes()?; for _ in 0..2 { let cqe = ring.wait_for_cqe()?; - let _ = cqe.result()?; + let user_data = cqe.user_data(); + if version < semver::Version::parse("5.5.0-0") { + let _ = cqe.result()?; + } else if user_data == 0xDEADBEEF { + let err = cqe + .result() + .expect_err("on kernels >=5.5 error is expected"); + let err_no = nix::errno::Errno::from_i32( + err.raw_os_error() + .expect("on kernels >=5.5 os_error is expected"), + ); + assert_eq!(err_no, nix::errno::Errno::ECANCELED); + } else { + let _ = cqe.result()?; + } } Ok(()) } From 9dd821f4ba59f94c21d6e990d7b2c9d0b24fed52 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Wed, 22 Jan 2020 21:54:38 +0300 Subject: [PATCH 13/13] resolve discussions --- src/lib.rs | 23 +++++++++++------------ src/sqe.rs | 8 ++++---- tests/accept.rs | 2 +- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index dafede9..24b31c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,21 +55,20 @@ use std::mem::MaybeUninit; use std::ptr::{self, NonNull}; use std::time::Duration; -pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags, AcceptParams}; +pub use sqe::{SubmissionQueue, SubmissionQueueEvent, SubmissionFlags, FsyncFlags, SockAddrStorage}; pub use cqe::{CompletionQueue, CompletionQueueEvent}; pub use registrar::Registrar; -pub use nix::{poll::PollFlags, - sys::socket::{ - SockAddr, - SockFlag, - InetAddr, - UnixAddr, - NetlinkAddr, - AlgAddr, - LinkAddr, - VsockAddr - } +pub use nix::poll::PollFlags; +pub use nix::sys::socket::{ + SockAddr, + SockFlag, + InetAddr, + UnixAddr, + NetlinkAddr, + AlgAddr, + LinkAddr, + VsockAddr }; bitflags::bitflags! { diff --git a/src/sqe.rs b/src/sqe.rs index b2aafa0..9ca48ec 100644 --- a/src/sqe.rs +++ b/src/sqe.rs @@ -300,7 +300,7 @@ impl<'a> SubmissionQueueEvent<'a> { } #[inline] - pub unsafe fn prep_accept(&mut self, fd: RawFd, accept: Option<&mut AcceptParams>, flags: SockFlag) { + pub unsafe fn prep_accept(&mut self, fd: RawFd, accept: Option<&mut SockAddrStorage>, flags: SockFlag) { let (addr, len) = match accept { Some(accept) => (accept.storage.as_mut_ptr() as *mut _, &mut accept.len as *mut _ as *mut _), None => (std::ptr::null_mut(), std::ptr::null_mut()) @@ -383,16 +383,16 @@ impl<'a> SubmissionQueueEvent<'a> { unsafe impl<'a> Send for SubmissionQueueEvent<'a> { } unsafe impl<'a> Sync for SubmissionQueueEvent<'a> { } -pub struct AcceptParams { +pub struct SockAddrStorage { storage: mem::MaybeUninit, len: usize, } -impl AcceptParams { +impl SockAddrStorage { pub fn uninit() -> Self { let storage = mem::MaybeUninit::uninit(); let len = mem::size_of::(); - AcceptParams { + SockAddrStorage { storage, len } diff --git a/tests/accept.rs b/tests/accept.rs index e9a8ead..af3e533 100644 --- a/tests/accept.rs +++ b/tests/accept.rs @@ -49,7 +49,7 @@ fn accept_with_params() -> io::Result<()> { let fd = listener.as_raw_fd(); let mut sq = ring.sq(); let mut sqe = sq.next_sqe().expect("failed to get sqe"); - let mut accept_params = iou::AcceptParams::uninit(); + let mut accept_params = iou::SockAddrStorage::uninit(); unsafe { sqe.prep_accept(fd, Some(&mut accept_params), iou::SockFlag::empty()); sq.submit()?;