Multilevel cache

Vlad Volodkin committed Nov 5, 2024
Signed-off-by: Vlad Volodkin <vlaad@amazon.com>
1 parent db4571f commit a232279
Showing 5 changed files with 467 additions and 72 deletions.
14 changes: 12 additions & 2 deletions mountpoint-s3-client/src/mock_client.rs
@@ -58,7 +58,7 @@ lazy_static! {
static ref RAMP_BYTES: Vec<u8> = ramp_bytes(0, RAMP_BUFFER_SIZE + RAMP_MODULUS);
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct MockClientConfig {
/// The bucket name this client will connect to
pub bucket: String,
@@ -74,7 +74,7 @@ pub struct MockClientConfig {

/// A mock implementation of an object client that we can manually add objects to, and then query
/// via the [ObjectClient] APIs.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MockClient {
config: MockClientConfig,
objects: Arc<RwLock<BTreeMap<String, MockObject>>>,
@@ -107,6 +107,16 @@ impl MockClient {
self.objects.write().unwrap().remove(key);
}

/// Remove all objects for the mock client's bucket
pub fn remove_all_objects(&self) {
self.objects.write().unwrap().clear();
}

/// Number of objects in the mock client's bucket
pub fn object_count(&self) -> usize {
self.objects.write().unwrap().len()
}

/// Returns `true` if this mock client's bucket contains the specified key
pub fn contains_key(&self, key: &str) -> bool {
self.objects.read().unwrap().contains_key(key)
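The new `Clone` derives let a test hand one `MockClient` to a cache or filesystem under test while keeping a second handle to inspect or reset the shared bucket, which is what `remove_all_objects` and `object_count` are for. A minimal sketch of that pattern (the `MockObject::from_bytes` and `ETag::for_tests` helpers are assumed from the crate's existing test utilities):

```rust
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockObject};
use mountpoint_s3_client::types::ETag;

fn main() {
    let config = MockClientConfig {
        bucket: "test-bucket".to_string(),
        ..Default::default()
    };
    let client = MockClient::new(config);
    client.add_object("a.txt", MockObject::from_bytes(b"hello", ETag::for_tests()));

    // Cloning copies the Arc-backed object map, so both handles see one bucket.
    let handle = client.clone();
    assert_eq!(handle.object_count(), 1);

    // Clearing through either handle resets the shared state between test cases.
    handle.remove_all_objects();
    assert_eq!(client.object_count(), 0);
}
```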
179 changes: 117 additions & 62 deletions mountpoint-s3/src/cli.rs
@@ -5,10 +5,11 @@ use std::io::{Read, Write};
use std::num::NonZeroUsize;
use std::os::fd::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context as _};
use clap::{value_parser, Parser, ValueEnum};
use clap::{value_parser, ArgGroup, Parser, ValueEnum};
use fuser::{MountOption, Session};
use futures::task::Spawn;
use mountpoint_s3_client::config::{AddressingStyle, EndpointConfig, S3ClientAuthConfig, S3ClientConfig};
@@ -26,7 +27,9 @@ use nix::unistd::ForkResult;
use regex::Regex;
use sysinfo::{RefreshKind, System};

use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir};
use crate::data_cache::{
CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir, MultilevelDataCache,
};
use crate::fs::{CacheConfig, ServerSideEncryption, TimeToLive};
use crate::fuse::session::FuseSession;
use crate::fuse::S3FuseFilesystem;
@@ -46,7 +49,15 @@ const CACHING_OPTIONS_HEADER: &str = "Caching options";
const ADVANCED_OPTIONS_HEADER: &str = "Advanced options";

#[derive(Parser, Debug)]
#[clap(name = "mount-s3", about = "Mountpoint for Amazon S3", version = build_info::FULL_VERSION)]
#[clap(
name = "mount-s3",
about = "Mountpoint for Amazon S3",
version = build_info::FULL_VERSION,
group(
ArgGroup::new("cache_group")
.multiple(true),
),
)]
pub struct CliArgs {
#[clap(help = "Name of bucket to mount", value_parser = parse_bucket_name)]
pub bucket_name: String,
@@ -298,10 +309,10 @@ pub struct CliArgs {
#[cfg(feature = "block_size")]
#[clap(
long,
help = "Size of a cache block in KiB [Default: 1024 (1 MiB) for disk cache, 512 (512 KiB) for S3 Express cache]",
help = "Size of a cache block in KiB [Default: 1024 (1 MiB) for disk cache and for S3 Express cache]",
help_heading = CACHING_OPTIONS_HEADER,
value_name = "KiB",
requires = "cache_group"
requires = "cache_group",
)]
pub cache_block_size: Option<u64>,
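The struct-level `cache_group` (declared with `multiple(true)` so several cache options may be set together) backs the `requires = "cache_group"` constraint on `--cache-block-size` seen above. A self-contained sketch of the same clap pattern; the member flags here are illustrative stand-ins for the real `--cache` and Express cache arguments declared elsewhere in this struct:

```rust
use clap::{ArgGroup, Parser};

#[derive(Parser, Debug)]
#[clap(group(ArgGroup::new("cache_group").multiple(true)))]
struct Args {
    /// Local-disk cache directory (one member of the group)
    #[clap(long, group = "cache_group")]
    cache: Option<String>,

    /// S3 Express cache bucket (another member; both may be set at once)
    #[clap(long, group = "cache_group")]
    cache_express: Option<String>,

    /// Only accepted when at least one cache option above is present
    #[clap(long, requires = "cache_group")]
    cache_block_size: Option<u64>,
}

fn main() {
    // `--cache-block-size` alone would be rejected; with `--cache` it parses.
    let args = Args::parse_from(["prog", "--cache", "/tmp", "--cache-block-size", "1024"]);
    assert_eq!(args.cache_block_size, Some(1024));
}
```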

@@ -423,10 +434,7 @@ impl CliArgs {
if let Some(kib) = self.cache_block_size {
return kib * 1024;
}
if self.cache_express_bucket_name().is_some() {
return 512 * 1024; // 512 KiB block size - default for express cache
}
1024 * 1024 // 1 MiB block size - default for disk cache
1024 * 1024 // 1 MiB block size - default for disk cache and for express cache
}
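With this change both cache backends share the 1 MiB default block size: for example, `--cache-block-size 2048` now selects 2 MiB blocks whether the cache lives on local disk, in S3 Express, or in both. A single shared block size is presumably what allows the two levels of the new multilevel cache to exchange blocks.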

fn cache_express_bucket_name(&self) -> Option<&str> {
@@ -437,6 +445,27 @@
None
}

fn disk_data_cache_config(&self) -> Option<(&Path, DiskDataCacheConfig)> {
match self.cache.as_ref() {
Some(path) => {
let cache_limit = match self.max_cache_size {
// Fallback to no data cache.
Some(0) => return None,
Some(max_size_in_mib) => CacheLimit::TotalSize {
max_size: (max_size_in_mib * 1024 * 1024) as usize,
},
None => CacheLimit::default(),
};
let cache_config = DiskDataCacheConfig {
block_size: self.cache_block_size_in_bytes(),
limit: cache_limit,
};
Some((path.as_path(), cache_config))
}
None => None,
}
}
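Note the `Some(0)` arm: passing `--max-cache-size 0` disables the disk cache entirely by making `disk_data_cache_config` return `None`, which steers the `match` in `mount` below into the Express-only or no-cache arm.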

/// Generates a logging configuration based on the CLI arguments.
///
/// This includes random string generation which can change with each invocation,
@@ -756,6 +785,17 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
Ok((client, runtime, s3_personality))
}

fn create_disk_cache(
cache_dir_path: &Path,
cache_config: DiskDataCacheConfig,
) -> anyhow::Result<(ManagedCacheDir, DiskDataCache)> {
let cache_key = env_unstable_cache_key();
let managed_cache_dir = ManagedCacheDir::new_from_parent_with_cache_key(cache_dir_path, cache_key)
.context("failed to create cache directory")?;
let cache_dir_path = managed_cache_dir.as_path_buf();
Ok((managed_cache_dir, DiskDataCache::new(cache_dir_path, cache_config)))
}
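Factoring out `create_disk_cache` lets the disk-only and disk-plus-Express arms of `mount` share the cache-directory setup. Callers must hold on to the returned `ManagedCacheDir`: as before, it is dropped in a `run_on_close` callback so the cache directory is cleaned up on unmount.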

fn mount<ClientBuilder, Client, Runtime>(args: CliArgs, client_builder: ClientBuilder) -> anyhow::Result<FuseSession>
where
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
@@ -840,26 +880,34 @@ where
tracing::trace!("using metadata TTL setting {metadata_cache_ttl:?}");
filesystem_config.cache_config = CacheConfig::new(metadata_cache_ttl);

if let Some(path) = &args.cache {
let cache_limit = match args.max_cache_size {
// Fallback to no data cache.
Some(0) => None,
Some(max_size_in_mib) => Some(CacheLimit::TotalSize {
max_size: (max_size_in_mib * 1024 * 1024) as usize,
}),
None => Some(CacheLimit::default()),
};
if let Some(cache_limit) = cache_limit {
let cache_config = DiskDataCacheConfig {
block_size: args.cache_block_size_in_bytes(),
limit: cache_limit,
};
let cache_key = env_unstable_cache_key();
let managed_cache_dir = ManagedCacheDir::new_from_parent_with_cache_key(path, cache_key)
.context("failed to create cache directory")?;
match (args.disk_data_cache_config(), args.cache_express_bucket_name()) {
(None, Some(express_bucket_name)) => {
tracing::trace!("using S3 Express One Zone bucket as a cache for object content");
let express_cache = ExpressDataCache::new(
express_bucket_name,
client.clone(),
&args.bucket_name,
args.cache_block_size_in_bytes(),
);

let cache = DiskDataCache::new(managed_cache_dir.as_path_buf(), cache_config);
let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
let prefetcher = caching_prefetch(express_cache, runtime, prefetcher_config);
let fuse_session = create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)?;

Ok(fuse_session)
}
(Some((cache_dir_path, disk_data_cache_config)), None) => {
tracing::trace!("using local disk as a cache for object content");
let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?;

let prefetcher = caching_prefetch(disk_cache, runtime, prefetcher_config);
let mut fuse_session = create_filesystem(
client,
prefetcher,
@@ -874,43 +922,50 @@ where
drop(managed_cache_dir);
}));

return Ok(fuse_session);
Ok(fuse_session)
}
}
(Some((cache_dir_path, disk_data_cache_config)), Some(express_bucket_name)) => {
tracing::trace!("using both local disk and S3 Express One Zone bucket as a cache for object content");
let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?;
let express_cache = ExpressDataCache::new(
express_bucket_name,
client.clone(),
&args.bucket_name,
args.cache_block_size_in_bytes(),
);
let cache = MultilevelDataCache::new(Arc::new(disk_cache), express_cache, runtime.clone());

if let Some(express_bucket_name) = args.cache_express_bucket_name() {
// The cache can be shared across instances mounting the same bucket (including with different prefixes)
let source_description = &args.bucket_name;
let cache = ExpressDataCache::new(
express_bucket_name,
client.clone(),
source_description,
args.cache_block_size_in_bytes(),
);
let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
let fuse_session = create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)?;

return Ok(fuse_session);
};
let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
let mut fuse_session = create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)?;

let prefetcher = default_prefetch(runtime, prefetcher_config);
create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)
fuse_session.run_on_close(Box::new(move || {
drop(managed_cache_dir);
}));

Ok(fuse_session)
}
_ => {
tracing::trace!("using no cache");
let prefetcher = default_prefetch(runtime, prefetcher_config);
create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)
}
}
}
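The rewritten `mount` now matches on the pair `(disk cache config, Express cache bucket)`: Express-only, disk-only, both levels combined through `MultilevelDataCache` (disk as the first level, Express behind it, with the runtime handle passed in, presumably for async work against the second level), or no cache at all. Below is a self-contained sketch of the read/write pattern a two-level cache like this conventionally implements. It is an illustration with invented names, not Mountpoint's actual `DataCache` trait, and the precise backfill/write-through policy of `MultilevelDataCache` lives in the new `multilevel_cache.rs`, which this diff view does not show:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

trait BlockCache {
    fn get(&self, key: &str) -> Option<Vec<u8>>;
    fn put(&self, key: &str, data: Vec<u8>);
}

/// In-memory stand-in for one cache level (disk or Express in the real code).
struct MemCache(Mutex<HashMap<String, Vec<u8>>>);

impl MemCache {
    fn new() -> Self {
        MemCache(Mutex::new(HashMap::new()))
    }
}

impl BlockCache for MemCache {
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.0.lock().unwrap().get(key).cloned()
    }
    fn put(&self, key: &str, data: Vec<u8>) {
        self.0.lock().unwrap().insert(key.to_owned(), data);
    }
}

/// Two cache levels: `fast` is consulted first and backfilled on a `slow` hit.
struct TwoLevel<A, B> {
    fast: A,
    slow: B,
}

impl<A: BlockCache, B: BlockCache> BlockCache for TwoLevel<A, B> {
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        if let Some(data) = self.fast.get(key) {
            return Some(data); // fast-level hit
        }
        let data = self.slow.get(key)?; // miss in both levels -> None
        self.fast.put(key, data.clone()); // backfill the faster level
        Some(data)
    }
    fn put(&self, key: &str, data: Vec<u8>) {
        // Write through both levels so either can serve future reads.
        self.fast.put(key, data.clone());
        self.slow.put(key, data);
    }
}

fn main() {
    let cache = TwoLevel {
        fast: MemCache::new(),
        slow: MemCache::new(),
    };
    cache.slow.put("block-0", vec![1, 2, 3]); // present only in the slow level
    assert_eq!(cache.get("block-0"), Some(vec![1, 2, 3]));
    // The read above backfilled the fast level:
    assert_eq!(cache.fast.get("block-0"), Some(vec![1, 2, 3]));
}
```

Checking the cheap level first and backfilling it on a slow-level hit is what makes repeated reads of the same block fast after the first access.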

fn create_filesystem<Client, Prefetcher>(
2 changes: 2 additions & 0 deletions mountpoint-s3/src/data_cache.rs
@@ -8,6 +8,7 @@ mod cache_directory;
mod disk_data_cache;
mod express_data_cache;
mod in_memory_data_cache;
mod multilevel_cache;

use async_trait::async_trait;
use thiserror::Error;
@@ -17,6 +18,7 @@ pub use crate::data_cache::cache_directory::ManagedCacheDir;
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
pub use crate::data_cache::express_data_cache::ExpressDataCache;
pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
pub use crate::data_cache::multilevel_cache::MultilevelDataCache;

use crate::object::ObjectId;

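These two lines wire the new module into the crate: `multilevel_cache` is declared alongside the other cache backends, and `MultilevelDataCache` is re-exported so that `cli.rs` can import it from `crate::data_cache` as shown above.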
19 changes: 11 additions & 8 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
@@ -82,15 +82,18 @@ where
// TODO: optimize for the common case of a single chunk.
let mut buffer = BytesMut::default();
while let Some(chunk) = result.next().await {
let (offset, body) = chunk?;
if offset != buffer.len() as u64 {
return Err(DataCacheError::InvalidBlockOffset);
match chunk {
Ok((offset, body)) => {
if offset != buffer.len() as u64 {
return Err(DataCacheError::InvalidBlockOffset);
}
buffer.extend_from_slice(&body);

result.as_mut().increment_read_window(self.block_size as usize);
}
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
Err(e) => return Err(e.into()),
}
buffer.extend_from_slice(&body);

// Ensure the flow-control window is large enough.
// TODO: review if/when we add a header to the block.
result.as_mut().increment_read_window(self.block_size as usize);
}
let buffer = buffer.freeze();
DataCacheResult::Ok(Some(buffer.into()))
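The reworked loop maps `NoSuchKey` to `Ok(None)`, i.e. a plain cache miss, instead of surfacing it as an error. That distinction is presumably what lets a caller such as the new `MultilevelDataCache` fall through to the next cache level (or to the source bucket) when a block is absent from the Express cache.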
[The fifth changed file did not load in this view: presumably the new mountpoint-s3/src/data_cache/multilevel_cache.rs implementing MultilevelDataCache, which accounts for the remaining additions.]
