Skip to content

Commit

Permalink
Initial implementation of a shared cache on S3 Express (#1032)
Browse files Browse the repository at this point in the history
* Make cache block size user configurable (default 1024 KiB)

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Require Clone on ObjectClient

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Implement initial draft of shared cache in Express

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Encode cache version and block size into keys

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Decouple DataCacheError from io::Error

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Improve error handling

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Add unit test

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Allow sharing the cache when mounting with different prefixes

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Fix flow-control window

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

---------

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>
  • Loading branch information
passaro authored Sep 26, 2024
1 parent 6cda304 commit 0b7d0ae
Show file tree
Hide file tree
Showing 12 changed files with 373 additions and 66 deletions.
2 changes: 2 additions & 0 deletions mountpoint-s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ built = { version = "0.7.1", features = ["git2"] }

[features]
# Unreleased feature flags
express_cache = ["block_size"]
block_size = []
event_log = []
# Features for choosing tests
fips_tests = []
Expand Down
6 changes: 4 additions & 2 deletions mountpoint-s3/src/bin/mock-mount-s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
//!
//! This binary is intended only for use in testing and development of Mountpoint.

use std::sync::Arc;

use anyhow::anyhow;
use futures::executor::ThreadPool;

Expand All @@ -23,7 +25,7 @@ fn main() -> anyhow::Result<()> {
mountpoint_s3::cli::main(create_mock_client)
}

fn create_mock_client(args: &CliArgs) -> anyhow::Result<(ThroughputMockClient, ThreadPool, S3Personality)> {
fn create_mock_client(args: &CliArgs) -> anyhow::Result<(Arc<ThroughputMockClient>, ThreadPool, S3Personality)> {
// An extra little safety thing to make sure we can distinguish the real mount-s3 binary and
// this one. Buckets starting with "sthree-" are always invalid against real S3:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
Expand Down Expand Up @@ -81,5 +83,5 @@ fn create_mock_client(args: &CliArgs) -> anyhow::Result<(ThroughputMockClient, T
MockObject::from_bytes(b"hello world", ETag::for_tests()),
);

Ok((client, runtime, s3_personality))
Ok((Arc::new(client), runtime, s3_personality))
}
98 changes: 81 additions & 17 deletions mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use nix::unistd::ForkResult;
use regex::Regex;

use crate::build_info;
use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ManagedCacheDir};
use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir};
use crate::fs::{CacheConfig, S3FilesystemConfig, ServerSideEncryption, TimeToLive};
use crate::fuse::session::FuseSession;
use crate::fuse::S3FuseFilesystem;
Expand Down Expand Up @@ -261,6 +261,7 @@ pub struct CliArgs {
help = "Enable caching of object content to the given directory and set metadata TTL to 60 seconds",
help_heading = CACHING_OPTIONS_HEADER,
value_name = "DIRECTORY",
group = "cache_group",
)]
pub cache: Option<PathBuf>,

Expand All @@ -282,6 +283,27 @@ pub struct CliArgs {
)]
pub max_cache_size: Option<u64>,

#[cfg(feature = "block_size")]
#[clap(
long,
help = "Size of a cache block in KiB [Default: 1024 (1 MiB) for disk cache, 512 (512 KiB) for S3 Express cache]",
help_heading = CACHING_OPTIONS_HEADER,
value_name = "KiB",
requires = "cache_group"
)]
pub cache_block_size: Option<u64>,

#[cfg(feature = "express_cache")]
#[clap(
long,
help = "Enable caching of object content to the specified bucket on S3 Express One Zone (same region only)",
help_heading = CACHING_OPTIONS_HEADER,
value_name = "BUCKET",
value_parser = parse_bucket_name,
group = "cache_group",
)]
pub cache_express: Option<String>,

#[clap(
long,
help = "Configure a string to be prepended to the 'User-Agent' HTTP request header for all S3 requests",
Expand Down Expand Up @@ -384,6 +406,25 @@ impl CliArgs {
self.prefix.as_ref().cloned().unwrap_or_default()
}

fn cache_block_size_in_bytes(&self) -> u64 {
#[cfg(feature = "block_size")]
if let Some(kib) = self.cache_block_size {
return kib * 1024;
}
if self.cache_express_bucket_name().is_some() {
return 512 * 1024; // 512 KiB block size - default for express cache
}
1024 * 1024 // 1 MiB block size - default for disk cache
}

fn cache_express_bucket_name(&self) -> Option<&str> {
#[cfg(feature = "express_cache")]
if let Some(bucket_name) = &self.cache_express {
return Some(bucket_name);
}
None
}

