Skip to content

Commit

Permalink
Fix a couple of issues in Completion
Browse files Browse the repository at this point in the history
- librados calls callbacks /after/ setting complete=true,
  so we have to explicitly wait for callbacks even once
  the completion is complete.
- Could deadlock if completion_callback was called between
  poll()'s initial completeness check and it setting a Waker
  • Loading branch information
jcsp committed Jun 18, 2021
1 parent 6e5875a commit 37eb285
Showing 1 changed file with 24 additions and 4 deletions.
28 changes: 24 additions & 4 deletions src/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,15 @@ impl Drop for Completion<'_> {

// It is unsound to proceed if the Objecter op is still in flight
assert!(cancel_r == 0 || cancel_r == -libc::ENOENT);

rados_aio_wait_for_complete_and_cb(self.inner);
}
}

unsafe {
// Even if is_complete was true, librados might not be done with
// our callback: wait til it is.
assert_eq!(rados_aio_wait_for_complete_and_cb(self.inner), 0);
}

unsafe {
rados_aio_release(self.inner);
}
Expand All @@ -85,16 +89,31 @@ impl Drop for Completion<'_> {
impl std::future::Future for Completion<'_> {
type Output = crate::error::RadosResult<i32>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Hold lock across the check of am_complete and subsequent waker registration
// to avoid deadlock if callback is invoked in between.
let mut waker_locked = self.waker.lock().unwrap();

let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;

if am_complete {
// Unlock Waker so that completion callback can complete if racing with us.
drop(waker_locked);

// Ensure librados is finished with our callback ('complete' is true
// before it calls that)
unsafe {
let r = rados_aio_wait_for_complete_and_cb(self.inner);
assert_eq!(r, 0);
}

let r = unsafe { rados_aio_get_return_value(self.inner) };
let result = if r < 0 { Err(r.into()) } else { Ok(r) };

std::task::Poll::Ready(result)
} else {
// Register a waker
*self.as_mut().waker.lock().unwrap() = Some(cx.waker().clone());
*waker_locked = Some(cx.waker().clone());

std::task::Poll::Pending
}
Expand Down Expand Up @@ -125,6 +144,7 @@ where
};

let ret_code = f(completion);

if ret_code < 0 {
// On error dispatching I/O, drop the unused rados_completion_t
unsafe {
Expand Down

0 comments on commit 37eb285

Please sign in to comment.