Validate that shared cache bucket is usable #1141

Merged
5 changes: 5 additions & 0 deletions mountpoint-s3/src/cli.rs
@@ -11,6 +11,7 @@ use std::time::Duration;
use anyhow::{anyhow, Context as _};
use clap::{value_parser, ArgGroup, Parser, ValueEnum};
use fuser::{MountOption, Session};
use futures::executor::block_on;
use futures::task::Spawn;
use mountpoint_s3_client::config::{AddressingStyle, EndpointConfig, S3ClientAuthConfig, S3ClientConfig};
use mountpoint_s3_client::error::ObjectClientError;
@@ -895,6 +896,8 @@ where
(None, Some((config, bucket_name, cache_bucket_name))) => {
tracing::trace!("using S3 Express One Zone bucket as a cache for object content");
let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name);
block_on(express_cache.verify_cache_valid())
.with_context(|| format!("initial PutObject failed for shared cache bucket {cache_bucket_name}"))?;

let prefetcher = caching_prefetch(express_cache, runtime, prefetcher_config);
let fuse_session = create_filesystem(
@@ -934,6 +937,8 @@ where
tracing::trace!("using both local disk and S3 Express One Zone bucket as a cache for object content");
let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?;
let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name);
block_on(express_cache.verify_cache_valid())
.with_context(|| format!("initial PutObject failed for shared cache bucket {cache_bucket_name}"))?;
let cache = MultilevelDataCache::new(Arc::new(disk_cache), express_cache, runtime.clone());

let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
86 changes: 81 additions & 5 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
@@ -7,9 +7,9 @@ use async_trait::async_trait;
use base64ct::{Base64, Encoding};
use bytes::BytesMut;
use futures::{pin_mut, StreamExt};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError, PutObjectError};
use mountpoint_s3_client::types::{
ChecksumMode, GetObjectParams, GetObjectRequest, PutObjectSingleParams, UploadChecksum,
ChecksumMode, GetObjectParams, GetObjectRequest, ObjectClientResult, PutObjectSingleParams, UploadChecksum,
};
use mountpoint_s3_client::ObjectClient;
use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
@@ -72,6 +72,33 @@ where
bucket_name: bucket_name.to_owned(),
}
}

pub async fn verify_cache_valid(&self) -> ObjectClientResult<(), PutObjectError, Client::ClientError> {
let object_key = format!("{}/_mountpoint_cache_metadata", &self.prefix);
// This data is human-readable, and not expected to be read by Mountpoint.
// The file format used here is NOT stable.
// For now, we only include data that's guaranteed to be correct, since it's what
// determines the prefix.
let data = format!(
"source_bucket={}\nblock_size={}",
self.bucket_name, self.config.block_size
);

// put_object is sufficient for validating the cache, as S3 directory buckets only support
// read-only or read-write access, and write access implies read access.
// Using the `EXPRESS_ONEZONE` storage class also validates that we're writing to a directory bucket.
self.client
.put_object_single(
&self.bucket_name,
&object_key,
&PutObjectSingleParams::new().storage_class("EXPRESS_ONEZONE".to_string()),
data,
)
.in_current_span()
.await?;

Ok(())
}
}

#[async_trait]
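As a rough illustration of what this probe produces (assuming a hypothetical source bucket name and a 1 MiB block size; the real prefix is computed by ExpressDataCache from its configuration), the metadata object written by verify_cache_valid looks roughly like this:

```rust
fn main() {
    // Sketch only: mirrors the key/body layout that verify_cache_valid writes.
    // "example-source-bucket" and the 1 MiB block size are illustrative assumptions;
    // the real prefix is derived by ExpressDataCache from its configuration.
    let prefix = "mountpoint-express-cache-example-prefix";
    let object_key = format!("{}/_mountpoint_cache_metadata", prefix);
    let data = format!("source_bucket={}\nblock_size={}", "example-source-bucket", 1024 * 1024);
    println!("PUT s3://cache-bucket/{object_key}\n{data}");
}
```

Writing the probe with the `EXPRESS_ONEZONE` storage class is what makes a non-directory bucket reject the request, which is the failure mode exercised by `express_cache_verify_fail_non_express` further down.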
@@ -328,10 +355,10 @@ mod tests {
use crate::sync::Arc;
use proptest::{prop_assert, proptest};

use test_case::test_case;

use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
use mountpoint_s3_client::failure_client::{countdown_failure_client, CountdownFailureConfig};
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError};
use mountpoint_s3_client::types::ETag;
use test_case::test_case;

#[test_case(1024, 512 * 1024; "block_size smaller than part_size")]
#[test_case(8 * 1024 * 1024, 512 * 1024; "block_size larger than part_size")]
@@ -552,6 +579,55 @@
assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));
}

#[tokio::test]
async fn test_verify_cache_valid_success() {
let source_bucket = "source-bucket";
let bucket = "test-bucket";
let config = MockClientConfig {
bucket: bucket.to_string(),
part_size: 8 * 1024 * 1024,
enable_backpressure: true,
initial_read_window_size: 8 * 1024 * 1024,
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let cache = ExpressDataCache::new(client.clone(), Default::default(), source_bucket, bucket);

cache.verify_cache_valid().await.expect("cache should work");
}

#[tokio::test]
async fn test_verify_cache_valid_failure() {
let source_bucket = "source-bucket";
let bucket = "test-bucket";
let config = MockClientConfig {
bucket: bucket.to_string(),
part_size: 8 * 1024 * 1024,
enable_backpressure: true,
initial_read_window_size: 8 * 1024 * 1024,
..Default::default()
};
let client = Arc::new(MockClient::new(config));

let mut put_single_failures = HashMap::new();
put_single_failures.insert(1, MockClientError("error".to_owned().into()).into());

let failure_client = Arc::new(countdown_failure_client(
client.clone(),
CountdownFailureConfig {
put_single_failures,
..Default::default()
},
));

let cache = ExpressDataCache::new(failure_client, Default::default(), source_bucket, bucket);

cache
.verify_cache_valid()
.await
.expect_err("cache should not report valid if cannot write");
}

proptest! {
#[test]
fn proptest_creates_small_s3_keys(key: String, etag: String, block_idx: BlockIndex, source_description: String, block_size: u64) {
22 changes: 15 additions & 7 deletions mountpoint-s3/tests/common/fuse.rs
@@ -310,6 +310,9 @@ pub mod mock_session {
pub mod s3_session {
use super::*;

use crate::common::s3::{
get_test_bucket_and_prefix, get_test_endpoint_config, get_test_region, get_test_sdk_client,
};
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{ChecksumAlgorithm, GlacierJobParameters, RestoreRequest, Tier};
@@ -319,10 +322,6 @@ pub mod s3_session {
use mountpoint_s3_client::types::{Checksum, PutObjectTrailingChecksums};
use mountpoint_s3_client::S3CrtClient;

use crate::common::s3::{
get_test_bucket_and_prefix, get_test_endpoint_config, get_test_region, get_test_sdk_client,
};

/// Create a FUSE mount backed by a real S3 client
pub fn new(test_name: &str, test_config: TestSessionConfig) -> TestSession {
let mount_dir = tempfile::tempdir().unwrap();
@@ -363,7 +362,11 @@
let (bucket, prefix) = get_test_bucket_and_prefix(test_name);
let region = get_test_region();

let client = create_crt_client(test_config.part_size, test_config.initial_read_window_size);
let client = create_crt_client(
test_config.part_size,
test_config.initial_read_window_size,
Default::default(),
);
let runtime = client.event_loop_group();
let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config);
let session = create_fuse_session(
@@ -380,12 +383,17 @@
}
}

pub fn create_crt_client(part_size: usize, initial_read_window_size: usize) -> S3CrtClient {
pub fn create_crt_client(
part_size: usize,
initial_read_window_size: usize,
auth_config: S3ClientAuthConfig,
) -> S3CrtClient {
let client_config = S3ClientConfig::default()
.part_size(part_size)
.endpoint_config(get_test_endpoint_config())
.read_backpressure(true)
.initial_read_window(initial_read_window_size);
.initial_read_window(initial_read_window_size)
.auth_config(auth_config);
S3CrtClient::new(client_config).unwrap()
}

75 changes: 72 additions & 3 deletions mountpoint-s3/tests/fuse_tests/cache_test.rs
@@ -40,7 +40,7 @@ async fn express_invalid_block_read() {
let prefix = get_test_prefix("express_invalid_block_read");

// Mount the bucket
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());
let cache = CacheTestWrapper::new(ExpressDataCache::new(
client.clone(),
Default::default(),
@@ -100,7 +100,7 @@
#[test_case("key", 100, 1024 * 1024; "big file")]
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());
let bucket_name = get_standard_bucket();
let express_bucket_name = get_express_bucket();
let cache = ExpressDataCache::new(client.clone(), Default::default(), &bucket_name, &express_bucket_name);
@@ -129,7 +129,7 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize)
};
let cache = DiskDataCache::new(cache_dir.path().to_path_buf(), cache_config);

let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());

let bucket_name = get_test_bucket();
cache_write_read_base(
@@ -143,6 +143,75 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize)
);
}

#[tokio::test]
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
async fn express_cache_verify_fail_non_express() {
use mountpoint_s3_client::error::ObjectClientError;
use mountpoint_s3_client::S3RequestError::ResponseError;

let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());
let bucket_name = get_standard_bucket();
let cache_bucket_name = get_standard_bucket();
let cache = ExpressDataCache::new(client.clone(), Default::default(), &bucket_name, &cache_bucket_name);
let err = cache
.verify_cache_valid()
.await
.expect_err("cannot use standard bucket as shared cache");

if let ObjectClientError::ClientError(ResponseError(request_result)) = err {
let body = request_result.error_response_body.as_ref().expect("should have body");
let body = body.clone().into_string().unwrap();
assert!(body.contains("<Code>InvalidStorageClass</Code>"));
} else {
panic!("wrong error type");
}
}

#[tokio::test]
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
async fn express_cache_verify_fail_forbidden() {
use crate::common::creds::get_scoped_down_credentials;
use mountpoint_s3_client::config::S3ClientAuthConfig;
use mountpoint_s3_client::error::ObjectClientError;
use mountpoint_s3_client::S3RequestError::CrtError;
use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
use mountpoint_s3_crt::common::allocator::Allocator;

let bucket_name = get_standard_bucket();
let cache_bucket_name = get_express_bucket();

// No `s3express:CreateSession` in this policy, so we should get a forbidden error.
let policy = r#"{"Statement": [
{"Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload"], "Resource": "arn:aws:s3:::__BUCKET__/*"},
{"Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::__BUCKET__"}
]}"#;
let policy = policy.replace("__BUCKET__", &cache_bucket_name);
let credentials = get_scoped_down_credentials(policy).await;

// Build a S3CrtClient that uses a static credentials provider with the creds we just got
let config = CredentialsProviderStaticOptions {
access_key_id: credentials.access_key_id(),
secret_access_key: credentials.secret_access_key(),
session_token: credentials.session_token(),
};
let provider = CredentialsProvider::new_static(&Allocator::default(), config).unwrap();

let client = create_crt_client(
CLIENT_PART_SIZE,
CLIENT_PART_SIZE,
S3ClientAuthConfig::Provider(provider),
);

let cache = ExpressDataCache::new(client.clone(), Default::default(), &bucket_name, &cache_bucket_name);
let err = cache.verify_cache_valid().await.expect_err("cache must be write-able");

if let ObjectClientError::ClientError(CrtError(err)) = err {
assert!(err.to_string().contains("AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED"))
} else {
panic!("wrong error type");
}
}

fn cache_write_read_base<Cache>(
client: S3CrtClient,
bucket: &str,
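Relatedly, `express_cache_verify_fail_forbidden` above works by omitting `s3express:CreateSession` from the scoped-down policy. A hedged sketch of the statement that would have to be added for the probe to succeed — the placeholders and the s3express ARN shape are assumptions for illustration, not something this change defines:

```rust
fn main() {
    // Sketch: the statement the test's policy deliberately leaves out. Granting
    // s3express:CreateSession on the directory bucket is what the CRT client needs
    // to open a session, which is why the probe fails with
    // AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED without it. The placeholders and the
    // "arn:aws:s3express:<region>:<account-id>:bucket/<bucket-name>" ARN shape are
    // assumptions for illustration.
    let create_session_statement = r#"{"Effect": "Allow",
        "Action": "s3express:CreateSession",
        "Resource": "arn:aws:s3express:__REGION__:__ACCOUNT_ID__:bucket/__EXPRESS_BUCKET__"}"#;
    println!("{create_session_statement}");
}
```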