From b86e5b0c5ddfe028cd6f6370c34172212f4166ea Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 18 Mar 2021 14:24:01 -0400 Subject: [PATCH 1/2] simplify link rings The code as is is prior to cancellations. Now that we support cancellations we can simplify it, by making sure that we always cancel the request once we're done sleeping. --- glommio/src/sys/mod.rs | 8 +---- glommio/src/sys/uring.rs | 69 +++++++++++++++------------------------- 2 files changed, 27 insertions(+), 50 deletions(-) diff --git a/glommio/src/sys/mod.rs b/glommio/src/sys/mod.rs index b9487d815..289da5566 100644 --- a/glommio/src/sys/mod.rs +++ b/glommio/src/sys/mod.rs @@ -345,12 +345,6 @@ pub(crate) enum PollableStatus { NonPollable(DirectIO), } -#[derive(Debug, Copy, Clone)] -pub(crate) enum LinkStatus { - Freestanding, - Linked, -} - #[derive(Debug)] pub(crate) enum SourceType { Write(PollableStatus, IOBuffer), @@ -373,7 +367,7 @@ pub(crate) enum SourceType { FdataSync, Fallocate, Close, - LinkRings(LinkStatus), + LinkRings, Statx(CString, Box>), Timeout(TimeSpec64), Connect(SockAddr), diff --git a/glommio/src/sys/uring.rs b/glommio/src/sys/uring.rs index f1ffa4dd2..fd19aa5a8 100644 --- a/glommio/src/sys/uring.rs +++ b/glommio/src/sys/uring.rs @@ -34,7 +34,6 @@ use crate::{ DirectIO, IOBuffer, InnerSource, - LinkStatus, PollableStatus, Source, SourceType, @@ -790,10 +789,6 @@ impl Source { self.inner.source_type.borrow_mut() } - pub(crate) fn update_source_type(&self, source_type: SourceType) -> SourceType { - self.inner.update_source_type(source_type) - } - pub(crate) fn extract_source_type(&self) -> SourceType { self.inner.update_source_type(SourceType::Invalid) } @@ -926,30 +921,21 @@ impl SleepableRing { } fn sleep(&mut self, link: &mut Source, eventfd_src: &Source) -> io::Result { - let is_freestanding = match &*link.source_type() { - SourceType::LinkRings(LinkStatus::Linked) => false, // nothing to do - SourceType::LinkRings(LinkStatus::Freestanding) => true, - _ => panic!("Unexpected source type when linking rings"), - }; - - if is_freestanding { - if let Some(mut sqe) = self.ring.prepare_sqe() { - self.waiting_submission += 1; - link.update_source_type(SourceType::LinkRings(LinkStatus::Linked)); + if let Some(mut sqe) = self.ring.prepare_sqe() { + self.waiting_submission += 1; - let op = UringDescriptor { - fd: link.raw(), - flags: SubmissionFlags::empty(), - user_data: to_user_data(add_source(link, self.submission_queue.clone())), - args: UringOpDescriptor::PollAdd(common_flags() | read_flags()), - }; - fill_sqe(&mut sqe, &op, DmaBuffer::new); - } else { - // Can't link rings because we ran out of CQEs. Just can't sleep. - // Submit what we have, once we're out of here we'll consume them - // and at some point will be able to sleep again. - return self.ring.submit_sqes().map(|x| x as usize); - } + let op = UringDescriptor { + fd: link.raw(), + flags: SubmissionFlags::empty(), + user_data: to_user_data(add_source(link, self.submission_queue.clone())), + args: UringOpDescriptor::PollAdd(common_flags() | read_flags()), + }; + fill_sqe(&mut sqe, &op, DmaBuffer::new); + } else { + // Can't link rings because we ran out of CQEs. Just can't sleep. + // Submit what we have, once we're out of here we'll consume them + // and at some point will be able to sleep again. + return self.ring.submit_sqes().map(|x| x as usize); } let res = eventfd_src.take_result(); @@ -1002,13 +988,7 @@ impl UringCommon for SleepableRing { process_one_event( self.ring.peek_for_cqe(), |source| match &mut *source.source_type.borrow_mut() { - SourceType::LinkRings(status @ LinkStatus::Linked) => { - *status = LinkStatus::Freestanding; - Some(()) - } - SourceType::LinkRings(LinkStatus::Freestanding) => { - panic!("Impossible to have an event firing like this"); - } + SourceType::LinkRings => Some(()), SourceType::Timeout(_) => Some(()), _ => None, }, @@ -1062,9 +1042,10 @@ pub(crate) struct Reactor { latency_ring: RefCell, poll_ring: RefCell, - link_rings_src: RefCell, timeout_src: Cell>, + link_fd: RawFd, + // This keeps the eventfd alive. Drop will close it when we're done notifier: Arc, // This is the source used to handle the notifications into the ring @@ -1158,11 +1139,6 @@ impl Reactor { let latency_ring = SleepableRing::new(128, "latency", allocator.clone())?; let link_fd = latency_ring.ring_fd(); - let link_rings_src = Source::new( - IoRequirements::default(), - link_fd, - SourceType::LinkRings(LinkStatus::Freestanding), - ); let eventfd_src = Source::new( IoRequirements::default(), @@ -1175,8 +1151,8 @@ impl Reactor { main_ring: RefCell::new(main_ring), latency_ring: RefCell::new(latency_ring), poll_ring: RefCell::new(poll_ring), - link_rings_src: RefCell::new(link_rings_src), timeout_src: Cell::new(None), + link_fd, notifier, eventfd_src, }) @@ -1344,7 +1320,11 @@ impl Reactor { ring: &mut SleepableRing, eventfd_src: &Source, ) -> io::Result<()> { - let mut link_rings = self.link_rings_src.borrow_mut(); + let mut link_rings = Source::new( + IoRequirements::default(), + self.link_fd, + SourceType::LinkRings, + ); ring.sleep(&mut link_rings, eventfd_src) .or_else(Self::busy_ok)?; Ok(()) @@ -1467,6 +1447,9 @@ impl Reactor { .expect("some error"); // woke up, so no need to notify us anymore. self.notifier.wake_up(); + // may have new cancellations related to the link ring fd. + flush_cancellations!(into wakers; main_ring); + flush_rings!(main_ring)?; consume_rings!(into wakers; lat_ring, poll_ring, main_ring); } } From f51ca7f41cf93e94f1518d3c0e40448e2a45e859 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 18 Mar 2021 20:23:49 -0400 Subject: [PATCH 2/2] fix prepare_sqes This method takes a slice from the sqe queue, but that is completely wrong: the sqe queue is a circular buffer, so if you try to get 2 elements at position n - 1 you access invalid memory. This patch transforms the very helpful SQEs structure so that it now only has the information needed to access each SQE and constructs them from there. --- glommio/src/iou/sqe.rs | 36 +++++++++++++++++++++-------- glommio/src/iou/submission_queue.rs | 10 +++----- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/glommio/src/iou/sqe.rs b/glommio/src/iou/sqe.rs index 60a2ab53a..59eb6b0d8 100644 --- a/glommio/src/iou/sqe.rs +++ b/glommio/src/iou/sqe.rs @@ -5,7 +5,6 @@ use std::{ ops::{Deref, DerefMut}, os::unix::io::RawFd, ptr, - slice, }; use super::registrar::{UringFd, UringReadBuf, UringWriteBuf}; @@ -606,13 +605,23 @@ bitflags::bitflags! { /// A sequence of [`SQE`]s from the [`SubmissionQueue`][crate::SubmissionQueue]. pub struct SQEs<'ring> { - sqes: slice::IterMut<'ring, uring_sys::io_uring_sqe>, + sq: &'ring mut uring_sys::io_uring_sq, + first: u32, + count: u32, + consumed: u32, } impl<'ring> SQEs<'ring> { - pub(crate) fn new(slice: &'ring mut [uring_sys::io_uring_sqe]) -> SQEs<'ring> { + pub(crate) fn new( + sq: &'ring mut uring_sys::io_uring_sq, + first: u32, + count: u32, + ) -> SQEs<'ring> { SQEs { - sqes: slice.iter_mut(), + sq, + first, + count, + consumed: 0, } } @@ -646,14 +655,23 @@ impl<'ring> SQEs<'ring> { /// Remaining [`SQE`]s that can be modified. pub fn remaining(&self) -> u32 { - self.sqes.len() as u32 + (self.count - self.consumed) as u32 } fn consume(&mut self) -> Option> { - self.sqes.next().map(|sqe| { - unsafe { uring_sys::io_uring_prep_nop(sqe) } - SQE { sqe } - }) + if self.consumed < self.count { + unsafe { + let sqe = self + .sq + .sqes + .offset(((self.first + self.consumed) & *self.sq.kring_mask) as isize); + uring_sys::io_uring_prep_nop(sqe); + self.consumed += 1; + Some(SQE { sqe: &mut *sqe }) + } + } else { + None + } } } diff --git a/glommio/src/iou/submission_queue.rs b/glommio/src/iou/submission_queue.rs index cfd1a87cc..9972929d6 100644 --- a/glommio/src/iou/submission_queue.rs +++ b/glommio/src/iou/submission_queue.rs @@ -3,7 +3,6 @@ use std::{ io, marker::PhantomData, ptr::NonNull, - slice, sync::atomic::{self, Ordering}, time::Duration, }; @@ -119,19 +118,16 @@ pub(crate) unsafe fn prepare_sqe<'a>(ring: &mut uring_sys::io_uring) -> Option( - sq: &mut uring_sys::io_uring_sq, - count: u32, -) -> Option> { +pub(crate) unsafe fn prepare_sqes(sq: &mut uring_sys::io_uring_sq, count: u32) -> Option> { atomic::fence(Ordering::Acquire); let head: u32 = *sq.khead; let next: u32 = sq.sqe_tail + count; if next - head <= *sq.kring_entries { - let sqe = sq.sqes.offset((sq.sqe_tail & *sq.kring_mask) as isize); + let first = sq.sqe_tail; sq.sqe_tail = next; - Some(SQEs::new(slice::from_raw_parts_mut(sqe, count as usize))) + Some(SQEs::new(sq, first, count)) } else { None }