Make s3 client able to report read window offset (#971)
* Make s3 client able to report read window offset

Signed-off-by: Monthon Klongklaew <monthonk@amazon.com>

* Update CHANGELOG.md

Signed-off-by: Monthon Klongklaew <monthonk@amazon.com>

* PR comments

Signed-off-by: Monthon Klongklaew <monthonk@amazon.com>

* PR comments

Signed-off-by: Monthon Klongklaew <monthonk@amazon.com>

---------

Signed-off-by: Monthon Klongklaew <monthonk@amazon.com>
monthonk authored Aug 7, 2024
1 parent 6e9eaa1 commit 6c6b1e3
Showing 9 changed files with 239 additions and 104 deletions.
10 changes: 10 additions & 0 deletions mountpoint-s3-client/CHANGELOG.md
@@ -1,3 +1,13 @@
## Unreleased

### Breaking changes

* When using GetObject with backpressure enabled, an error will be returned when there is not enough read window instead of blocking. ([#971](https://github.com/awslabs/mountpoint-s3/pull/971))

### Other changes

* Allow querying initial read window size and read window end offset for backpressure GetObject. ([#971](https://github.com/awslabs/mountpoint-s3/pull/971))

## v0.9.0 (June 26, 2024)

* Adds support for `AWS_ENDPOINT_URL` environment variable. ([#895](https://github.com/awslabs/mountpoint-s3/pull/895))
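
Taken together, these changelog entries move flow control to the caller: a backpressure GetObject now fails with an error instead of blocking when the window runs out, and the new query methods let the caller see how far the window currently extends. Below is a minimal consumer sketch against the trait methods added in this commit; the `read_all` helper, its generic bounds, the import paths, and the choice of reusing the initial window size as the increment are illustrative assumptions, not crate code.

```rust
use futures::{pin_mut, StreamExt};
use mountpoint_s3_client::object_client::{GetObjectRequest, ObjectClient};

/// Read a whole object through a backpressure-enabled client, keeping the read
/// window ahead of the bytes consumed so the stream never reports an empty window.
async fn read_all<Client: ObjectClient>(client: &Client, bucket: &str, key: &str) -> Vec<u8> {
    let request = client
        .get_object(bucket, key, None, None)
        .await
        .expect("get_object should succeed");
    pin_mut!(request);

    let mut accum = vec![];
    let mut next_offset = 0u64;
    while let Some(part) = request.next().await {
        let (offset, body) = part.expect("stream should not fail while the window is open");
        assert_eq!(offset, next_offset, "parts must arrive in order");
        next_offset += body.len() as u64;
        accum.extend_from_slice(&body[..]);

        // Only meaningful when backpressure is enabled; otherwise the window is unlimited.
        if let Some(step) = client.initial_read_window_size() {
            // Keep the (exclusive) window end strictly ahead of the next expected offset.
            while next_offset >= request.as_ref().read_window_end_offset() {
                request.as_mut().increment_read_window(step);
            }
        }
    }
    accum
}
```

The inner loop mirrors the pattern used in the mock client test later in this diff.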
9 changes: 9 additions & 0 deletions mountpoint-s3-client/src/failure_client.rs
@@ -81,6 +81,10 @@ where
self.client.write_part_size()
}

fn initial_read_window_size(&self) -> Option<usize> {
self.client.initial_read_window_size()
}

async fn delete_object(
&self,
bucket: &str,
@@ -188,6 +192,11 @@ impl<Client: ObjectClient, FailState: Send> GetObjectRequest for FailureGetRequest
let this = self.project();
this.request.increment_read_window(len);
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
let this = self.project_ref();
this.request.read_window_end_offset()
}
}

impl<Client: ObjectClient, FailState> Stream for FailureGetRequest<Client, FailState> {
128 changes: 64 additions & 64 deletions mountpoint-s3-client/src/mock_client.rs
@@ -64,7 +64,7 @@ pub struct MockClientConfig {
/// A seed to randomize the order of ListObjectsV2 results, or None to use ordered list
pub unordered_list_seed: Option<u64>,
/// A flag to enable backpressure read
pub enable_back_pressure: bool,
pub enable_backpressure: bool,
/// Initial backpressure read window size, ignored if enable_back_pressure is false
pub initial_read_window_size: usize,
}
@@ -475,8 +475,8 @@ pub struct MockGetObjectRequest {
next_offset: u64,
length: usize,
part_size: usize,
enable_back_pressure: bool,
current_window_size: usize,
enable_backpressure: bool,
read_window_end_offset: u64,
}

impl MockGetObjectRequest {
@@ -498,7 +498,11 @@ impl GetObjectRequest for MockGetObjectRequest {
type ClientError = MockClientError;

fn increment_read_window(mut self: Pin<&mut Self>, len: usize) {
self.current_window_size += len;
self.read_window_end_offset += len as u64;
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
self.read_window_end_offset
}
}

@@ -510,15 +514,13 @@ impl Stream for MockGetObjectRequest {
return Poll::Ready(None);
}

let mut next_read_size = self.part_size.min(self.length);
let next_read_size = self.part_size.min(self.length);

// Simulate backpressure mechanism
if self.enable_back_pressure {
if self.current_window_size == 0 {
return Poll::Pending;
}
next_read_size = self.current_window_size.min(next_read_size);
self.current_window_size -= next_read_size;
if self.enable_backpressure && self.next_offset >= self.read_window_end_offset {
return Poll::Ready(Some(Err(ObjectClientError::ClientError(MockClientError(
"empty read window".into(),
)))));
}
let next_part = self.object.read(self.next_offset, next_read_size);

@@ -562,6 +564,14 @@ impl ObjectClient for MockClient {
Some(self.config.part_size)
}

fn initial_read_window_size(&self) -> Option<usize> {
if self.config.enable_backpressure {
Some(self.config.initial_read_window_size)
} else {
None
}
}

async fn delete_object(
&self,
bucket: &str,
@@ -616,8 +626,8 @@ impl ObjectClient for MockClient {
next_offset,
length,
part_size: self.config.part_size,
enable_back_pressure: self.config.enable_back_pressure,
current_window_size: self.config.initial_read_window_size,
enable_backpressure: self.config.enable_backpressure,
read_window_end_offset: next_offset + self.config.initial_read_window_size as u64,
})
} else {
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey))
@@ -908,18 +918,25 @@ enum MockObjectParts {

#[cfg(test)]
mod tests {
use std::{
sync::mpsc::{self, RecvTimeoutError},
thread,
};

use futures::{pin_mut, StreamExt};
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use test_case::test_case;

use super::*;

macro_rules! assert_client_error {
($e:expr, $err:expr) => {
let err = $e.expect_err("should fail");
match err {
ObjectClientError::ClientError(MockClientError(m)) => {
assert_eq!(&*m, $err);
}
_ => assert!(false, "wrong error type"),
}
};
}

async fn test_get_object(key: &str, size: usize, range: Option<Range<u64>>) {
let mut rng = ChaChaRng::seed_from_u64(0x12345678);

@@ -971,7 +988,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: true,
enable_backpressure: true,
initial_read_window_size: backpressure_read_window_size,
});

@@ -992,9 +1009,12 @@ mod tests {
assert_eq!(offset, next_offset, "wrong body part offset");
next_offset += body.len() as u64;
accum.extend_from_slice(&body[..]);
get_request
.as_mut()
.increment_read_window(backpressure_read_window_size);

while next_offset >= get_request.as_ref().read_window_end_offset() {
get_request
.as_mut()
.increment_read_window(backpressure_read_window_size);
}
}
let expected_range = range.unwrap_or(0..size as u64);
let expected_range = expected_range.start as usize..expected_range.end as usize;
@@ -1024,18 +1044,6 @@ mod tests {
rng.fill_bytes(&mut body);
client.add_object("key1", body[..].into());

macro_rules! assert_client_error {
($e:expr, $err:expr) => {
let err = $e.expect_err("should fail");
match err {
ObjectClientError::ClientError(MockClientError(m)) => {
assert_eq!(&*m, $err);
}
_ => assert!(false, "wrong error type"),
}
};
}

assert!(matches!(
client.get_object("wrong_bucket", "key1", None, None).await,
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchBucket))
@@ -1068,53 +1076,45 @@
);
}

// Verify that the request is blocked when we don't increment read window size
// Verify that an error is returned when we don't increment read window size
#[tokio::test]
async fn verify_backpressure_get_object() {
let key = "key1";
let size = 1000;
let range = 50..1000;
let mut rng = ChaChaRng::seed_from_u64(0x12345678);

let mut rng = ChaChaRng::seed_from_u64(0x12345678);
let client = MockClient::new(MockClientConfig {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: true,
enable_backpressure: true,
initial_read_window_size: 256,
});

let mut body = vec![0u8; size];
rng.fill_bytes(&mut body);
client.add_object(key, MockObject::from_bytes(&body, ETag::for_tests()));
let part_size = client.read_part_size().unwrap();
let size = part_size * 2;
let range = 0..(part_size + 1) as u64;

let mut expected_body = vec![0u8; size];
rng.fill_bytes(&mut expected_body);
client.add_object(key, MockObject::from_bytes(&expected_body, ETag::for_tests()));

let mut get_request = client
.get_object("test_bucket", key, Some(range.clone()), None)
.await
.expect("should not fail");

let mut accum = vec![];
let mut next_offset = range.start;

let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
futures::executor::block_on(async move {
while let Some(r) = get_request.next().await {
let (offset, body) = r.unwrap();
assert_eq!(offset, next_offset, "wrong body part offset");
next_offset += body.len() as u64;
accum.extend_from_slice(&body[..]);
}
let expected_range = range;
let expected_range = expected_range.start as usize..expected_range.end as usize;
assert_eq!(&accum[..], &body[expected_range], "body does not match");
sender.send(accum).unwrap();
})
});
match receiver.recv_timeout(Duration::from_millis(100)) {
Ok(_) => panic!("request should have been blocked"),
Err(e) => assert_eq!(e, RecvTimeoutError::Timeout),
}
// Verify that we can receive some data since the window size is more than 0
let first_part = get_request.next().await.expect("result should not be empty");
let (offset, body) = first_part.unwrap();
assert_eq!(offset, 0, "wrong body part offset");

// The CRT always returns at least one part even if the window is smaller than that
let expected_range = range.start as usize..part_size;
assert_eq!(&body[..], &expected_body[expected_range]);

// This await should return an error because the current window is not enough to fetch the next part
let next = get_request.next().await.expect("result should not be empty");
assert_client_error!(next, "empty read window");
}

#[tokio::test]
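
The mock's window accounting is easiest to follow with concrete numbers. The toy model below is not part of the crate; it simply replays the arithmetic of `verify_backpressure_get_object` above (part size 1024, initial window 256, a 1025-byte range) to show why the second poll fails until the window is advanced.

```rust
/// A toy model of the mock client's accounting: the window end is an exclusive
/// upper bound, a poll is allowed whenever the next offset is still below it,
/// and each poll may overshoot the window by up to one part.
struct WindowModel {
    next_offset: u64,
    read_window_end_offset: u64,
    part_size: u64,
    remaining: u64,
}

impl WindowModel {
    fn increment_read_window(&mut self, len: u64) {
        self.read_window_end_offset += len;
    }

    /// Returns the size of the next part, or None when the window is exhausted
    /// (the real mock returns an "empty read window" error at this point).
    fn poll(&mut self) -> Option<u64> {
        if self.next_offset >= self.read_window_end_offset {
            return None;
        }
        let len = self.part_size.min(self.remaining);
        self.next_offset += len;
        self.remaining -= len;
        Some(len)
    }
}

fn main() {
    let mut w = WindowModel { next_offset: 0, read_window_end_offset: 256, part_size: 1024, remaining: 1025 };
    assert_eq!(w.poll(), Some(1024)); // the first part is served even though it overshoots the window
    assert_eq!(w.poll(), None);       // 1024 >= 256: empty window until the caller advances it
    w.increment_read_window(1024);
    assert_eq!(w.poll(), Some(1));    // window end is now 1280 > 1024, so the last byte is served
}
```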
9 changes: 9 additions & 0 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
@@ -72,6 +72,11 @@ impl GetObjectRequest for ThroughputGetObjectRequest {
let this = self.project();
this.request.increment_read_window(len);
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
let this = self.project_ref();
this.request.read_window_end_offset()
}
}

impl Stream for ThroughputGetObjectRequest {
@@ -105,6 +110,10 @@ impl ObjectClient for ThroughputMockClient {
self.inner.write_part_size()
}

fn initial_read_window_size(&self) -> Option<usize> {
self.inner.initial_read_window_size()
}

async fn delete_object(
&self,
bucket: &str,
8 changes: 8 additions & 0 deletions mountpoint-s3-client/src/object_client.rs
@@ -85,6 +85,10 @@ pub trait ObjectClient {
/// can be `None` if the client does not do multi-part operations.
fn write_part_size(&self) -> Option<usize>;

/// Query the initial read window size this client uses for backpressure GetObject requests.
/// This can be `None` if backpressure is disabled.
fn initial_read_window_size(&self) -> Option<usize>;

/// Delete a single object from the object store.
///
/// DeleteObject will succeed even if the object within the bucket does not exist.
@@ -378,6 +382,10 @@ pub trait GetObjectRequest:
/// If `enable_read_backpressure` is false this call will have no effect,
/// no backpressure is being applied and data is being downloaded as fast as possible.
fn increment_read_window(self: Pin<&mut Self>, len: usize);

/// Get the upper bound of the current read window. When backpressure is enabled, [GetObjectRequest] can
/// return data up to this offset *exclusively*.
fn read_window_end_offset(self: Pin<&Self>) -> u64;
}

/// A streaming put request which allows callers to asynchronously write the body of the request.
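
A quick check of the documented contract for `initial_read_window_size`, written against the `MockClient` from this diff; the import paths are assumptions, and the mock module may be feature-gated in the published crate.

```rust
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
use mountpoint_s3_client::object_client::ObjectClient;

fn main() {
    // With backpressure disabled the configured window size is ignored and not reported.
    let no_backpressure = MockClient::new(MockClientConfig {
        bucket: "test_bucket".to_string(),
        part_size: 1024,
        unordered_list_seed: None,
        enable_backpressure: false,
        initial_read_window_size: 0,
    });
    assert_eq!(no_backpressure.initial_read_window_size(), None);

    // With backpressure enabled the client reports the initial window it will grant.
    let with_backpressure = MockClient::new(MockClientConfig {
        bucket: "test_bucket".to_string(),
        part_size: 1024,
        unordered_list_seed: None,
        enable_backpressure: true,
        initial_read_window_size: 256,
    });
    assert_eq!(with_backpressure.initial_read_window_size(), Some(256));
}
```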
22 changes: 19 additions & 3 deletions mountpoint-s3-client/src/s3_crt_client.rs
@@ -36,12 +36,10 @@ use pin_project::{pin_project, pinned_drop};
use thiserror::Error;
use tracing::{debug, error, trace, Span};

use self::get_object::S3GetObjectRequest;
use self::put_object::S3PutObjectRequest;
use crate::endpoint_config::EndpointError;
use crate::endpoint_config::{self, EndpointConfig};
use crate::object_client::*;
use crate::user_agent::UserAgent;
use crate::{object_client::*, S3GetObjectRequest, S3PutObjectRequest};

macro_rules! request_span {
($self:expr, $method:expr, $($field:tt)*) => {{
@@ -267,6 +265,8 @@ struct S3CrtClientInner {
request_payer: Option<String>,
read_part_size: usize,
write_part_size: usize,
enable_backpressure: bool,
initial_read_window_size: usize,
bucket_owner: Option<String>,
credentials_provider: Option<CredentialsProvider>,
host_resolver: HostResolver,
@@ -395,6 +395,8 @@ impl S3CrtClientInner {
request_payer: config.request_payer,
read_part_size: config.read_part_size,
write_part_size: config.write_part_size,
enable_backpressure: config.read_backpressure,
initial_read_window_size: config.initial_read_window,
bucket_owner: config.bucket_owner,
credentials_provider: Some(credentials_provider),
host_resolver,
@@ -974,6 +976,12 @@ pub enum S3RequestError {
/// The request was throttled by S3
#[error("Request throttled")]
Throttled,

/// Cannot fetch more data because current read window is exhausted. The read window must
/// be advanced using [GetObjectRequest::increment_read_window(u64)] to continue fetching
/// new data.
#[error("Polled for data with empty read window")]
EmptyReadWindow,
}

impl S3RequestError {
@@ -1178,6 +1186,14 @@ impl ObjectClient for S3CrtClient {
Some(self.inner.write_part_size)
}

fn initial_read_window_size(&self) -> Option<usize> {
if self.inner.enable_backpressure {
Some(self.inner.initial_read_window_size)
} else {
None
}
}

async fn delete_object(
&self,
bucket: &str,
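
On the CRT client side, the new `EmptyReadWindow` variant is what the stream yields when it is polled without any window left. The sketch below shows one way a caller might react to it; the import paths, the `(u64, Box<[u8]>)` part type, and the exact error nesting are assumptions based on the types shown in this diff.

```rust
use std::pin::Pin;

use futures::StreamExt;
use mountpoint_s3_client::object_client::{GetObjectError, GetObjectRequest, ObjectClientError};
use mountpoint_s3_client::{S3GetObjectRequest, S3RequestError};

/// Pull the next body part from a backpressure GetObject, growing the read
/// window by `step` bytes whenever the client reports an empty window.
async fn next_part(
    mut request: Pin<&mut S3GetObjectRequest>,
    step: usize,
) -> Result<Option<(u64, Box<[u8]>)>, ObjectClientError<GetObjectError, S3RequestError>> {
    loop {
        match request.next().await {
            // Normal delivery: (offset, bytes) for the next part of the body.
            Some(Ok(part)) => return Ok(Some(part)),
            // Window exhausted: advance it and poll again.
            Some(Err(ObjectClientError::ClientError(S3RequestError::EmptyReadWindow))) => {
                request.as_mut().increment_read_window(step);
            }
            // Any other error is passed through unchanged.
            Some(Err(other)) => return Err(other),
            // The requested range is complete.
            None => return Ok(None),
        }
    }
}
```

Whether to grow the window reactively like this or proactively (as in the sketch after the changelog above) is the caller's choice; the client only reports `read_window_end_offset` and applies the increments it is given.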

1 comment on commit 6c6b1e3

@github-actions


⚠️ Performance Alert ⚠️

A possible performance regression was detected for the benchmark suite.
The benchmark result of this commit is worse than the previous result, exceeding the threshold of 2.

| Benchmark suite | Current: 6c6b1e3 | Previous: 6e9eaa1 | Ratio |
| --- | --- | --- | --- |
| sequential_read_direct_io_small_file | 769.60703125 MiB/s | 1539.99833984375 MiB/s | 2.00 |

This comment was automatically generated by workflow using github-action-benchmark.
