Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Make cache writes async #1029

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ pub fn main<ClientBuilder, Client, Runtime>(client_builder: ClientBuilder) -> an
where
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync + 'static,
Runtime: Spawn + Clone + Send + Sync + 'static,
{
let args = CliArgs::parse();
let successful_mount_msg = format!(
Expand Down Expand Up @@ -700,7 +700,7 @@ fn mount<ClientBuilder, Client, Runtime>(args: CliArgs, client_builder: ClientBu
where
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync + 'static,
Runtime: Spawn + Clone + Send + Sync + 'static,
{
tracing::info!("mount-s3 {}", build_info::FULL_VERSION);
tracing::debug!("{:?}", args);
Expand Down
6 changes: 4 additions & 2 deletions mountpoint-s3/src/data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod cache_directory;
mod disk_data_cache;
mod in_memory_data_cache;

use async_trait::async_trait;
use thiserror::Error;

pub use crate::checksums::ChecksummedBytes;
Expand Down Expand Up @@ -39,19 +40,20 @@ pub type DataCacheResult<Value> = Result<Value, DataCacheError>;
///
/// TODO: Deletion and eviction of cache entries.
/// TODO: Some version information (ETag) independent from [ObjectId] to allow smarter eviction?
#[async_trait]
pub trait DataCache {
/// Get block of data from the cache for the given [ObjectId] and [BlockIndex], if available.
///
/// Operation may fail due to errors, or return [None] if the block was not available in the cache.
fn get_block(
async fn get_block(
&self,
cache_key: &ObjectId,
block_idx: BlockIndex,
block_offset: u64,
) -> DataCacheResult<Option<ChecksummedBytes>>;

/// Put block of data to the cache for the given [ObjectId] and [BlockIndex].
fn put_block(
async fn put_block(
&self,
cache_key: ObjectId,
block_idx: BlockIndex,
Expand Down
52 changes: 34 additions & 18 deletions mountpoint-s3/src/data_cache/disk_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::os::unix::fs::{DirBuilderExt, OpenOptionsExt};
use std::path::{Path, PathBuf};
use std::time::Instant;

use async_trait::async_trait;
use bytes::Bytes;
use linked_hash_map::LinkedHashMap;
use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
Expand Down Expand Up @@ -350,8 +351,9 @@ fn hash_cache_key_raw(cache_key: &ObjectId) -> [u8; 32] {
hasher.finalize().into()
}

#[async_trait]
impl DataCache for DiskDataCache {
fn get_block(
async fn get_block(
&self,
cache_key: &ObjectId,
block_idx: BlockIndex,
Expand Down Expand Up @@ -396,7 +398,7 @@ impl DataCache for DiskDataCache {
}
}

fn put_block(
async fn put_block(
&self,
cache_key: ObjectId,
block_idx: BlockIndex,
Expand Down Expand Up @@ -531,6 +533,7 @@ mod tests {

use super::*;

use futures::StreamExt as _;
use mountpoint_s3_client::types::ETag;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;
Expand Down Expand Up @@ -622,8 +625,8 @@ mod tests {
assert_eq!(expected, results);
}

#[test]
fn test_put_get() {
#[tokio::test]
async fn test_put_get() {
let data_1 = ChecksummedBytes::new("Foo".into());
let data_2 = ChecksummedBytes::new("Bar".into());
let data_3 = ChecksummedBytes::new("Baz".into());
Expand All @@ -643,7 +646,10 @@ mod tests {
ETag::for_tests(),
);

let block = cache.get_block(&cache_key_1, 0, 0).expect("cache should be accessible");
let block = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache should be accessible");
assert!(
block.is_none(),
"no entry should be available to return but got {:?}",
Expand All @@ -653,9 +659,11 @@ mod tests {
// PUT and GET, OK?
cache
.put_block(cache_key_1.clone(), 0, 0, data_1.clone())
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -666,9 +674,11 @@ mod tests {
// PUT AND GET a second file, OK?
cache
.put_block(cache_key_2.clone(), 0, 0, data_2.clone())
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_2, 0, 0)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -679,9 +689,11 @@ mod tests {
// PUT AND GET a second block in a cache entry, OK?
cache
.put_block(cache_key_1.clone(), 1, block_size, data_3.clone())
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_1, 1, block_size)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -692,6 +704,7 @@ mod tests {
// Entry 1's first block still intact
let entry = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -700,8 +713,8 @@ mod tests {
);
}

#[test]
fn test_checksummed_bytes_slice() {
#[tokio::test]
async fn test_checksummed_bytes_slice() {
let data = ChecksummedBytes::new("0123456789".into());
let slice = data.slice(1..5);

Expand All @@ -717,9 +730,11 @@ mod tests {

cache
.put_block(cache_key.clone(), 0, 0, slice.clone())
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key, 0, 0)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -729,8 +744,8 @@ mod tests {
);
}

#[test]
fn test_eviction() {
#[tokio::test]
async fn test_eviction() {
const BLOCK_SIZE: usize = 100 * 1024;
const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
Expand All @@ -744,14 +759,15 @@ mod tests {
ChecksummedBytes::new(body.into())
}

fn is_block_in_cache(
async fn is_block_in_cache(
cache: &DiskDataCache,
cache_key: &ObjectId,
block_idx: u64,
expected_bytes: &ChecksummedBytes,
) -> bool {
if let Some(retrieved) = cache
.get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64)
.await
.expect("cache should be accessible")
{
assert_eq!(
Expand Down Expand Up @@ -799,6 +815,7 @@ mod tests {
(block_idx * BLOCK_SIZE) as u64,
bytes.clone(),
)
.await
.unwrap();
}

Expand All @@ -811,25 +828,24 @@ mod tests {
(block_idx * BLOCK_SIZE) as u64,
bytes.clone(),
)
.await
.unwrap();
}

let count_small_object_blocks_in_cache = small_object_blocks
.iter()
.enumerate()
let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate())
.filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes))
.count();
.count()
.await;
assert_eq!(
count_small_object_blocks_in_cache,
small_object_blocks.len(),
"All blocks for small object should still be in the cache"
);

let count_large_object_blocks_in_cache = large_object_blocks
.iter()
.enumerate()
let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate())
.filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes))
.count();
.count()
.await;
assert!(
count_large_object_blocks_in_cache < large_object_blocks.len(),
"Some blocks for the large object should have been evicted"
Expand Down
20 changes: 15 additions & 5 deletions mountpoint-s3/src/data_cache/in_memory_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
use std::collections::HashMap;
use std::default::Default;

use async_trait::async_trait;

use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
use crate::object::ObjectId;
use crate::sync::RwLock;
Expand All @@ -29,8 +31,9 @@ impl InMemoryDataCache {
}
}

#[async_trait]
impl DataCache for InMemoryDataCache {
fn get_block(
async fn get_block(
&self,
cache_key: &ObjectId,
block_idx: BlockIndex,
Expand All @@ -44,7 +47,7 @@ impl DataCache for InMemoryDataCache {
Ok(block_data)
}

fn put_block(
async fn put_block(
&self,
cache_key: ObjectId,
block_idx: BlockIndex,
Expand Down Expand Up @@ -72,8 +75,8 @@ mod tests {
use bytes::Bytes;
use mountpoint_s3_client::types::ETag;

#[test]
fn test_put_get() {
#[tokio::test]
async fn test_put_get() {
let data_1 = Bytes::from_static(b"Hello world");
let data_1 = ChecksummedBytes::new(data_1.clone());
let data_2 = Bytes::from_static(b"Foo bar");
Expand All @@ -86,7 +89,7 @@ mod tests {
let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());

let block = cache.get_block(&cache_key_1, 0, 0).expect("cache is accessible");
let block = cache.get_block(&cache_key_1, 0, 0).await.expect("cache is accessible");
assert!(
block.is_none(),
"no entry should be available to return but got {:?}",
Expand All @@ -96,9 +99,11 @@ mod tests {
// PUT and GET, OK?
cache
.put_block(cache_key_1.clone(), 0, 0, data_1.clone())
.await
.expect("cache is accessible");
let entry = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -109,9 +114,11 @@ mod tests {
// PUT AND GET a second file, OK?
cache
.put_block(cache_key_2.clone(), 0, 0, data_2.clone())
.await
.expect("cache is accessible");
let entry = cache
.get_block(&cache_key_2, 0, 0)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -122,9 +129,11 @@ mod tests {
// PUT AND GET a second block in a cache entry, OK?
cache
.put_block(cache_key_1.clone(), 1, block_size, data_3.clone())
.await
.expect("cache is accessible");
let entry = cache
.get_block(&cache_key_1, 1, block_size)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -135,6 +144,7 @@ mod tests {
// Entry 1's first block still intact
let entry = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3/src/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub fn caching_prefetch<Cache, Runtime>(
) -> CachingPrefetcher<Cache, Runtime>
where
Cache: DataCache + Send + Sync + 'static,
Runtime: Spawn + Send + Sync + 'static,
Runtime: Spawn + Clone + Send + Sync + 'static,
{
let part_stream = CachingPartStream::new(runtime, cache);
Prefetcher::new(part_stream, prefetcher_config)
Expand Down
Loading
Loading