Skip to content

Commit

Permalink
Completion: only construct via with_completion
Browse files Browse the repository at this point in the history
Removes need for 'armed' flag - Completion instances
always contain a rados_completion_t that has been
used to start an operation
  • Loading branch information
jcsp committed Jun 15, 2021
1 parent 6cdb0e4 commit 94a1dd0
Showing 1 changed file with 49 additions and 38 deletions.
87 changes: 49 additions & 38 deletions src/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ use crate::rados::{
rados_completion_t,
};

pub struct Completion {
struct Completion {
inner: rados_completion_t,

armed: bool,

// Box to provide a stable address for completion_complete callback
// Mutex to make Sync-safe for write from poll() vs read from completion_complete
waker: Box<std::sync::Mutex<Option<std::task::Waker>>>,
Expand All @@ -39,6 +37,8 @@ pub struct Completion {
// to be an Arc rather than a raw rados_ioctx_t because otherwise
// there would be nothing to stop the rados_ioctx_t being invalidated
// during the lifetime of this Completion.
// (AioCompletionImpl does hold a reference to IoCtxImpl for writes, but
// not for reads.)
ioctx: Arc<IoCtx>,
}

Expand All @@ -58,39 +58,14 @@ pub extern "C" fn completion_complete(_cb: rados_completion_t, arg: *mut c_void)
}
}

impl Completion {
fn new(ioctx: Arc<IoCtx>) -> Self {
let mut waker = Box::new(Mutex::new(None));

let completion = unsafe {
let mut completion: rados_completion_t = std::ptr::null_mut();
let p: *mut Mutex<Option<Waker>> = &mut *waker;
let p = p as *mut c_void;

let r = rados_aio_create_completion2(p, Some(completion_complete), &mut completion);
if r != 0 {
panic!("Error {} allocating RADOS completion: out of memory?", r);
}

completion
};

Self {
inner: completion,
waker,
ioctx,
armed: false,
}
}
}

impl Drop for Completion {
fn drop(&mut self) {
let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;

// Ensure that after dropping the Completion, the AIO callback
// will not be called on our dropped waker Box
if self.armed && !am_complete {
// will not be called on our dropped waker Box. Only necessary
// if we got as far as successfully starting an operation using
// the completion.
let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;
if !am_complete {
unsafe {
rados_aio_cancel(self.ioctx.ioctx, self.inner);
rados_aio_wait_for_complete_and_cb(self.inner);
Expand Down Expand Up @@ -122,16 +97,52 @@ impl std::future::Future for Completion {
}
}

pub async fn with_completion<F>(ioctx: Arc<IoCtx>, f: F) -> RadosResult<i32>
fn with_completion_impl<F>(ioctx: Arc<IoCtx>, f: F) -> RadosResult<Completion>
where
F: FnOnce(rados_completion_t) -> libc::c_int,
{
let mut completion = Completion::new(ioctx);
let ret_code = f(completion.inner);
let mut waker = Box::new(Mutex::new(None));

let completion = unsafe {
let mut completion: rados_completion_t = std::ptr::null_mut();
let p: *mut Mutex<Option<Waker>> = &mut *waker;
let p = p as *mut c_void;

let r = rados_aio_create_completion2(p, Some(completion_complete), &mut completion);
if r != 0 {
panic!("Error {} allocating RADOS completion: out of memory?", r);
}

completion
};

let ret_code = f(completion);
if ret_code < 0 {
// On error dispatching I/O, drop the unused rados_completion_t
unsafe {
rados_aio_release(completion);
drop(completion)
}
Err(ret_code.into())
} else {
completion.armed = true;
completion.await
// Pass the rados_completion_t into a Future-implementing wrapper and await it.
Ok(Completion {
ioctx,
inner: completion,
waker,
})
}
}
/// Completions are only created via this wrapper, in order to ensure
/// that the Completion struct is only constructed around 'armed' rados_completion_t
/// instances (i.e. those that have been used to start an I/O).
pub async fn with_completion<F>(ioctx: Arc<IoCtx>, f: F) -> RadosResult<i32>
where
F: FnOnce(rados_completion_t) -> libc::c_int,
{
// Hide c_void* temporaries in a non-async function so that the future generated
// by this function isn't encumbered by their non-Send-ness.
let completion = with_completion_impl(ioctx, f)?;

completion.await
}

0 comments on commit 94a1dd0

Please sign in to comment.