Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async/await compatible wrapper for librados AIO methods #79

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ serde_json = "1"
uuid = { version = "~0.8", features = ["serde"] }
nix = "0.21"
tracing = "0.1"
futures = {version="~0.3", features=["thread-pool"]}

[features]
default = []
Expand Down
297 changes: 294 additions & 3 deletions src/ceph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ use crate::error::*;
use crate::json::*;
use crate::JsonValue;
use byteorder::{LittleEndian, WriteBytesExt};
use futures::task::SpawnExt;
use libc::*;
use nom::number::complete::le_u32;
use nom::IResult;
use serde_json;

use crate::completion::with_completion;
use crate::rados::*;
#[cfg(feature = "rados_striper")]
use crate::rados_striper::*;
Expand All @@ -39,13 +41,21 @@ use std::io::{BufRead, Cursor};
use std::net::IpAddr;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use crate::list_stream::ListStream;
use crate::read_stream::ReadStream;
pub use crate::write_sink::WriteSink;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use uuid::Uuid;

// Single-character codes for tmap operations.
// NOTE(review): judging by the names these stand for header / set / create /
// remove — confirm against the Ceph OSD headers.
const CEPH_OSD_TMAP_HDR: char = 'h';
const CEPH_OSD_TMAP_SET: char = 's';
const CEPH_OSD_TMAP_CREATE: char = 'c';
const CEPH_OSD_TMAP_RM: char = 'r';

/// Capacity (64 KiB) reserved in a caller-supplied read buffer when that
/// buffer arrives with zero capacity.
const DEFAULT_READ_BYTES: usize = 64 * 1024;

