Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failure hook for put_object_single #1077

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 112 additions & 44 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use crate::object_client::{
};

// Wrapper for injecting failures into a get stream or a put request
pub struct FailureRequestWrapper<Client: ObjectClient, RequestWrapperState> {
pub struct FailureRequestWrapper<ClientError, RequestWrapperState> {
state: RequestWrapperState,
result_fn: fn(&mut RequestWrapperState) -> Result<(), Client::ClientError>,
result_fn: fn(&mut RequestWrapperState) -> Result<(), ClientError>,
}

#[allow(clippy::type_complexity)]
Expand All @@ -39,7 +39,7 @@ pub struct FailureClient<Client: ObjectClient, State, RequestWrapperState> {
Option<Range<u64>>,
Option<ETag>,
) -> Result<
FailureRequestWrapper<Client, RequestWrapperState>,
FailureRequestWrapper<Client::ClientError, RequestWrapperState>,
ObjectClientError<GetObjectError, Client::ClientError>,
>,
pub head_object_cb:
Expand All @@ -52,13 +52,20 @@ pub struct FailureClient<Client: ObjectClient, State, RequestWrapperState> {
usize,
&str,
) -> Result<(), ObjectClientError<ListObjectsError, Client::ClientError>>,
pub put_object_single_cb: fn(
&mut State,
&str,
&str,
params: &PutObjectSingleParams,
&[u8],
) -> Result<(), ObjectClientError<PutObjectError, Client::ClientError>>,
pub put_object_cb: fn(
&mut State,
&str,
&str,
&PutObjectParams,
) -> Result<
FailureRequestWrapper<Client, RequestWrapperState>,
FailureRequestWrapper<Client::ClientError, RequestWrapperState>,
ObjectClientError<PutObjectError, Client::ClientError>,
>,
}
Expand Down Expand Up @@ -187,7 +194,7 @@ where
params: &PutObjectSingleParams,
contents: impl AsRef<[u8]> + Send + 'a,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
// TODO failure hook for put_object_single
(self.put_object_single_cb)(&mut *self.state.lock().unwrap(), bucket, key, params, contents.as_ref())?;
self.client.put_object_single(bucket, key, params, contents).await
}

Expand Down Expand Up @@ -272,73 +279,86 @@ where
}

/// A failure client that fails operations based on counts.
pub type CountdownFailureClient<Client> =
FailureClient<Client, CountdownFailureClientState<Client>, CountdownFailureRequestState<Client>>;

pub type RequestFailureMap<Client, RequestError> = HashMap<
usize,
Result<
(usize, <Client as ObjectClient>::ClientError),
ObjectClientError<RequestError, <Client as ObjectClient>::ClientError>,
>,
pub type CountdownFailureClient<Client> = FailureClient<
Client,
CountdownFailureClientState<<Client as ObjectClient>::ClientError>,
CountdownFailureRequestState<<Client as ObjectClient>::ClientError>,
>;

pub type RequestFailureMap<ClientError, RequestError> =
HashMap<usize, Result<(usize, ClientError), ObjectClientError<RequestError, ClientError>>>;

/// Configuration for a [CountdownFailureClient].
#[derive(Default)]
pub struct CountdownFailureConfig<ClientError> {
/// For GET, map entries are interpreted as follows (operations are numbered starting at 1):
/// (k -> Err(E) means return error E on the k'th GET
/// (k -> Ok((n, E))) means return a stream object on the k'th get that
/// returns error E on the n'th read request from that stream, otherwise reads from the underlying stream
/// (Note: we could also define a failure client that tracks offsets, and returns an error when the offset
/// reaches a specified threshold.)
pub get_failures: RequestFailureMap<ClientError, GetObjectError>,
/// For HEAD, map entries are interpreted as follows:
/// (k -> E) means inject error E on the k'th call to that operation
pub head_failures: HashMap<usize, ObjectClientError<HeadObjectError, ClientError>>,
/// For LIST, map entries are interpreted as follows:
/// (k -> E) means inject error E on the k'th call to that operation
pub list_failures: HashMap<usize, ObjectClientError<ListObjectsError, ClientError>>,
/// For single PUT, map entries are interpreted as follows:
/// (k -> E) means inject error E on the k'th call to that operation
pub put_single_failures: HashMap<usize, ObjectClientError<PutObjectError, ClientError>>,
/// For PUT, map entries are interpreted as follows (operations are numbered starting at 1):
/// (k -> Err(E) means return error E on the k'th PUT
/// (k -> Ok((n, E))) means return a put request object on the k'th put that
/// returns error E on the n'th write, otherwise writes to the underlying request.
pub put_failures: RequestFailureMap<ClientError, PutObjectError>,
}

