From 94a1dd0f8b09420a8f120ac35c32e3e7be6dec4c Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 15 Jun 2021 16:08:35 +0100 Subject: [PATCH] Completion: only construct via with_completion Removes need for 'armed' flag - Completion instances always contain a rados_completion_t that has been used to start an operation --- src/completion.rs | 87 ++++++++++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/src/completion.rs b/src/completion.rs index 5a18d53..1d79afe 100644 --- a/src/completion.rs +++ b/src/completion.rs @@ -25,11 +25,9 @@ use crate::rados::{ rados_completion_t, }; -pub struct Completion { +struct Completion { inner: rados_completion_t, - armed: bool, - // Box to provide a stable address for completion_complete callback // Mutex to make Sync-safe for write from poll() vs read from completion_complete waker: Box>>, @@ -39,6 +37,8 @@ pub struct Completion { // to be an Arc rather than a raw rados_ioctx_t because otherwise // there would be nothing to stop the rados_ioctx_t being invalidated // during the lifetime of this Completion. + // (AioCompletionImpl does hold a reference to IoCtxImpl for writes, but + // not for reads.) ioctx: Arc, } @@ -58,39 +58,14 @@ pub extern "C" fn completion_complete(_cb: rados_completion_t, arg: *mut c_void) } } -impl Completion { - fn new(ioctx: Arc) -> Self { - let mut waker = Box::new(Mutex::new(None)); - - let completion = unsafe { - let mut completion: rados_completion_t = std::ptr::null_mut(); - let p: *mut Mutex> = &mut *waker; - let p = p as *mut c_void; - - let r = rados_aio_create_completion2(p, Some(completion_complete), &mut completion); - if r != 0 { - panic!("Error {} allocating RADOS completion: out of memory?", r); - } - - completion - }; - - Self { - inner: completion, - waker, - ioctx, - armed: false, - } - } -} - impl Drop for Completion { fn drop(&mut self) { - let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0; - // Ensure that after dropping the Completion, the AIO callback - // will not be called on our dropped waker Box - if self.armed && !am_complete { + // will not be called on our dropped waker Box. Only necessary + // if we got as far as successfully starting an operation using + // the completion. + let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0; + if !am_complete { unsafe { rados_aio_cancel(self.ioctx.ioctx, self.inner); rados_aio_wait_for_complete_and_cb(self.inner); @@ -122,16 +97,52 @@ impl std::future::Future for Completion { } } -pub async fn with_completion(ioctx: Arc, f: F) -> RadosResult +fn with_completion_impl(ioctx: Arc, f: F) -> RadosResult where F: FnOnce(rados_completion_t) -> libc::c_int, { - let mut completion = Completion::new(ioctx); - let ret_code = f(completion.inner); + let mut waker = Box::new(Mutex::new(None)); + + let completion = unsafe { + let mut completion: rados_completion_t = std::ptr::null_mut(); + let p: *mut Mutex> = &mut *waker; + let p = p as *mut c_void; + + let r = rados_aio_create_completion2(p, Some(completion_complete), &mut completion); + if r != 0 { + panic!("Error {} allocating RADOS completion: out of memory?", r); + } + + completion + }; + + let ret_code = f(completion); if ret_code < 0 { + // On error dispatching I/O, drop the unused rados_completion_t + unsafe { + rados_aio_release(completion); + drop(completion) + } Err(ret_code.into()) } else { - completion.armed = true; - completion.await + // Pass the rados_completion_t into a Future-implementing wrapper and await it. + Ok(Completion { + ioctx, + inner: completion, + waker, + }) } } +/// Completions are only created via this wrapper, in order to ensure +/// that the Completion struct is only constructed around 'armed' rados_completion_t +/// instances (i.e. those that have been used to start an I/O). +pub async fn with_completion(ioctx: Arc, f: F) -> RadosResult +where + F: FnOnce(rados_completion_t) -> libc::c_int, +{ + // Hide c_void* temporaries in a non-async function so that the future generated + // by this function isn't encumbered by their non-Send-ness. + let completion = with_completion_impl(ioctx, f)?; + + completion.await +}