fn logging_config(&self) -> LoggingConfig {
let default_filter = if self.no_log {
String::from("off")
Expand Down Expand Up @@ -450,7 +491,7 @@ impl CliArgs {
pub fn main<ClientBuilder, Client, Runtime>(client_builder: ClientBuilder) -> anyhow::Result<()>
where
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
Client: ObjectClient + Send + Sync + 'static,
Client: ObjectClient + Clone + Send + Sync + 'static,
Runtime: Spawn + Clone + Send + Sync + 'static,
{
let args = CliArgs::parse();
Expand Down Expand Up @@ -699,7 +740,7 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
fn mount<ClientBuilder, Client, Runtime>(args: CliArgs, client_builder: ClientBuilder) -> anyhow::Result<FuseSession>
where
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
Client: ObjectClient + Send + Sync + 'static,
Client: ObjectClient + Clone + Send + Sync + 'static,
Runtime: Spawn + Clone + Send + Sync + 'static,
{
tracing::info!("mount-s3 {}", build_info::FULL_VERSION);
Expand Down Expand Up @@ -728,11 +769,11 @@ where
if let Some(file_mode) = args.file_mode {
filesystem_config.file_mode = file_mode;
}
filesystem_config.storage_class = args.storage_class;
filesystem_config.storage_class = args.storage_class.clone();
filesystem_config.allow_delete = args.allow_delete;
filesystem_config.allow_overwrite = args.allow_overwrite;
filesystem_config.s3_personality = s3_personality;
filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse, args.sse_kms_key_id);
filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());

// Written in this awkward way to force us to update it if we add new checksum types
filesystem_config.use_upload_checksums = match args.upload_checksums {
Expand All @@ -747,7 +788,7 @@ where
let prefetcher_config = Default::default();

let mut metadata_cache_ttl = args.metadata_ttl.unwrap_or_else(|| {
if args.cache.is_some() {
if args.cache.is_some() || args.cache_express_bucket_name().is_some() {
// When the data cache is enabled, use 1min as metadata-ttl.
TimeToLive::Duration(Duration::from_secs(60))
} else {
Expand All @@ -771,20 +812,20 @@ where
tracing::trace!("using metadata TTL setting {metadata_cache_ttl:?}");
filesystem_config.cache_config = CacheConfig::new(metadata_cache_ttl);

if let Some(path) = args.cache {
let cache_config = match args.max_cache_size {
if let Some(path) = &args.cache {
let cache_limit = match args.max_cache_size {
// Fallback to no data cache.
Some(0) => None,
Some(max_size_in_mib) => Some(DiskDataCacheConfig {
limit: CacheLimit::TotalSize {
max_size: (max_size_in_mib * 1024 * 1024) as usize,
},
..Default::default()
Some(max_size_in_mib) => Some(CacheLimit::TotalSize {
max_size: (max_size_in_mib * 1024 * 1024) as usize,
}),
None => Some(DiskDataCacheConfig::default()),
None => Some(CacheLimit::default()),
};

if let Some(cache_config) = cache_config {
if let Some(cache_limit) = cache_limit {
let cache_config = DiskDataCacheConfig {
block_size: args.cache_block_size_in_bytes(),
limit: cache_limit,
};
let cache_key = env_unstable_cache_key();
let managed_cache_dir = ManagedCacheDir::new_from_parent_with_cache_key(path, cache_key)
.context("failed to create cache directory")?;
Expand All @@ -809,6 +850,29 @@ where
}
}

if let Some(express_bucket_name) = args.cache_express_bucket_name() {
// The cache can be shared across instances mounting the same bucket (including with different prefixes)
let source_description = &args.bucket_name;
let cache = ExpressDataCache::new(
express_bucket_name,
client.clone(),
source_description,
args.cache_block_size_in_bytes(),
);
let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
let fuse_session = create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)?;

return Ok(fuse_session);
};

let prefetcher = default_prefetch(runtime, prefetcher_config);
create_filesystem(
client,
Expand All @@ -831,7 +895,7 @@ fn create_filesystem<Client, Prefetcher>(
bucket_description: &str,
) -> anyhow::Result<FuseSession>
where
Client: ObjectClient + Send + Sync + 'static,
Client: ObjectClient + Clone + Send + Sync + 'static,
Prefetcher: Prefetch + Send + Sync + 'static,
{
tracing::trace!(?filesystem_config, "creating file system");
Expand Down
4 changes: 3 additions & 1 deletion mountpoint-s3/src/data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

mod cache_directory;
mod disk_data_cache;
mod express_data_cache;
mod in_memory_data_cache;

use async_trait::async_trait;
Expand All @@ -14,6 +15,7 @@ use thiserror::Error;
pub use crate::checksums::ChecksummedBytes;
pub use crate::data_cache::cache_directory::ManagedCacheDir;
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
pub use crate::data_cache::express_data_cache::ExpressDataCache;
pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;

use crate::object::ObjectId;
Expand All @@ -25,7 +27,7 @@ pub type BlockIndex = u64;
#[derive(Debug, Error)]
pub enum DataCacheError {
#[error("IO error when reading or writing from cache: {0}")]
IoFailure(#[from] std::io::Error),
IoFailure(#[source] anyhow::Error),
#[error("Block content was not valid/readable")]
InvalidBlockContent,
#[error("Block offset does not match block index")]
Expand Down
21 changes: 12 additions & 9 deletions mountpoint-s3/src/data_cache/disk_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@ pub struct DiskDataCacheConfig {
pub limit: CacheLimit,
}

impl Default for DiskDataCacheConfig {
fn default() -> Self {
Self {
block_size: 1024 * 1024, // 1 MiB block size
limit: CacheLimit::AvailableSpace { min_ratio: 0.05 }, // Preserve 5% available space
}
}
}

/// Limit the cache size.
#[derive(Debug)]
pub enum CacheLimit {
Expand All @@ -62,6 +53,12 @@ pub enum CacheLimit {
AvailableSpace { min_ratio: f64 },
}

impl Default for CacheLimit {
fn default() -> Self {
CacheLimit::AvailableSpace { min_ratio: 0.05 } // Preserve 5% available space
}
}

/// Describes additional information about the data stored in the block.
///
/// It should be written alongside the block's data
Expand Down Expand Up @@ -203,6 +200,12 @@ impl DiskBlock {
}
}

impl From<std::io::Error> for DataCacheError {
fn from(e: std::io::Error) -> Self {
DataCacheError::IoFailure(e.into())
}
}

impl DiskDataCache {
/// Create a new instance of an [DiskDataCache] with the specified configuration.
pub fn new(cache_directory: PathBuf, config: DiskDataCacheConfig) -> Self {
Expand Down
Loading

0 comments on commit 0b7d0ae

Please sign in to comment.