#[derive(Debug, Clone)]
pub enum CephHealth {
Ok,
Expand Down Expand Up @@ -332,7 +342,8 @@ impl Iterator for XAttr {

/// Owns a ioctx handle
pub struct IoCtx {
ioctx: rados_ioctx_t,
// This is pub within the crate to enable Completions to use it
pub(crate) ioctx: rados_ioctx_t,
}

// NOTE(review): manually asserts that an `IoCtx` (which wraps a raw
// `rados_ioctx_t` handle) may be moved to another thread. Soundness depends on
// librados' thread-safety guarantees for io contexts — confirm.
unsafe impl Send for IoCtx {}
Expand Down Expand Up @@ -371,6 +382,7 @@ pub struct Rados {
phantom: PhantomData<IoCtx>,
}

// NOTE(review): manual `Send`/`Sync` assertions for the cluster handle wrapper.
// Presumably sound because librados cluster handles may be used from several
// threads — TODO confirm against the librados documentation.
unsafe impl Send for Rados {}
unsafe impl Sync for Rados {}

impl Drop for Rados {
Expand Down Expand Up @@ -408,6 +420,21 @@ pub fn connect_to_ceph(user_id: &str, config_file: &str) -> RadosResult<Rados> {
}
}

/// Asynchronous counterpart of `connect_to_ceph`.
///
/// librados has no asynchronous initialization call, so the blocking
/// `connect_to_ceph` is run on a freshly created single-threaded `futures`
/// thread pool and its result is awaited from the calling task.
///
/// * `user_id` - passed through unchanged to `connect_to_ceph`.
/// * `config_file` - passed through unchanged to `connect_to_ceph`.
///
/// # Panics
///
/// Panics if the thread pool cannot be created or if the connect task cannot
/// be spawned onto it.
pub async fn connect_to_ceph_async(user_id: &str, config_file: &str) -> RadosResult<Rados> {
    // The background task cannot borrow from the caller, so give it owned copies.
    let owned_user = String::from(user_id);
    let owned_config = String::from(config_file);

    let worker = futures::executor::ThreadPool::builder()
        .pool_size(1)
        .create()
        .expect("Could not spawn thread pool");

    let connect_job = async move { connect_to_ceph(&owned_user, &owned_config) };
    let handle = worker
        .spawn_with_handle(connect_job)
        .expect("Could not spawn background task");

    handle.await
}

impl Rados {
pub fn inner(&self) -> &rados_t {
&self.rados
Expand Down Expand Up @@ -934,6 +961,270 @@ impl IoCtx {
Ok(())
}

/// Asynchronously write `buffer` into an object at a byte offset.
///
/// Submits a `rados_aio_write` operation on this io context and waits for the
/// associated completion.
///
/// * `object_name` - name of the target object; must not contain interior NUL bytes.
/// * `buffer` - bytes to write.
/// * `offset` - byte offset within the object at which the write starts.
///
/// # Errors
///
/// Propagates the error from `ioctx_guard`, from converting `object_name` to a
/// `CString`, from `with_completion`, or from the awaited completion.
pub async fn rados_async_object_write(
    &self,
    object_name: &str,
    buffer: &[u8],
    offset: u64,
) -> RadosResult<u32> {
    self.ioctx_guard()?;
    let obj_name_str = CString::new(object_name)?;

    // `self` is already a reference; pass it directly, as the sibling async
    // methods do, rather than borrowing it a second time.
    with_completion(self, |c| unsafe {
        rados_aio_write(
            self.ioctx,
            obj_name_str.as_ptr(),
            c,
            buffer.as_ptr() as *const ::libc::c_char,
            buffer.len(),
            offset,
        )
    })?
    .await
}

/// Async variant of `rados_object_append`: append `buffer` to the end of an object.
///
/// Queues a `rados_aio_append` call on this io context and waits for its
/// completion. The receiver is `&Arc<Self>`, so the method must be invoked
/// through an `Arc<IoCtx>`.
///
/// * `object_name` - name of the target object; must not contain interior NUL bytes.
/// * `buffer` - bytes to append.
///
/// Returns the value produced by awaiting the completion, or the error from the
/// guard check, the name conversion, `with_completion`, or the completion itself.
pub async fn rados_async_object_append(
    self: &Arc<Self>,
    object_name: &str,
    buffer: &[u8],
) -> RadosResult<u32> {
    self.ioctx_guard()?;
    let oid = CString::new(object_name)?;

    let pending = with_completion(self, |completion| {
        let payload = buffer.as_ptr() as *const ::libc::c_char;
        unsafe { rados_aio_append(self.ioctx, oid.as_ptr(), completion, payload, buffer.len()) }
    })?;

    pending.await
}

/// Async variant of `rados_object_write_full`.
///
/// Submits a `rados_aio_write_full` operation carrying `buffer` for the named
/// object and waits for the associated completion.
///
/// * `object_name` - name of the target object; must not contain interior NUL bytes.
/// * `buffer` - bytes handed to `rados_aio_write_full`.
///
/// # Errors
///
/// Propagates the error from `ioctx_guard`, from converting `object_name` to a
/// `CString`, from `with_completion`, or from the awaited completion.
pub async fn rados_async_object_write_full(
    &self,
    object_name: &str,
    buffer: &[u8],
) -> RadosResult<u32> {
    self.ioctx_guard()?;
    let obj_name_str = CString::new(object_name)?;

    // `self` is already a reference; pass it directly, as the sibling async
    // methods do, rather than borrowing it a second time.
    with_completion(self, |c| unsafe {
        rados_aio_write_full(
            self.ioctx,
            obj_name_str.as_ptr(),
            c,
            buffer.as_ptr() as *const ::libc::c_char,
            buffer.len(),
        )
    })?
    .await
}

/// Async variant of `rados_object_remove`.
///
/// Submits a `rados_aio_remove` operation for `object_name` and waits for its
/// completion. The numeric value produced by the completion is discarded;
/// only success or failure is reported.
///
/// * `object_name` - name of the object to remove; must not contain interior NUL bytes.
pub async fn rados_async_object_remove(&self, object_name: &str) -> RadosResult<()> {
    self.ioctx_guard()?;
    let oid = CString::new(object_name)?;

    let pending = with_completion(self, |completion| unsafe {
        rados_aio_remove(self.ioctx, oid.as_ptr() as *const c_char, completion)
    })?;

    match pending.await {
        Ok(_) => Ok(()),
        Err(err) => Err(err),
    }
}

/// Async variant of `rados_object_read`.
///
/// Issues a single `rados_aio_read` for up to `fill_buffer.capacity()` bytes
/// starting at `read_offset`. If the buffer arrives with zero capacity,
/// `DEFAULT_READ_BYTES` is reserved first. On success the buffer's length is
/// set to the value reported by the completion; on failure the length is left
/// as it was. The receiver is `&Arc<Self>`, so the method must be invoked
/// through an `Arc<IoCtx>`.
///
/// * `object_name` - name of the object to read; must not contain interior NUL bytes.
/// * `fill_buffer` - destination buffer; its capacity decides the read size.
/// * `read_offset` - byte offset within the object at which reading starts.
///
/// # Panics
///
/// Panics if the completion reports more bytes than the buffer's capacity.
pub async fn rados_async_object_read(
    self: &Arc<Self>,
    object_name: &str,
    fill_buffer: &mut Vec<u8>,
    read_offset: u64,
) -> RadosResult<u32> {
    self.ioctx_guard()?;
    let oid = CString::new(object_name)?;

    if fill_buffer.capacity() == 0 {
        fill_buffer.reserve_exact(DEFAULT_READ_BYTES);
    }

    let outcome = with_completion(self, |completion| unsafe {
        rados_aio_read(
            self.ioctx,
            oid.as_ptr(),
            completion,
            fill_buffer.as_mut_ptr() as *mut c_char,
            fill_buffer.capacity(),
            read_offset,
        )
    })?
    .await;

    if let Ok(&reported) = outcome.as_ref() {
        let filled = reported as usize;
        assert!(filled <= fill_buffer.capacity());
        // SAFETY: the assert above keeps the new length within the allocation.
        // NOTE(review): this also assumes librados initialised the first
        // `filled` bytes of the buffer — confirm against the librados contract.
        unsafe { fill_buffer.set_len(filled) };
    }

    outcome
}

/// Read a RADOS object incrementally as a stream.
///
/// The returned `ReadStream` implements `futures::Stream`, so it can be handed
/// to stream-aware consumers such as hyper's `Body::wrap_stream`. Use it for
/// large objects, or wherever an interface expects a stream (for example when
/// proxying objects through an HTTP server).
///
/// Efficiency: without a `size_hint`, streaming a small object issues spurious
/// read-ahead operations past the end of the object. For objects known to be
/// small, a single `rados_async_object_read` is the better choice.
///
/// * `buffer_size` - Amount of data requested per RADOS read operation, which
///   is also the amount emitted in each stream item.
/// * `concurrency` - Number of RADOS operations run in parallel for this
///   stream, or `None` for a default.
/// * `size_hint` - Known object size in bytes, if any. It lets the stream issue
///   fewer read-ahead operations. It is only a hint and does not bound the
///   output: if the object is smaller or larger, the stream reflects the
///   object's actual size.
pub fn rados_async_object_read_stream(
    &self,
    object_name: &str,
    buffer_size: Option<usize>,
    concurrency: Option<usize>,
    size_hint: Option<u64>,
) -> ReadStream<'_> {
    ReadStream::new(self, object_name, buffer_size, concurrency, size_hint)
}

/// Streaming write of a RADOS object. The returned `WriteSink` implements
/// `futures::Sink`. Combine it with other stream-aware code, or bring the
/// `SinkExt` trait into scope to get methods like `send` and `send_all`.
///
/// Efficiency: the sink does not coalesce writes.
/// NOTE(review): each item sent into it is presumably issued as its own RADOS
/// write operation, so many tiny items would be inefficient — confirm against
/// the `WriteSink` implementation.
///
/// * `object_name` - Name of the object handed to `WriteSink::new`.
/// * `concurrency` - How many RADOS operations should be run in parallel for
///   this stream, or `None` to use a default.
pub fn rados_async_object_write_stream(
    &self,
    object_name: &str,
    concurrency: Option<usize>,
) -> WriteSink<'_> {
    WriteSink::new(self, object_name, concurrency)
}

/// Get object stats as `(size, SystemTime)`.
///
/// Submits a `rados_aio_stat` operation for `object_name` and, once the
/// completion finishes, converts the reported `time_t` (seconds relative to
/// the Unix epoch) into a `SystemTime`.
///
/// * `object_name` - name of the object to stat; must not contain interior NUL bytes.
///
/// A negative timestamp is mapped to a point before the epoch. If that point
/// cannot be represented, `UNIX_EPOCH` itself is returned.
///
/// # Errors
///
/// Propagates the error from `ioctx_guard`, from converting `object_name` to a
/// `CString`, from `with_completion`, or from the awaited completion.
pub async fn rados_async_object_stat(
    &self,
    object_name: &str,
) -> RadosResult<(u64, SystemTime)> {
    self.ioctx_guard()?;
    let object_name_str = CString::new(object_name)?;
    // Out-parameters handed to rados_aio_stat as pointers and read back only
    // after the completion has been awaited.
    let mut psize: u64 = 0;
    let mut time: ::libc::time_t = 0;

    with_completion(self, |c| unsafe {
        rados_aio_stat(
            self.ioctx,
            object_name_str.as_ptr(),
            c,
            &mut psize,
            &mut time,
        )
    })?
    .await?;

    // `time_t` is signed. Casting a negative value straight to u64 would give
    // an enormous duration and make `UNIX_EPOCH + duration` panic on overflow,
    // so negative values are handled separately.
    let secs = time as i64;
    let mtime = if secs >= 0 {
        UNIX_EPOCH + Duration::from_secs(secs as u64)
    } else {
        UNIX_EPOCH
            .checked_sub(Duration::from_secs(secs.wrapping_neg() as u64))
            .unwrap_or(UNIX_EPOCH)
    };
    Ok((psize, mtime))
}

/// Open an object listing on this io context and wrap it in a `ListStream`.
///
/// Calls `rados_nobjects_list_open`; a return code of zero yields the stream,
/// any other code is converted into the error type.
pub fn rados_async_object_list(&self) -> RadosResult<ListStream> {
    self.ioctx_guard()?;
    let mut list_ctx: rados_list_ctx_t = ptr::null_mut();
    unsafe {
        match rados_nobjects_list_open(self.ioctx, &mut list_ctx) {
            0 => Ok(ListStream::new(list_ctx)),
            rc => Err(rc.into()),
        }
    }
}

/// Async variant of `rados_object_getxattr`.
///
/// Submits a `rados_aio_getxattr` operation that targets `fill_buffer` (up to
/// `fill_buffer.len()` bytes) and waits for its completion.
///
/// * `object_name` - name of the object; must not contain interior NUL bytes.
/// * `attr_name` - name of the extended attribute; must not contain interior NUL bytes.
/// * `fill_buffer` - destination for the attribute value.
///
/// Returns the value produced by awaiting the completion.
pub async fn rados_async_object_getxattr(
    &self,
    object_name: &str,
    attr_name: &str,
    fill_buffer: &mut [u8],
) -> RadosResult<u32> {
    self.ioctx_guard()?;
    let oid = CString::new(object_name)?;
    let xattr = CString::new(attr_name)?;

    let pending = with_completion(self, |completion| unsafe {
        rados_aio_getxattr(
            self.ioctx,
            oid.as_ptr() as *const c_char,
            completion,
            xattr.as_ptr() as *const c_char,
            fill_buffer.as_mut_ptr() as *mut c_char,
            fill_buffer.len(),
        )
    })?;

    pending.await
}

/// Async variant of `rados_object_setxattr`.
///
/// Submits a `rados_aio_setxattr` operation carrying `attr_value` and waits
/// for its completion.
///
/// * `object_name` - name of the object; must not contain interior NUL bytes.
/// * `attr_name` - name of the extended attribute; must not contain interior NUL bytes.
/// * `attr_value` - bytes to store as the attribute value.
///
/// Returns the value produced by awaiting the completion.
pub async fn rados_async_object_setxattr(
    &self,
    object_name: &str,
    attr_name: &str,
    attr_value: &[u8],
) -> RadosResult<u32> {
    self.ioctx_guard()?;
    let oid = CString::new(object_name)?;
    let xattr = CString::new(attr_name)?;

    let pending = with_completion(self, |completion| unsafe {
        rados_aio_setxattr(
            self.ioctx,
            oid.as_ptr() as *const c_char,
            completion,
            xattr.as_ptr() as *const c_char,
            attr_value.as_ptr() as *mut c_char,
            attr_value.len(),
        )
    })?;

    pending.await
}

/// Async variant of `rados_object_rmxattr`.
///
/// Submits a `rados_aio_rmxattr` operation for the named attribute and waits
/// for its completion.
///
/// * `object_name` - name of the object; must not contain interior NUL bytes.
/// * `attr_name` - name of the extended attribute to remove; must not contain
///   interior NUL bytes.
///
/// Returns the value produced by awaiting the completion.
pub async fn rados_async_object_rmxattr(
    &self,
    object_name: &str,
    attr_name: &str,
) -> RadosResult<u32> {
    self.ioctx_guard()?;
    let oid = CString::new(object_name)?;
    let xattr = CString::new(attr_name)?;

    let pending = with_completion(self, |completion| unsafe {
        rados_aio_rmxattr(
            self.ioctx,
            oid.as_ptr() as *const c_char,
            completion,
            xattr.as_ptr() as *const c_char,
        )
    })?;

    pending.await
}

/// Efficiently copy a portion of one object to another
/// If the underlying filesystem on the OSD supports it, this will be a
/// copy-on-write clone.
Expand Down Expand Up @@ -991,7 +1282,7 @@ impl IoCtx {
/// amount of bytes read
/// The io context determines the snapshot to read from, if any was set by
/// rados_ioctx_snap_set_read().
/// Default read size is 64K unless you call Vec::with_capacity(1024*128)
/// Default read size is 64K unless you call Vec::with_capacity
/// with a larger size.
pub fn rados_object_read(
&self,
Expand All @@ -1003,7 +1294,7 @@ impl IoCtx {
let object_name_str = CString::new(object_name)?;
let mut len = fill_buffer.capacity();
if len == 0 {
fill_buffer.reserve_exact(1024 * 64);
fill_buffer.reserve_exact(DEFAULT_READ_BYTES);
len = fill_buffer.capacity();
}

Expand Down
Loading