#[allow(clippy::type_complexity)]
#[derive(Default)]
pub struct CountdownFailureClientState<Client: ObjectClient> {
pub struct CountdownFailureClientState<ClientError> {
get_count: usize,
get_results: RequestFailureMap<Client, GetObjectError>,
get_failures: RequestFailureMap<ClientError, GetObjectError>,
head_count: usize,
head_failures: HashMap<usize, ObjectClientError<HeadObjectError, Client::ClientError>>,
head_failures: HashMap<usize, ObjectClientError<HeadObjectError, ClientError>>,
list_count: usize,
list_failures: HashMap<usize, ObjectClientError<ListObjectsError, Client::ClientError>>,
list_failures: HashMap<usize, ObjectClientError<ListObjectsError, ClientError>>,
put_single_count: usize,
put_single_failures: HashMap<usize, ObjectClientError<PutObjectError, ClientError>>,
put_count: usize,
put_results: RequestFailureMap<Client, PutObjectError>,
put_results: RequestFailureMap<ClientError, PutObjectError>,
}

#[derive(Debug, Default)]
pub struct CountdownFailureRequestState<Client: ObjectClient> {
pub struct CountdownFailureRequestState<ClientError> {
count: usize,
fail_count: usize,
error: Option<Client::ClientError>,
error: Option<ClientError>,
}

#[allow(clippy::type_complexity)]
pub fn countdown_failure_client<Client: ObjectClient>(
client: Client,
// For GET, map entries are interpreted as follows (operations are numbered starting at 1):
// (k -> Err(E) means return error E on the k'th GET
// (k -> Ok((n, E))) means return a stream object on the k'th get that
// returns error E on the n'th read request from that stream, otherwise reads from the underlying stream
// (Note: we could also define a failure client that tracks offsets, and returns an error when the offset
// reaches a specified threshold.)
get_results: RequestFailureMap<Client, GetObjectError>,
// For HEAD and LIST, map entries are interpreted as follows:
// (k -> E) means inject error E on the k'th call to that operation
head_failures: HashMap<usize, ObjectClientError<HeadObjectError, Client::ClientError>>,
list_failures: HashMap<usize, ObjectClientError<ListObjectsError, Client::ClientError>>,
// For PUT, map entries are interpreted as follows (operations are numbered starting at 1):
// (k -> Err(E) means return error E on the k'th PUT
// (k -> Ok((n, E))) means return a put request object on the k'th put that
// returns error E on the n'th write, otherwise writes to the underlying request.
put_results: RequestFailureMap<Client, PutObjectError>,
config: CountdownFailureConfig<<Client as ObjectClient>::ClientError>,
) -> CountdownFailureClient<Client> {
let state = Mutex::new(CountdownFailureClientState {
get_count: 0usize,
get_results,
get_failures: config.get_failures,
head_count: 0usize,
head_failures,
head_failures: config.head_failures,
list_count: 0usize,
list_failures,
list_failures: config.list_failures,
put_single_count: 0usize,
put_single_failures: config.put_single_failures,
put_count: 0usize,
put_results,
put_results: config.put_failures,
});
FailureClient {
client,
state,
get_object_cb: |state, _bucket, _key, _range, _if_match| {
state.get_count += 1;
let (fail_count, error) = if let Some(result) = state.get_results.remove(&state.get_count) {
let (fail_count, error) = if let Some(result) = state.get_failures.remove(&state.get_count) {
let (fail_count, error) = result?;
(fail_count, Some(error))
} else {
Expand Down Expand Up @@ -376,6 +396,14 @@ pub fn countdown_failure_client<Client: ObjectClient>(
Ok(())
}
},
put_object_single_cb: |state, _bucket, _key, _params, _data| {
state.put_single_count += 1;
if let Some(error) = state.put_single_failures.remove(&state.put_single_count) {
Err(error)
} else {
Ok(())
}
},
put_object_cb: |state, _bucket, _key, _params| {
state.put_count += 1;
let (fail_count, error) = if let Some(result) = state.put_results.remove(&state.put_count) {
Expand Down Expand Up @@ -440,8 +468,13 @@ mod tests {
Err(ObjectClientError::ClientError(MockClientError("no such bucket".into()))),
);

let fail_client =
countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new(), HashMap::new());
let fail_client = countdown_failure_client(
client,
CountdownFailureConfig {
get_failures,
..Default::default()
},
);

let fail_set = HashSet::from([2, 4, 5]);
for i in 1..=6 {
Expand All @@ -453,4 +486,39 @@ mod tests {
}
}
}

#[tokio::test]
async fn fail_client_sanity_check_put_single() {
let bucket = "test_bucket";
let key = "foo";

let client = MockClient::new(MockClientConfig {
bucket: bucket.to_string(),
..Default::default()
});

let mut put_single_failures = HashMap::new();
put_single_failures.insert(2, ObjectClientError::ClientError(MockClientError("error".into())));

let fail_client = countdown_failure_client(
client,
CountdownFailureConfig {
put_single_failures,
..Default::default()
},
);

let fail_set = HashSet::from([2]);
for i in 1..=3 {
let body = vec![0u8; 50];
let r = fail_client
.put_object_single(bucket, key, &PutObjectSingleParams::new(), body)
.await;
if fail_set.contains(&i) {
assert!(r.is_err());
} else {
assert!(r.is_ok());
}
}
}
}
2 changes: 1 addition & 1 deletion mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ impl Stream for MockGetObjectRequest {
}
}

#[derive(Debug, Error, PartialEq, Eq)]
#[derive(Debug, Default, Error, PartialEq, Eq)]
pub struct MockClientError(pub Cow<'static, str>);

impl std::fmt::Display for MockClientError {
Expand Down
20 changes: 10 additions & 10 deletions mountpoint-s3/src/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ mod tests {
use super::*;
use futures::executor::{block_on, ThreadPool};
use mountpoint_s3_client::error::GetObjectError;
use mountpoint_s3_client::failure_client::{countdown_failure_client, RequestFailureMap};
use mountpoint_s3_client::failure_client::{countdown_failure_client, CountdownFailureConfig, RequestFailureMap};
use mountpoint_s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject};
use mountpoint_s3_client::types::ETag;
use proptest::proptest;
Expand Down Expand Up @@ -800,7 +800,7 @@ mod tests {
size: u64,
read_size: usize,
test_config: TestConfig,
get_failures: RequestFailureMap<MockClient, GetObjectError>,
get_failures: RequestFailureMap<MockClientError, GetObjectError>,
) {
let config = MockClientConfig {
bucket: "test-bucket".to_string(),
Expand All @@ -817,10 +817,10 @@ mod tests {

let client = Arc::new(countdown_failure_client(
client,
get_failures,
HashMap::new(),
HashMap::new(),
HashMap::new(),
CountdownFailureConfig {
get_failures,
..Default::default()
},
));
let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);

Expand Down Expand Up @@ -1126,10 +1126,10 @@ mod tests {

let client = Arc::new(countdown_failure_client(
client,
get_failures,
HashMap::new(),
HashMap::new(),
HashMap::new(),
CountdownFailureConfig {
get_failures,
..Default::default()
},
));
let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT);

Expand Down
10 changes: 5 additions & 5 deletions mountpoint-s3/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ mod tests {

use super::*;
use mountpoint_s3_client::{
failure_client::countdown_failure_client,
failure_client::{countdown_failure_client, CountdownFailureConfig},
mock_client::{MockClient, MockClientConfig, MockClientError},
};
use test_case::test_case;
Expand Down Expand Up @@ -337,10 +337,10 @@ mod tests {

let failure_client = Arc::new(countdown_failure_client(
client.clone(),
HashMap::new(),
HashMap::new(),
HashMap::new(),
put_failures,
CountdownFailureConfig {
put_failures,
..Default::default()
},
));

let uploader = Uploader::new(failure_client.clone(), None, ServerSideEncryption::default(), true);
Expand Down
26 changes: 13 additions & 13 deletions mountpoint-s3/tests/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use mountpoint_s3::S3FilesystemConfig;
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
#[cfg(all(feature = "s3_tests", not(feature = "s3express_tests")))]
use mountpoint_s3_client::error_metadata::ClientErrorMetadata;
use mountpoint_s3_client::failure_client::countdown_failure_client;
use mountpoint_s3_client::failure_client::{countdown_failure_client, CountdownFailureConfig};
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, Operation};
use mountpoint_s3_client::types::{ETag, RestoreStatus};
use mountpoint_s3_client::ObjectClient;
Expand Down Expand Up @@ -753,10 +753,10 @@ async fn test_upload_aborted_on_write_failure() {

let failure_client = countdown_failure_client(
client.clone(),
Default::default(),
Default::default(),
Default::default(),
put_failures,
CountdownFailureConfig {
put_failures,
..Default::default()
},
);
let fs = make_test_filesystem_with_client(
Arc::new(failure_client),
Expand Down Expand Up @@ -828,10 +828,10 @@ async fn test_upload_aborted_on_fsync_failure() {

let failure_client = countdown_failure_client(
client.clone(),
Default::default(),
Default::default(),
Default::default(),
put_failures,
CountdownFailureConfig {
put_failures,
..Default::default()
},
);
let fs = make_test_filesystem_with_client(
Arc::new(failure_client),
Expand Down Expand Up @@ -888,10 +888,10 @@ async fn test_upload_aborted_on_release_failure() {

let failure_client = countdown_failure_client(
client.clone(),
Default::default(),
Default::default(),
Default::default(),
put_failures,
CountdownFailureConfig {
put_failures,
..Default::default()
},
);
let fs = make_test_filesystem_with_client(
Arc::new(failure_client),
Expand Down
Loading