From 11e35a347abb81bc37089df8d42717295af96daa Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 14 Jun 2021 17:44:46 +0100 Subject: [PATCH 1/8] async: add a Completion (impl Future) wrapper around rados_completion_t --- src/ceph.rs | 6 +- src/completion.rs | 161 ++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/rados.rs | 7 ++ 4 files changed, 174 insertions(+), 1 deletion(-) create mode 100644 src/completion.rs diff --git a/src/ceph.rs b/src/ceph.rs index 4001947..92ddcb6 100644 --- a/src/ceph.rs +++ b/src/ceph.rs @@ -39,6 +39,9 @@ use std::io::{BufRead, Cursor}; use std::net::IpAddr; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; use uuid::Uuid; const CEPH_OSD_TMAP_HDR: char = 'h'; @@ -332,7 +335,8 @@ impl Iterator for XAttr { /// Owns a ioctx handle pub struct IoCtx { - ioctx: rados_ioctx_t, + // This is pub within the crate to enable Completions to use it + pub(crate) ioctx: rados_ioctx_t, } unsafe impl Send for IoCtx {} diff --git a/src/completion.rs b/src/completion.rs new file mode 100644 index 0000000..2497bdf --- /dev/null +++ b/src/completion.rs @@ -0,0 +1,161 @@ +// Copyright 2021 John Spray All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ffi::c_void; +use std::pin::Pin; +use std::sync::Mutex; +use std::task::{Context, Poll, Waker}; + +use crate::ceph::IoCtx; +use crate::error::RadosResult; +use crate::rados::{ + rados_aio_cancel, rados_aio_create_completion2, rados_aio_get_return_value, + rados_aio_is_complete, rados_aio_release, rados_aio_wait_for_complete_and_cb, + rados_completion_t, +}; + +pub(crate) struct Completion<'a> { + inner: rados_completion_t, + + // Box to provide a stable address for completion_complete callback + // Mutex for memory fencing when writing from poll() and reading from completion_complete() + waker: Box>>, + + // A reference to the IOCtx is required to issue a cancel on + // the operation if we are dropped before ready. This needs + // to be a Rust reference rather than a raw rados_ioctx_t because otherwise + // there would be nothing to stop the rados_ioctx_t being invalidated + // during the lifetime of this Completion. + // (AioCompletionImpl does hold a reference to IoCtxImpl for writes, but + // not for reads.) + ioctx: &'a IoCtx, +} + +unsafe impl Send for Completion<'_> {} + +#[no_mangle] +pub extern "C" fn completion_complete(_cb: rados_completion_t, arg: *mut c_void) -> () { + let waker = unsafe { + let p = arg as *mut Mutex>; + p.as_mut().unwrap() + }; + + let waker = waker.lock().unwrap().take(); + match waker { + Some(w) => w.wake(), + None => {} + } +} + +impl Drop for Completion<'_> { + fn drop(&mut self) { + // Ensure that after dropping the Completion, the AIO callback + // will not be called on our dropped waker Box. Only necessary + // if we got as far as successfully starting an operation using + // the completion. + let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0; + if !am_complete { + unsafe { + let cancel_r = rados_aio_cancel(self.ioctx.ioctx, self.inner); + + // It is unsound to proceed if the Objecter op is still in flight + assert!(cancel_r == 0 || cancel_r == -libc::ENOENT); + } + } + + unsafe { + // Even if is_complete was true, librados might not be done with + // our callback: wait til it is. + assert_eq!(rados_aio_wait_for_complete_and_cb(self.inner), 0); + } + + unsafe { + rados_aio_release(self.inner); + } + } +} + +impl std::future::Future for Completion<'_> { + type Output = crate::error::RadosResult; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Hold lock across the check of am_complete and subsequent waker registration + // to avoid deadlock if callback is invoked in between. + let mut waker_locked = self.waker.lock().unwrap(); + + let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0; + + if am_complete { + // Unlock Waker so that completion callback can complete if racing with us. + drop(waker_locked); + + // Ensure librados is finished with our callback ('complete' is true + // before it calls that) + unsafe { + let r = rados_aio_wait_for_complete_and_cb(self.inner); + assert_eq!(r, 0); + } + + let r = unsafe { rados_aio_get_return_value(self.inner) }; + let result = if r < 0 { Err(r.into()) } else { Ok(r) }; + std::task::Poll::Ready(result.map(|e| e as u32)) + } else { + // Register a waker + *waker_locked = Some(cx.waker().clone()); + + std::task::Poll::Pending + } + } +} + +/// Completions are only created via this wrapper, in order to ensure +/// that the Completion struct is only constructed around 'armed' rados_completion_t +/// instances (i.e. those that have been used to start an I/O). +pub(crate) fn with_completion(ioctx: &IoCtx, f: F) -> RadosResult> +where + F: FnOnce(rados_completion_t) -> libc::c_int, +{ + let mut waker = Box::new(Mutex::new(None)); + + let completion = unsafe { + let mut completion: rados_completion_t = std::ptr::null_mut(); + let p: *mut Mutex> = &mut *waker; + let p = p as *mut c_void; + + let r = rados_aio_create_completion2(p, Some(completion_complete), &mut completion); + if r != 0 { + panic!("Error {} allocating RADOS completion: out of memory?", r); + } + + completion + }; + + let ret_code = f(completion); + + if ret_code < 0 { + // On error dispatching I/O, drop the unused rados_completion_t + unsafe { + rados_aio_release(completion); + drop(completion) + } + Err(ret_code.into()) + } else { + // Pass the rados_completion_t into a Future-implementing wrapper and await it. + Ok(Completion { + ioctx, + inner: completion, + waker, + }) + } +} diff --git a/src/lib.rs b/src/lib.rs index caccece..fa9be78 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,6 +78,7 @@ pub mod utils; mod ceph_client; mod ceph_version; +pub(crate) mod completion; mod mon_command; pub use crate::ceph_client::CephClient; diff --git a/src/rados.rs b/src/rados.rs index 96e5a61..45a8c27 100644 --- a/src/rados.rs +++ b/src/rados.rs @@ -606,6 +606,13 @@ extern "C" { cb_safe: rados_callback_t, pc: *mut rados_completion_t, ) -> ::libc::c_int; + + pub fn rados_aio_create_completion2( + cb_arg: *mut ::std::os::raw::c_void, + cb_complete: rados_callback_t, + pc: *mut rados_completion_t, + ) -> ::libc::c_int; + pub fn rados_aio_wait_for_complete(c: rados_completion_t) -> ::libc::c_int; pub fn rados_aio_wait_for_safe(c: rados_completion_t) -> ::libc::c_int; pub fn rados_aio_is_complete(c: rados_completion_t) -> ::libc::c_int; From d3a09df391556ba5a21f94920498ca5a84ba536a Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 15 Jun 2021 10:22:49 +0100 Subject: [PATCH 2/8] async: add the basics (write/append/read/remove) --- src/ceph.rs | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 118 insertions(+), 2 deletions(-) diff --git a/src/ceph.rs b/src/ceph.rs index 92ddcb6..3c708ad 100644 --- a/src/ceph.rs +++ b/src/ceph.rs @@ -26,6 +26,7 @@ use nom::number::complete::le_u32; use nom::IResult; use serde_json; +use crate::completion::with_completion; use crate::rados::*; #[cfg(feature = "rados_striper")] use crate::rados_striper::*; @@ -49,6 +50,8 @@ const CEPH_OSD_TMAP_SET: char = 's'; const CEPH_OSD_TMAP_CREATE: char = 'c'; const CEPH_OSD_TMAP_RM: char = 'r'; +const DEFAULT_READ_BYTES: usize = 64 * 1024; + #[derive(Debug, Clone)] pub enum CephHealth { Ok, @@ -938,6 +941,119 @@ impl IoCtx { Ok(()) } + pub async fn rados_async_object_write( + &self, + object_name: &str, + buffer: &[u8], + offset: u64, + ) -> RadosResult { + self.ioctx_guard()?; + let obj_name_str = CString::new(object_name)?; + + with_completion(&self, |c| unsafe { + rados_aio_write( + self.ioctx, + obj_name_str.as_ptr(), + c, + buffer.as_ptr() as *const ::libc::c_char, + buffer.len(), + offset, + ) + })? + .await + } + + /// Async variant of rados_object_append + pub async fn rados_async_object_append( + self: &Arc, + object_name: &str, + buffer: &[u8], + ) -> RadosResult { + self.ioctx_guard()?; + let obj_name_str = CString::new(object_name)?; + + with_completion(self, |c| unsafe { + rados_aio_append( + self.ioctx, + obj_name_str.as_ptr(), + c, + buffer.as_ptr() as *const ::libc::c_char, + buffer.len(), + ) + })? + .await + } + + /// Async variant of rados_object_write_full + pub async fn rados_async_object_write_full( + &self, + object_name: &str, + buffer: &[u8], + ) -> RadosResult { + self.ioctx_guard()?; + let obj_name_str = CString::new(object_name)?; + + with_completion(&self, |c| unsafe { + rados_aio_write_full( + self.ioctx, + obj_name_str.as_ptr(), + c, + buffer.as_ptr() as *const ::libc::c_char, + buffer.len(), + ) + })? + .await + } + + /// Async variant of rados_object_remove + pub async fn rados_async_object_remove(&self, object_name: &str) -> RadosResult<()> { + self.ioctx_guard()?; + let object_name_str = CString::new(object_name)?; + + with_completion(self, |c| unsafe { + rados_aio_remove(self.ioctx, object_name_str.as_ptr() as *const c_char, c) + })? + .await + .map(|_r| ()) + } + + /// Async variant of rados_object_read + pub async fn rados_async_object_read( + self: &Arc, + object_name: &str, + fill_buffer: &mut Vec, + read_offset: u64, + ) -> RadosResult { + self.ioctx_guard()?; + let obj_name_str = CString::new(object_name)?; + + if fill_buffer.capacity() == 0 { + fill_buffer.reserve_exact(DEFAULT_READ_BYTES); + } + + let result = with_completion(self, |c| unsafe { + rados_aio_read( + self.ioctx, + obj_name_str.as_ptr(), + c, + fill_buffer.as_mut_ptr() as *mut c_char, + fill_buffer.capacity(), + read_offset, + ) + })? + .await; + + if let Ok(rval) = &result { + unsafe { + let len = *rval as usize; + assert!(len <= fill_buffer.capacity()); + fill_buffer.set_len(len); + } + } + + result + } + /// Efficiently copy a portion of one object to another /// If the underlying filesystem on the OSD supports it, this will be a /// copy-on-write clone. @@ -995,7 +1111,7 @@ impl IoCtx { /// amount of bytes read /// The io context determines the snapshot to read from, if any was set by /// rados_ioctx_snap_set_read(). - /// Default read size is 64K unless you call Vec::with_capacity(1024*128) + /// Default read size is 64K unless you call Vec::with_capacity /// with a larger size. pub fn rados_object_read( &self, @@ -1007,7 +1123,7 @@ impl IoCtx { let object_name_str = CString::new(object_name)?; let mut len = fill_buffer.capacity(); if len == 0 { - fill_buffer.reserve_exact(1024 * 64); + fill_buffer.reserve_exact(DEFAULT_READ_BYTES); len = fill_buffer.capacity(); } From e5c9b33325b57c78dbb04d881e4635425f122d27 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 16 Jun 2021 12:31:36 +0100 Subject: [PATCH 3/8] async: non-blocking wrapper for connect_to_ceph It would also be possible for callers to do this for themselves with an equivalent background thread wrapper, but it's better to avoid forcing every user to reinvent that. --- Cargo.toml | 1 + src/ceph.rs | 17 +++++++++++++++++ src/completion.rs | 1 + 3 files changed, 19 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 75f9e80..2a86684 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ serde_json = "1" uuid = { version = "~0.8", features = ["serde"] } nix = "0.21" tracing = "0.1" +futures = {version="~0.3", features=["thread-pool"]} [features] default = [] diff --git a/src/ceph.rs b/src/ceph.rs index 3c708ad..5b105fd 100644 --- a/src/ceph.rs +++ b/src/ceph.rs @@ -21,6 +21,7 @@ use crate::error::*; use crate::json::*; use crate::JsonValue; use byteorder::{LittleEndian, WriteBytesExt}; +use futures::task::SpawnExt; use libc::*; use nom::number::complete::le_u32; use nom::IResult; @@ -378,6 +379,7 @@ pub struct Rados { phantom: PhantomData, } +unsafe impl Send for Rados {} unsafe impl Sync for Rados {} impl Drop for Rados { @@ -415,6 +417,21 @@ pub fn connect_to_ceph(user_id: &str, config_file: &str) -> RadosResult { } } +/// Non-blocking wrapper for `connect_to_ceph` +pub async fn connect_to_ceph_async(user_id: &str, config_file: &str) -> RadosResult { + let user_id = user_id.to_string(); + let config_file = config_file.to_string(); + + // librados doesn't have async initialization, so wrap it in a thread pool. + let pool = futures::executor::ThreadPool::builder() + .pool_size(1) + .create() + .expect("Could not spawn thread pool"); + pool.spawn_with_handle(async move { connect_to_ceph(&user_id, &config_file) }) + .expect("Could not spawn background task") + .await +} + impl Rados { pub fn inner(&self) -> &rados_t { &self.rados diff --git a/src/completion.rs b/src/completion.rs index 2497bdf..94bb95e 100644 --- a/src/completion.rs +++ b/src/completion.rs @@ -137,6 +137,7 @@ where if r != 0 { panic!("Error {} allocating RADOS completion: out of memory?", r); } + assert!(!completion.is_null()); completion }; From e5b3b26bd9e4ac71a4dd612b219bef0688bc8ef2 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 18 Jun 2021 11:50:01 +0100 Subject: [PATCH 4/8] async: add wrapper for rados_aio_stat() --- src/ceph.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/ceph.rs b/src/ceph.rs index 5b105fd..a1f981a 100644 --- a/src/ceph.rs +++ b/src/ceph.rs @@ -1071,6 +1071,29 @@ impl IoCtx { result } + /// Get object stats (size,SystemTime) + pub async fn rados_async_object_stat( + &self, + object_name: &str, + ) -> RadosResult<(u64, SystemTime)> { + self.ioctx_guard()?; + let object_name_str = CString::new(object_name)?; + let mut psize: u64 = 0; + let mut time: ::libc::time_t = 0; + + with_completion(self, |c| unsafe { + rados_aio_stat( + self.ioctx, + object_name_str.as_ptr(), + c, + &mut psize, + &mut time, + ) + })? + .await?; + Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64)))) + } + /// Efficiently copy a portion of one object to another /// If the underlying filesystem on the OSD supports it, this will be a /// copy-on-write clone. From 6e4fc2138088ac494bb151bba82d8665955e549f Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Jul 2021 11:36:38 +0100 Subject: [PATCH 5/8] async: add {get/set/rm}xattr wrappers --- src/ceph.rs | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/rados.rs | 26 ++++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/src/ceph.rs b/src/ceph.rs index a1f981a..80cce1d 100644 --- a/src/ceph.rs +++ b/src/ceph.rs @@ -1094,6 +1094,75 @@ impl IoCtx { Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64)))) } + /// Async variant of rados_object_getxattr + pub async fn rados_async_object_getxattr( + &self, + object_name: &str, + attr_name: &str, + fill_buffer: &mut [u8], + ) -> RadosResult { + self.ioctx_guard()?; + let object_name_str = CString::new(object_name)?; + let attr_name_str = CString::new(attr_name)?; + + with_completion(self, |c| unsafe { + rados_aio_getxattr( + self.ioctx, + object_name_str.as_ptr() as *const c_char, + c, + attr_name_str.as_ptr() as *const c_char, + fill_buffer.as_mut_ptr() as *mut c_char, + fill_buffer.len(), + ) + })? + .await + } + + /// Async variant of rados_object_setxattr + pub async fn rados_async_object_setxattr( + &self, + object_name: &str, + attr_name: &str, + attr_value: &[u8], + ) -> RadosResult { + self.ioctx_guard()?; + let object_name_str = CString::new(object_name)?; + let attr_name_str = CString::new(attr_name)?; + + with_completion(self, |c| unsafe { + rados_aio_setxattr( + self.ioctx, + object_name_str.as_ptr() as *const c_char, + c, + attr_name_str.as_ptr() as *const c_char, + attr_value.as_ptr() as *mut c_char, + attr_value.len(), + ) + })? + .await + } + + /// Async variant of rados_object_rmxattr + pub async fn rados_async_object_rmxattr( + &self, + object_name: &str, + attr_name: &str, + ) -> RadosResult { + self.ioctx_guard()?; + let object_name_str = CString::new(object_name)?; + let attr_name_str = CString::new(attr_name)?; + + with_completion(self, |c| unsafe { + rados_aio_rmxattr( + self.ioctx, + object_name_str.as_ptr() as *const c_char, + c, + attr_name_str.as_ptr() as *const c_char, + ) + })? + .await + } + /// Efficiently copy a portion of one object to another /// If the underlying filesystem on the OSD supports it, this will be a /// copy-on-write clone. diff --git a/src/rados.rs b/src/rados.rs index 45a8c27..e62cf93 100644 --- a/src/rados.rs +++ b/src/rados.rs @@ -909,6 +909,32 @@ extern "C" { oid: *const ::libc::c_char, flags: ::libc::c_int, ) -> ::libc::c_int; + + pub fn rados_aio_getxattr( + io: rados_ioctx_t, + o: *const ::libc::c_char, + completion: rados_completion_t, + name: *const ::libc::c_char, + buf: *mut ::libc::c_char, + len: size_t, + ) -> ::libc::c_int; + + pub fn rados_aio_setxattr( + io: rados_ioctx_t, + o: *const ::libc::c_char, + completion: rados_completion_t, + name: *const ::libc::c_char, + buf: *const ::libc::c_char, + len: size_t, + ) -> ::libc::c_int; + + pub fn rados_aio_rmxattr( + io: rados_ioctx_t, + o: *const ::libc::c_char, + completion: rados_completion_t, + name: *const ::libc::c_char, + ) -> ::libc::c_int; + pub fn rados_lock_exclusive( io: rados_ioctx_t, o: *const ::libc::c_char, From fb48526557a7d0a071ba2e56d591c74da40b30ed Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Jul 2021 13:55:30 +0100 Subject: [PATCH 6/8] async: add rados_async_object_list --- src/ceph.rs | 20 ++++++-- src/lib.rs | 1 + src/list_stream.rs | 125 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 3 deletions(-) create mode 100644 src/list_stream.rs diff --git a/src/ceph.rs b/src/ceph.rs index 80cce1d..6d76baf 100644 --- a/src/ceph.rs +++ b/src/ceph.rs @@ -41,6 +41,7 @@ use std::io::{BufRead, Cursor}; use std::net::IpAddr; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use crate::list_stream::ListStream; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -1094,13 +1095,26 @@ impl IoCtx { Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64)))) } + pub fn rados_async_object_list(&self) -> RadosResult { + self.ioctx_guard()?; + let mut rados_list_ctx: rados_list_ctx_t = ptr::null_mut(); + unsafe { + let r = rados_nobjects_list_open(self.ioctx, &mut rados_list_ctx); + if r == 0 { + Ok(ListStream::new(rados_list_ctx)) + } else { + Err(r.into()) + } + } + } + /// Async variant of rados_object_getxattr pub async fn rados_async_object_getxattr( &self, object_name: &str, attr_name: &str, fill_buffer: &mut [u8], - ) -> RadosResult { + ) -> RadosResult { self.ioctx_guard()?; let object_name_str = CString::new(object_name)?; let attr_name_str = CString::new(attr_name)?; @@ -1124,7 +1138,7 @@ impl IoCtx { object_name: &str, attr_name: &str, attr_value: &[u8], - ) -> RadosResult { + ) -> RadosResult { self.ioctx_guard()?; let object_name_str = CString::new(object_name)?; let attr_name_str = CString::new(attr_name)?; @@ -1147,7 +1161,7 @@ impl IoCtx { &self, object_name: &str, attr_name: &str, - ) -> RadosResult { + ) -> RadosResult { self.ioctx_guard()?; let object_name_str = CString::new(object_name)?; let attr_name_str = CString::new(attr_name)?; diff --git a/src/lib.rs b/src/lib.rs index fa9be78..70e9e18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,6 +79,7 @@ pub mod utils; mod ceph_client; mod ceph_version; pub(crate) mod completion; +pub(crate) mod list_stream; mod mon_command; pub use crate::ceph_client::CephClient; diff --git a/src/list_stream.rs b/src/list_stream.rs new file mode 100644 index 0000000..841be8b --- /dev/null +++ b/src/list_stream.rs @@ -0,0 +1,125 @@ +use std::ffi::CStr; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::executor::ThreadPool; +use futures::task::SpawnExt; +use futures::{Future, Stream}; + +use crate::ceph::CephObject; +use crate::error::{RadosError, RadosResult}; +use crate::rados::{rados_list_ctx_t, rados_nobjects_list_close, rados_nobjects_list_next}; + +/// Wrap rados_list_ctx_t to make it Send (hold across .await) +#[derive(Copy, Clone)] +struct ListCtxHandle(rados_list_ctx_t); +unsafe impl Send for ListCtxHandle {} + +/// A high level Stream interface to the librados 'nobjects_list' functionality. +/// +/// librados does not expose asynchronous calls for object listing, so we use +/// a background helper thread. +pub struct ListStream { + ctx: ListCtxHandle, + workers: ThreadPool, + + // We only have a single call to nobjects_list_next outstanding at + // any time: rely on underlying librados/Objecter to do + // batching/readahead + next: Option>>>>>, +} + +unsafe impl Send for ListStream {} + +impl ListStream { + pub fn new(ctx: rados_list_ctx_t) -> Self { + Self { + ctx: ListCtxHandle(ctx), + workers: ThreadPool::builder() + .pool_size(1) + .create() + .expect("Could not spawn worker thread"), + next: None, + } + } +} + +impl Stream for ListStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.next.is_none() { + let list_ctx = self.ctx; + self.next = Some(Box::pin( + self.workers + .spawn_with_handle(async move { + let mut entry_ptr: *mut *const ::libc::c_char = std::ptr::null_mut(); + let mut key_ptr: *mut *const ::libc::c_char = std::ptr::null_mut(); + let mut nspace_ptr: *mut *const ::libc::c_char = std::ptr::null_mut(); + unsafe { + let r = rados_nobjects_list_next( + list_ctx.0, + &mut entry_ptr, + &mut key_ptr, + &mut nspace_ptr, + ); + + if r == -libc::ENOENT { + None + } else if r < 0 { + Some(Err(r.into())) + } else { + let object_name = + CStr::from_ptr(entry_ptr as *const ::libc::c_char); + let mut object_locator = String::new(); + let mut namespace = String::new(); + if !key_ptr.is_null() { + object_locator.push_str( + &CStr::from_ptr(key_ptr as *const ::libc::c_char) + .to_string_lossy(), + ); + } + if !nspace_ptr.is_null() { + namespace.push_str( + &CStr::from_ptr(nspace_ptr as *const ::libc::c_char) + .to_string_lossy(), + ); + } + + Some(Ok(CephObject { + name: object_name.to_string_lossy().into_owned(), + entry_locator: object_locator, + namespace, + })) + } + } + }) + .expect("Could not spawn background task"), + )); + } + + let result = self.next.as_mut().unwrap().as_mut().poll(cx); + match &result { + Poll::Pending => Poll::Pending, + _ => { + self.next = None; + result + } + } + + // match self.next.as_mut().unwrap().as_mut().poll(cx) { + // Poll::Pending => Poll: Pending, + // Poll::Ready(None) => Poll::Ready(None), + // Poll::Ready(Some(Err(rados_error))) => Poll::Ready(Some(Err(rados_error))), + // Poll::Ready(Some(Ok(ceph_object))) => Poll::Ready(Some(Err(rados_error))), + // } + } +} + +impl Drop for ListStream { + fn drop(&mut self) { + unsafe { + rados_nobjects_list_close(self.ctx.0); + } + } +} From 3ad3fcacc10a3491ddf0bcdaaf1485efc421e5ce Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 18 Jun 2021 13:53:56 +0100 Subject: [PATCH 7/8] async: add ReadStream for streaming reads of large objects --- src/ceph.rs | 10 +++ src/lib.rs | 1 + src/read_stream.rs | 183 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 194 insertions(+) create mode 100644 src/read_stream.rs diff --git a/src/ceph.rs b/src/ceph.rs index 6d76baf..1df832c 100644 --- a/src/ceph.rs +++ b/src/ceph.rs @@ -42,6 +42,7 @@ use std::net::IpAddr; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crate::list_stream::ListStream; +use crate::read_stream::ReadStream; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -1072,6 +1073,15 @@ impl IoCtx { result } + /// Streaming read of a RADOS object. The `ReadStream` object implements `futures::Stream` + /// for use with Stream-aware code like hyper's Body::wrap_stream. + /// + /// This will usually issue more read ops than needed if used on a small object: for + /// small objects `rados_async_object_read` is more appropriate. + pub fn rados_async_object_read_stream(&self, object_name: &str) -> ReadStream<'_> { + ReadStream::new(self, object_name, None, None) + } + /// Get object stats (size,SystemTime) pub async fn rados_async_object_stat( &self, diff --git a/src/lib.rs b/src/lib.rs index 70e9e18..855e261 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,6 +81,7 @@ mod ceph_version; pub(crate) mod completion; pub(crate) mod list_stream; mod mon_command; +pub(crate) mod read_stream; pub use crate::ceph_client::CephClient; pub use crate::ceph_version::CephVersion; diff --git a/src/read_stream.rs b/src/read_stream.rs new file mode 100644 index 0000000..1e9911f --- /dev/null +++ b/src/read_stream.rs @@ -0,0 +1,183 @@ +// Copyright 2021 John Spray All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +use futures::{FutureExt, Stream}; +use std::ffi::CString; +use std::future::Future; +use std::os::raw::c_char; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::ceph::IoCtx; +use crate::completion::with_completion; +use crate::error::RadosResult; +use crate::rados::rados_aio_read; + +const DEFAULT_BUFFER_SIZE: usize = 4 * 1024 * 1024; +const DEFAULT_CONCURRENCY: usize = 2; + +pub struct ReadStream<'a> { + ioctx: &'a IoCtx, + + // Size of each RADOS read op + buffer_size: usize, + + // Number of concurrent RADOS read ops to issue + concurrency: usize, + + in_flight: Vec>, + + next: u64, + + object_name: String, + + // Flag is set when we see a short read - means do not issue any more IOs, + // and return Poll::Ready(None) on next poll + done: bool, +} + +unsafe impl Send for ReadStream<'_> {} + +impl<'a> ReadStream<'a> { + pub(crate) fn new( + ioctx: &'a IoCtx, + object_name: &str, + buffer_size: Option, + concurrency: Option, + ) -> Self { + let mut inst = Self { + ioctx, + buffer_size: buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE), + concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENCY), + in_flight: Vec::new(), + next: 0, + object_name: object_name.to_string(), + done: false, + }; + + // Start IOs early, don't wait for the first poll. + inst.maybe_issue(); + + inst + } +} + +enum IOSlot<'a> { + Pending(Pin, RadosResult)> + 'a>>), + Complete((Vec, RadosResult)), +} + +impl<'a> ReadStream<'a> { + fn maybe_issue(&mut self) { + // Issue reads + while self.in_flight.len() < self.concurrency { + let read_at = self.next; + self.next += self.buffer_size as u64; + + // Inefficient: copying out string to dodge ownership issues for the moment + let object_name_bg = self.object_name.clone(); + + // Grab items for use inside async{} block to avoid referencing self from in there. + let ioctx = self.ioctx; + let read_size = self.buffer_size; + + // Use an async block to tie together the lifetime of a Vec and the Completion that uses it + let fut = async move { + let obj_name_str = CString::new(object_name_bg).expect("CString error"); + let mut fill_buffer = Vec::with_capacity(read_size); + let completion = with_completion(ioctx, |c| unsafe { + rados_aio_read( + ioctx.ioctx, + obj_name_str.as_ptr(), + c, + fill_buffer.as_mut_ptr() as *mut c_char, + fill_buffer.capacity(), + read_at, + ) + }) + .expect("Can't issue read"); + + let result = completion.await; + if let Ok(rval) = &result { + unsafe { + let len = *rval as usize; + assert!(len <= fill_buffer.capacity()); + fill_buffer.set_len(len); + } + } + + (fill_buffer, result) + }; + + let mut fut = Box::pin(fut); + + let slot = match fut.as_mut().now_or_never() { + Some(result) => IOSlot::Complete(result), + None => IOSlot::Pending(fut), + }; + + self.in_flight.push(slot); + } + } +} + +impl<'a> Stream for ReadStream<'a> { + type Item = RadosResult>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.done { + // Our last read result was a short one: we know nothing else needs doing. + return Poll::Ready(None); + } + + self.maybe_issue(); + + // Poll next read: maybe return pending if none is ready + let next_op = &mut self.in_flight[0]; + let (buffer, result) = match next_op { + IOSlot::Complete(_) => { + let complete = self.in_flight.remove(0); + if let IOSlot::Complete(c) = complete { + c + } else { + panic!("Cannot happen") + } + } + IOSlot::Pending(fut) => match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(r) => { + self.in_flight.remove(0); + r + } + }, + }; + + self.maybe_issue(); + + // A result is ready, handle it. + match result { + Ok(length) => { + if (length as usize) < self.buffer_size { + // Cancel outstanding ops + self.in_flight.clear(); + + // Flag to return Ready(None) on next call to poll. + self.done = true; + } + Poll::Ready(Some(Ok(buffer))) + } + Err(e) => Poll::Ready(Some(Err(e))), + } + } +} From c76f843d8d10bd0f580f2b688995773cebe994df Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 28 Jun 2021 00:15:29 +0100 Subject: [PATCH 8/8] async: add rados_async_write_stream -> WriteSink This helps writers who would like to stream data into a RADOS object without managing their own backlog of futures. --- src/ceph.rs | 46 +++++++++++++++-- src/lib.rs | 1 + src/read_stream.rs | 38 +++++++++++--- src/write_sink.rs | 123 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 198 insertions(+), 10 deletions(-) create mode 100644 src/write_sink.rs diff --git a/src/ceph.rs b/src/ceph.rs index 1df832c..afdbfe4 100644 --- a/src/ceph.rs +++ b/src/ceph.rs @@ -43,6 +43,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crate::list_stream::ListStream; use crate::read_stream::ReadStream; +pub use crate::write_sink::WriteSink; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -1076,10 +1077,47 @@ impl IoCtx { /// Streaming read of a RADOS object. The `ReadStream` object implements `futures::Stream` /// for use with Stream-aware code like hyper's Body::wrap_stream. /// - /// This will usually issue more read ops than needed if used on a small object: for - /// small objects `rados_async_object_read` is more appropriate. - pub fn rados_async_object_read_stream(&self, object_name: &str) -> ReadStream<'_> { - ReadStream::new(self, object_name, None, None) + /// Useful for reading large objects incrementally, or anywhere you are using an interface + /// that expects a stream (such as proxying objects via an HTTP server). + /// + /// Efficiency: If size_hint is not specified, and this function is used on a small object, it will + /// issue spurious read-ahead operations beyond the object's size. + /// If you have an object that you know is small, prefer to use a single `rados_async_object_read` + /// instead of this streaming variant. + /// + /// * `buffer_size` - How much data should be read per rados read operation. This is also + /// how much data is emitted in each Item from the stream. + /// * `concurrency` - How many RADOS operations should be run in parallel for this stream, + /// or None to use a default. + /// * `size_hint` - If you have prior knowledge of the object's size in bytes, pass it here to enable + /// the stream to issue fewer read-ahead operations than it would by default. This is just + /// a hint, and does not bound the data returned -- if the object is smaller or larger + /// than `size_hint` then the actual object size will be reflected in the stream's output. + pub fn rados_async_object_read_stream( + &self, + object_name: &str, + buffer_size: Option, + concurrency: Option, + size_hint: Option, + ) -> ReadStream<'_> { + ReadStream::new(self, object_name, buffer_size, concurrency, size_hint) + } + + /// Streaming write of a RADOS object. The `WriteSink` object implements `futures::Sink`. Combine + /// it with other stream-aware code, or bring the SinkExt trait into scope to get methods + /// like send, send_all. + /// + /// Efficiency: this class does not coalesce writes, so each Item you send into it, + /// + /// + /// * `concurrency` - How many RADOS operations should be run in parallel for this stream, + /// or None to use a default. + pub fn rados_async_object_write_stream( + &self, + object_name: &str, + concurrency: Option, + ) -> WriteSink<'_> { + WriteSink::new(self, object_name, concurrency) } /// Get object stats (size,SystemTime) diff --git a/src/lib.rs b/src/lib.rs index 855e261..81b342a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,7 @@ pub(crate) mod completion; pub(crate) mod list_stream; mod mon_command; pub(crate) mod read_stream; +pub(crate) mod write_sink; pub use crate::ceph_client::CephClient; pub use crate::ceph_version::CephVersion; diff --git a/src/read_stream.rs b/src/read_stream.rs index 1e9911f..294eb2d 100644 --- a/src/read_stream.rs +++ b/src/read_stream.rs @@ -36,10 +36,18 @@ pub struct ReadStream<'a> { // Number of concurrent RADOS read ops to issue concurrency: usize, + // Caller's hint as to the object size (not required to be accurate) + size_hint: Option, + in_flight: Vec>, + // Counter for how many bytes we have issued reads for next: u64, + // Counter for how many bytes we have yielded from poll_next() + // (i.e. the size of the object so far) + yielded: u64, + object_name: String, // Flag is set when we see a short read - means do not issue any more IOs, @@ -55,13 +63,16 @@ impl<'a> ReadStream<'a> { object_name: &str, buffer_size: Option, concurrency: Option, + size_hint: Option, ) -> Self { let mut inst = Self { ioctx, buffer_size: buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE), concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENCY), + size_hint, in_flight: Vec::new(), next: 0, + yielded: 0, object_name: object_name.to_string(), done: false, }; @@ -80,8 +91,20 @@ enum IOSlot<'a> { impl<'a> ReadStream<'a> { fn maybe_issue(&mut self) { - // Issue reads - while self.in_flight.len() < self.concurrency { + // Issue reads if any of these are true: + // - Nothing is in flight + // - No size bound, and in flight < concurrency + // - A size bound, and we're within it, and in flight < concurrency + // - A size bound, and it has been disproved, and in flight < concurrency + + while !self.done + && (self.in_flight.is_empty() + || (((self.size_hint.is_some() + && (self.next < self.size_hint.unwrap() + || self.yielded > self.size_hint.unwrap())) + || self.size_hint.is_none()) + && (self.in_flight.len() < self.concurrency))) + { let read_at = self.next; self.next += self.buffer_size as u64; @@ -163,10 +186,8 @@ impl<'a> Stream for ReadStream<'a> { }, }; - self.maybe_issue(); - // A result is ready, handle it. - match result { + let r = match result { Ok(length) => { if (length as usize) < self.buffer_size { // Cancel outstanding ops @@ -175,9 +196,14 @@ impl<'a> Stream for ReadStream<'a> { // Flag to return Ready(None) on next call to poll. self.done = true; } + self.yielded += buffer.len() as u64; Poll::Ready(Some(Ok(buffer))) } Err(e) => Poll::Ready(Some(Err(e))), - } + }; + + self.maybe_issue(); + + r } } diff --git a/src/write_sink.rs b/src/write_sink.rs new file mode 100644 index 0000000..dbd9877 --- /dev/null +++ b/src/write_sink.rs @@ -0,0 +1,123 @@ +use futures::{FutureExt, Sink, Stream}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::ceph::IoCtx; +use crate::completion::with_completion; +use crate::error::{RadosError, RadosResult}; +use crate::rados::rados_aio_write; +use futures::stream::FuturesUnordered; +use std::ffi::CString; +use std::os::raw::c_char; + +const DEFAULT_CONCURRENCY: usize = 2; + +pub struct WriteSink<'a> { + ioctx: &'a IoCtx, + in_flight: Pin> + 'a>>>>>, + object_name: String, + + // Offset into object where the next write will land + next: u64, + + // How many RADOS ops in flight at same time? + concurrency: usize, +} + +unsafe impl Send for WriteSink<'_> {} + +impl<'a> WriteSink<'a> { + pub fn new(ioctx: &'a IoCtx, object_name: &str, concurrency: Option) -> Self { + let concurrency = concurrency.unwrap_or(DEFAULT_CONCURRENCY); + assert!(concurrency > 0); + + Self { + ioctx, + in_flight: Box::pin(FuturesUnordered::new()), + object_name: object_name.to_string(), + next: 0, + concurrency, + } + } + + fn trim_in_flight( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + target_len: usize, + ) -> Poll>>::Error>> { + while self.in_flight.len() > target_len { + match self.in_flight.as_mut().poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + // (because we check for in_flight size first) + unreachable!() + } + Poll::Ready(Some(result)) => match result { + Err(e) => return Poll::Ready(Err(e)), + Ok(sz) => { + debug!("trim_in_flight: IO completed with r={}", sz); + } + }, + }; + } + + // Nothing left in flight, we're done + Poll::Ready(Ok(())) + } +} + +impl<'a> Sink> for WriteSink<'a> { + type Error = RadosError; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // If we have fewer than 1 slots available, this will try to wait on some outstanding futures + let target = self.as_ref().concurrency - 1; + if self.in_flight.len() > target { + self.trim_in_flight(cx, target) + } else { + Poll::Ready(Ok(())) + } + } + + fn start_send(mut self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { + let ioctx = self.ioctx; + let obj_name_str = CString::new(self.object_name.clone()).expect("CString error"); + let write_at = self.next; + self.next += item.len() as u64; + + let mut fut = Box::pin(async move { + let c = with_completion(ioctx, |c| unsafe { + rados_aio_write( + ioctx.ioctx, + obj_name_str.as_ptr(), + c, + item.as_ptr() as *mut c_char, + item.len(), + write_at, + ) + })?; + + c.await + }); + + // Kick the async{} future to get the RADOS op sent + match fut.as_mut().now_or_never() { + Some(Ok(_)) => Ok(()), + Some(Err(e)) => return Err(e), + None => { + self.in_flight.push(fut); + Ok(()) + } + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.trim_in_flight(cx, 0) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // There is no special work to be done on close + self.poll_flush(cx) + } +}