Initial implementation of a shared cache on S3 Express #1032

Merged: 9 commits, merged on Sep 26, 2024

Changes from all commits
2 changes: 2 additions & 0 deletions mountpoint-s3/Cargo.toml
@@ -80,6 +80,8 @@ built = { version = "0.7.1", features = ["git2"] }

[features]
# Unreleased feature flags
+express_cache = ["block_size"]
+block_size = []
event_log = []
# Features for choosing tests
fips_tests = []
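
Note on the feature wiring above: in Cargo feature syntax, express_cache = ["block_size"] means that enabling express_cache also enables block_size, so a build with the express cache (assuming standard Cargo behavior, e.g. cargo build --features express_cache) always gets the --cache-block-size flag as well.
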
6 changes: 4 additions & 2 deletions mountpoint-s3/src/bin/mock-mount-s3.rs
@@ -10,6 +10,8 @@
//!
//! This binary is intended only for use in testing and development of Mountpoint.

+use std::sync::Arc;
+
use anyhow::anyhow;
use futures::executor::ThreadPool;

@@ -23,7 +25,7 @@ fn main() -> anyhow::Result<()> {
mountpoint_s3::cli::main(create_mock_client)
}

-fn create_mock_client(args: &CliArgs) -> anyhow::Result<(ThroughputMockClient, ThreadPool, S3Personality)> {
+fn create_mock_client(args: &CliArgs) -> anyhow::Result<(Arc<ThroughputMockClient>, ThreadPool, S3Personality)> {
// An extra little safety thing to make sure we can distinguish the real mount-s3 binary and
// this one. Buckets starting with "sthree-" are always invalid against real S3:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
@@ -81,5 +83,5 @@ fn create_mock_client(args: &CliArgs) -> anyhow::Result<(ThroughputMockClient, T
MockObject::from_bytes(b"hello world", ETag::for_tests()),
);

-    Ok((client, runtime, s3_personality))
+    Ok((Arc::new(client), runtime, s3_personality))
}
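
The Arc wrapper is what lets one client be shared between the filesystem and the new cache: Arc<T> implements Clone as a cheap reference-count bump, which is how the new Client: ObjectClient + Clone bound in cli.rs is satisfied. A minimal sketch of the pattern (MockClient is a hypothetical stand-in, not the real ThroughputMockClient):

use std::sync::Arc;

// Hypothetical stand-in for a client type that is not itself Clone.
struct MockClient;

fn main() {
    let client = Arc::new(MockClient);
    // Cloning the Arc copies a pointer and bumps a reference count;
    // both handles refer to the same underlying client.
    let for_cache = Arc::clone(&client);
    let for_filesystem = client;
    // hand `for_cache` to the data cache and `for_filesystem` to the filesystem
    let _ = (for_cache, for_filesystem);
}
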
98 changes: 81 additions & 17 deletions mountpoint-s3/src/cli.rs
@@ -26,7 +26,7 @@ use nix::unistd::ForkResult;
use regex::Regex;

use crate::build_info;
-use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ManagedCacheDir};
+use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir};
use crate::fs::{CacheConfig, S3FilesystemConfig, ServerSideEncryption, TimeToLive};
use crate::fuse::session::FuseSession;
use crate::fuse::S3FuseFilesystem;
@@ -261,6 +261,7 @@ pub struct CliArgs {
help = "Enable caching of object content to the given directory and set metadata TTL to 60 seconds",
help_heading = CACHING_OPTIONS_HEADER,
value_name = "DIRECTORY",
+        group = "cache_group",
)]
pub cache: Option<PathBuf>,

@@ -282,6 +283,27 @@
)]
pub max_cache_size: Option<u64>,

+    #[cfg(feature = "block_size")]
+    #[clap(
+        long,
+        help = "Size of a cache block in KiB [Default: 1024 (1 MiB) for disk cache, 512 (512 KiB) for S3 Express cache]",
+        help_heading = CACHING_OPTIONS_HEADER,
+        value_name = "KiB",
+        requires = "cache_group"
+    )]
+    pub cache_block_size: Option<u64>,
+
+    #[cfg(feature = "express_cache")]
+    #[clap(
+        long,
+        help = "Enable caching of object content to the specified bucket on S3 Express One Zone (same region only)",
+        help_heading = CACHING_OPTIONS_HEADER,
+        value_name = "BUCKET",
+        value_parser = parse_bucket_name,
+        group = "cache_group",
+    )]
+    pub cache_express: Option<String>,
+
#[clap(
long,
help = "Configure a string to be prepended to the 'User-Agent' HTTP request header for all S3 requests",
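
How these attributes interact (assuming the clap derive semantics used elsewhere in cli.rs): group = "cache_group" places --cache and --cache-express in the same argument group, making them mutually exclusive, while requires = "cache_group" makes --cache-block-size valid only when one of the two cache flags is present. A minimal sketch with unrelated fields omitted:

use clap::Parser;
use std::path::PathBuf;

#[derive(Parser)]
struct Args {
    // --cache and --cache-express share a group, so passing both is rejected.
    #[clap(long, group = "cache_group")]
    cache: Option<PathBuf>,

    #[clap(long, group = "cache_group")]
    cache_express: Option<String>,

    // --cache-block-size alone is rejected; it must accompany a cache flag.
    #[clap(long, requires = "cache_group")]
    cache_block_size: Option<u64>,
}

fn main() {
    let args = Args::parse();
    println!("{:?} {:?} {:?}", args.cache, args.cache_express, args.cache_block_size);
}
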
@@ -384,6 +406,25 @@ impl CliArgs {
self.prefix.as_ref().cloned().unwrap_or_default()
}

+    fn cache_block_size_in_bytes(&self) -> u64 {
+        #[cfg(feature = "block_size")]
+        if let Some(kib) = self.cache_block_size {
+            return kib * 1024;
+        }
+        if self.cache_express_bucket_name().is_some() {
+            return 512 * 1024; // 512 KiB block size - default for express cache
+        }
+        1024 * 1024 // 1 MiB block size - default for disk cache
+    }
+
+    fn cache_express_bucket_name(&self) -> Option<&str> {
+        #[cfg(feature = "express_cache")]
+        if let Some(bucket_name) = &self.cache_express {
+            return Some(bucket_name);
+        }
+        None
+    }
+
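The selection order above reduces to: an explicit --cache-block-size value (in KiB) wins, otherwise the default depends on which cache backend is selected. A dependency-free restatement (hypothetical helper, not the real CliArgs method):

fn block_size_in_bytes(explicit_kib: Option<u64>, express_cache: bool) -> u64 {
    match explicit_kib {
        Some(kib) => kib * 1024,             // user-provided --cache-block-size, in KiB
        None if express_cache => 512 * 1024, // 512 KiB default for the S3 Express cache
        None => 1024 * 1024,                 // 1 MiB default for the disk cache
    }
}

fn main() {
    assert_eq!(block_size_in_bytes(Some(2048), false), 2 * 1024 * 1024);
    assert_eq!(block_size_in_bytes(None, true), 524_288);
    assert_eq!(block_size_in_bytes(None, false), 1_048_576);
}
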
fn logging_config(&self) -> LoggingConfig {
let default_filter = if self.no_log {
String::from("off")
@@ -450,7 +491,7 @@ pub fn main<ClientBuilder, Client, Runtime>(client_builder: ClientBuilder) -> anyhow::Result<()>
pub fn main<ClientBuilder, Client, Runtime>(client_builder: ClientBuilder) -> anyhow::Result<()>
where
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
-    Client: ObjectClient + Send + Sync + 'static,
+    Client: ObjectClient + Clone + Send + Sync + 'static,
Runtime: Spawn + Clone + Send + Sync + 'static,
{
let args = CliArgs::parse();
@@ -699,7 +740,7 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
fn mount<ClientBuilder, Client, Runtime>(args: CliArgs, client_builder: ClientBuilder) -> anyhow::Result<FuseSession>
where
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
-    Client: ObjectClient + Send + Sync + 'static,
+    Client: ObjectClient + Clone + Send + Sync + 'static,
Runtime: Spawn + Clone + Send + Sync + 'static,
{
tracing::info!("mount-s3 {}", build_info::FULL_VERSION);
@@ -728,11 +769,11 @@ where
if let Some(file_mode) = args.file_mode {
filesystem_config.file_mode = file_mode;
}
-    filesystem_config.storage_class = args.storage_class;
+    filesystem_config.storage_class = args.storage_class.clone();
Review comment (Contributor): nit: revert? (probably the idea was to reject express cache when the mounted bucket is also on express?)

Reply (Contributor Author): no, the .clone() is to avoid invalidating args, which is required later on. But you are right we should also consider the storage class when we add validation for the shared cache!

filesystem_config.allow_delete = args.allow_delete;
filesystem_config.allow_overwrite = args.allow_overwrite;
filesystem_config.s3_personality = s3_personality;
-    filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse, args.sse_kms_key_id);
+    filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());

// Written in this awkward way to force us to update it if we add new checksum types
filesystem_config.use_upload_checksums = match args.upload_checksums {
@@ -747,7 +788,7 @@ where
let prefetcher_config = Default::default();

let mut metadata_cache_ttl = args.metadata_ttl.unwrap_or_else(|| {
-        if args.cache.is_some() {
+        if args.cache.is_some() || args.cache_express_bucket_name().is_some() {
// When the data cache is enabled, use 1min as metadata-ttl.
TimeToLive::Duration(Duration::from_secs(60))
} else {
@@ -771,20 +812,20 @@ where
tracing::trace!("using metadata TTL setting {metadata_cache_ttl:?}");
filesystem_config.cache_config = CacheConfig::new(metadata_cache_ttl);

-    if let Some(path) = args.cache {
-        let cache_config = match args.max_cache_size {
+    if let Some(path) = &args.cache {
+        let cache_limit = match args.max_cache_size {
// Fallback to no data cache.
Some(0) => None,
-            Some(max_size_in_mib) => Some(DiskDataCacheConfig {
-                limit: CacheLimit::TotalSize {
-                    max_size: (max_size_in_mib * 1024 * 1024) as usize,
-                },
-                ..Default::default()
+            Some(max_size_in_mib) => Some(CacheLimit::TotalSize {
+                max_size: (max_size_in_mib * 1024 * 1024) as usize,
+            }),
-            None => Some(DiskDataCacheConfig::default()),
+            None => Some(CacheLimit::default()),
};

-        if let Some(cache_config) = cache_config {
+        if let Some(cache_limit) = cache_limit {
+            let cache_config = DiskDataCacheConfig {
+                block_size: args.cache_block_size_in_bytes(),
+                limit: cache_limit,
+            };
let cache_key = env_unstable_cache_key();
let managed_cache_dir = ManagedCacheDir::new_from_parent_with_cache_key(path, cache_key)
.context("failed to create cache directory")?;
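
The reshaped flow above resolves an optional CacheLimit first (with --max-cache-size 0 disabling the disk cache entirely) and only then builds the DiskDataCacheConfig, now that block_size comes from the CLI. A simplified sketch with stub types (the real definitions live in the data_cache module):

// Stub types standing in for the real data_cache definitions.
#[derive(Debug, Default)]
enum CacheLimit {
    #[default]
    AvailableSpace,
    TotalSize { max_size: usize },
}

#[derive(Debug)]
struct DiskDataCacheConfig {
    block_size: u64,
    limit: CacheLimit,
}

fn disk_cache_config(max_cache_size_mib: Option<u64>, block_size: u64) -> Option<DiskDataCacheConfig> {
    let limit = match max_cache_size_mib {
        Some(0) => None, // --max-cache-size 0 falls back to no data cache
        Some(mib) => Some(CacheLimit::TotalSize {
            max_size: (mib * 1024 * 1024) as usize,
        }),
        None => Some(CacheLimit::default()),
    };
    limit.map(|limit| DiskDataCacheConfig { block_size, limit })
}

fn main() {
    assert!(disk_cache_config(Some(0), 1 << 20).is_none());
    println!("{:?}", disk_cache_config(None, 1 << 20));
}
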
@@ -809,6 +850,29 @@
}
}

+    if let Some(express_bucket_name) = args.cache_express_bucket_name() {
+        // The cache can be shared across instances mounting the same bucket (including with different prefixes)
+        let source_description = &args.bucket_name;
+        let cache = ExpressDataCache::new(
+            express_bucket_name,
+            client.clone(),
+            source_description,
+            args.cache_block_size_in_bytes(),
+        );
+        let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
+        let fuse_session = create_filesystem(
+            client,
+            prefetcher,
+            &args.bucket_name,
+            &args.prefix.unwrap_or_default(),
+            filesystem_config,
+            fuse_config,
+            &bucket_description,
+        )?;
+
+        return Ok(fuse_session);
+    };
+
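With this branch in place, a mount such as mount-s3 my-bucket /mnt/bucket --cache-express my-express-bucket (hypothetical invocation; the flag name comes from the clap definition above) routes reads through a caching prefetcher backed by the express bucket and returns early, skipping the default prefetcher below. Because the cache is keyed by the source bucket name rather than the prefix, two mounts of the same bucket with different prefixes can share cached blocks.
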
let prefetcher = default_prefetch(runtime, prefetcher_config);
create_filesystem(
client,
@@ -831,7 +895,7 @@ fn create_filesystem<Client, Prefetcher>(
bucket_description: &str,
) -> anyhow::Result<FuseSession>
where
-    Client: ObjectClient + Send + Sync + 'static,
+    Client: ObjectClient + Clone + Send + Sync + 'static,
Prefetcher: Prefetch + Send + Sync + 'static,
{
tracing::trace!(?filesystem_config, "creating file system");
4 changes: 3 additions & 1 deletion mountpoint-s3/src/data_cache.rs
@@ -6,6 +6,7 @@

mod cache_directory;
mod disk_data_cache;
+mod express_data_cache;
mod in_memory_data_cache;

use async_trait::async_trait;
@@ -14,6 +15,7 @@ use thiserror::Error;
pub use crate::checksums::ChecksummedBytes;
pub use crate::data_cache::cache_directory::ManagedCacheDir;
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
+pub use crate::data_cache::express_data_cache::ExpressDataCache;
pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;

use crate::object::ObjectId;
@@ -25,7 +27,7 @@ pub type BlockIndex = u64;
#[derive(Debug, Error)]
pub enum DataCacheError {
#[error("IO error when reading or writing from cache: {0}")]
-    IoFailure(#[from] std::io::Error),
+    IoFailure(#[source] anyhow::Error),
#[error("Block content was not valid/readable")]
InvalidBlockContent,
#[error("Block offset does not match block index")]
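
Widening IoFailure from std::io::Error to anyhow::Error lets non-filesystem failures (for example, S3 client errors surfaced by the express cache) be wrapped in the same variant, while the From<std::io::Error> impl added in disk_data_cache.rs below keeps existing ? call sites compiling. A self-contained sketch of the pattern, assuming the thiserror and anyhow crates already used here:

use thiserror::Error;

#[derive(Debug, Error)]
enum DataCacheError {
    #[error("IO error when reading or writing from cache: {0}")]
    IoFailure(#[source] anyhow::Error),
}

// Manual From impl preserving the old `?` conversions from std::io::Error.
impl From<std::io::Error> for DataCacheError {
    fn from(e: std::io::Error) -> Self {
        DataCacheError::IoFailure(e.into())
    }
}

fn read_block(path: &str) -> Result<Vec<u8>, DataCacheError> {
    Ok(std::fs::read(path)?) // io::Error still converts via From
}

fn main() {
    // A non-IO error can now be wrapped too, via anyhow:
    let err = DataCacheError::IoFailure(anyhow::anyhow!("client error"));
    println!("{err}");
    let _ = read_block("/tmp/does-not-exist");
}
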
21 changes: 12 additions & 9 deletions mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -45,15 +45,6 @@ pub struct DiskDataCacheConfig {
pub limit: CacheLimit,
}

-impl Default for DiskDataCacheConfig {
-    fn default() -> Self {
-        Self {
-            block_size: 1024 * 1024, // 1 MiB block size
-            limit: CacheLimit::AvailableSpace { min_ratio: 0.05 }, // Preserve 5% available space
-        }
-    }
-}
-
/// Limit the cache size.
#[derive(Debug)]
pub enum CacheLimit {
@@ -62,6 +53,12 @@ pub enum CacheLimit {
AvailableSpace { min_ratio: f64 },
}

+impl Default for CacheLimit {
+    fn default() -> Self {
+        CacheLimit::AvailableSpace { min_ratio: 0.05 } // Preserve 5% available space
+    }
+}
+
/// Describes additional information about the data stored in the block.
///
/// It should be written alongside the block's data
@@ -203,6 +200,12 @@ impl DiskBlock {
}
}

+impl From<std::io::Error> for DataCacheError {
+    fn from(e: std::io::Error) -> Self {
+        DataCacheError::IoFailure(e.into())
+    }
+}
+
impl DiskDataCache {
/// Create a new instance of an [DiskDataCache] with the specified configuration.
pub fn new(cache_directory: PathBuf, config: DiskDataCacheConfig) -> Self {