Introduce CacheTestWrapper, use it to check cache reads and writes
Signed-off-by: Vlad Volodkin <vlaad@amazon.com>
Vlad Volodkin committed Nov 14, 2024
1 parent 7a8c9b5 commit 92021e6
Showing 4 changed files with 140 additions and 38 deletions.
118 changes: 118 additions & 0 deletions mountpoint-s3/tests/common/cache.rs
@@ -0,0 +1,118 @@
use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::Duration,
};

use async_trait::async_trait;
use mountpoint_s3::{
    data_cache::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult},
    object::ObjectId,
};

/// A wrapper around any type implementing [DataCache], which counts operations
pub struct CacheTestWrapper<Cache> {
    cache: Arc<Cache>,
    get_block_ok_count: Arc<AtomicU64>,
    get_block_hit_count: Arc<AtomicU64>,
    get_block_failed_count: Arc<AtomicU64>,
    put_block_ok_count: Arc<AtomicU64>,
    put_block_failed_count: Arc<AtomicU64>,
}

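// Clone is implemented by hand so that the wrapped `Cache` type itself does not
// need to be `Clone`: every field is an `Arc`, so clones share the same counters.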
impl<Cache> Clone for CacheTestWrapper<Cache> {
    fn clone(&self) -> Self {
        Self {
            cache: self.cache.clone(),
            get_block_ok_count: self.get_block_ok_count.clone(),
            get_block_hit_count: self.get_block_hit_count.clone(),
            get_block_failed_count: self.get_block_failed_count.clone(),
            put_block_ok_count: self.put_block_ok_count.clone(),
            put_block_failed_count: self.put_block_failed_count.clone(),
        }
    }
}

impl<Cache> CacheTestWrapper<Cache> {
    pub fn new(cache: Arc<Cache>) -> Self {
        CacheTestWrapper {
            cache,
            get_block_ok_count: Arc::new(AtomicU64::new(0)),
            get_block_hit_count: Arc::new(AtomicU64::new(0)),
            get_block_failed_count: Arc::new(AtomicU64::new(0)),
            put_block_ok_count: Arc::new(AtomicU64::new(0)),
            put_block_failed_count: Arc::new(AtomicU64::new(0)),
        }
    }

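    /// Blocks until at least one `put_block` call completes (successfully or not),
    /// polling the counters every 100ms; panics once `max_wait_duration` elapses.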
    pub fn wait_for_put(&self, max_wait_duration: Duration) {
        let st = std::time::Instant::now();
        loop {
            if st.elapsed() > max_wait_duration {
                panic!("timeout on waiting for a write to the cache to happen")
            }
            if self.put_block_failed_count.load(Ordering::SeqCst) > 0
                || self.put_block_ok_count.load(Ordering::SeqCst) > 0
            {
                break;
            }
            std::thread::sleep(Duration::from_millis(100));
        }
    }

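    /// Number of `get_block` calls that found the requested block in the cache.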
    pub fn get_block_hit_count(&self) -> u64 {
        self.get_block_hit_count.load(Ordering::SeqCst)
    }
}

#[async_trait]
impl<Cache: DataCache + Send + Sync + 'static> DataCache for CacheTestWrapper<Cache> {
    async fn get_block(
        &self,
        cache_key: &ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        object_size: usize,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        let result = self
            .cache
            .get_block(cache_key, block_idx, block_offset, object_size)
            .await
            .inspect(|_| {
                self.get_block_ok_count.fetch_add(1, Ordering::SeqCst);
            })
            .inspect_err(|_| {
                self.get_block_failed_count.fetch_add(1, Ordering::SeqCst);
            })?
            .inspect(|_| {
                self.get_block_hit_count.fetch_add(1, Ordering::SeqCst);
            });

        Ok(result)
    }

    async fn put_block(
        &self,
        cache_key: ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
        bytes: ChecksummedBytes,
        object_size: usize,
    ) -> DataCacheResult<()> {
        self.cache
            .put_block(cache_key, block_idx, block_offset, bytes, object_size)
            .await
            .inspect(|_| {
                self.put_block_ok_count.fetch_add(1, Ordering::SeqCst);
            })
            .inspect_err(|_| {
                self.put_block_failed_count.fetch_add(1, Ordering::SeqCst);
            })
    }

    fn block_size(&self) -> u64 {
        self.cache.block_size()
    }
}
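
A minimal sketch of the intended usage, for context (`make_cache()` and `runtime` here are hypothetical stand-ins; the real wiring is in cache_test.rs below):

    let cache = CacheTestWrapper::new(Arc::new(make_cache()));
    let prefetcher = caching_prefetch(cache.clone(), runtime, Default::default());
    // ... mount the filesystem, write an object, then read it back ...
    cache.wait_for_put(Duration::from_secs(10)); // cache writes happen asynchronously
    let hits_before = cache.get_block_hit_count();
    // ... read the same object a second time ...
    assert!(cache.get_block_hit_count() > hits_before, "second read should hit the cache");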
2 changes: 1 addition & 1 deletion mountpoint-s3/tests/common/fuse.rs
@@ -389,7 +389,7 @@ pub mod s3_session {
        S3CrtClient::new(client_config).unwrap()
    }

    pub fn create_test_client(region: &str, bucket: &str, prefix: &str) -> impl TestClient {
    fn create_test_client(region: &str, bucket: &str, prefix: &str) -> impl TestClient {
        let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await });
        SDKTestClient {
            prefix: prefix.to_owned(),
2 changes: 2 additions & 0 deletions mountpoint-s3/tests/common/mod.rs
@@ -2,6 +2,8 @@
//! Allow for unused items since this is included independently in each module.
#![allow(dead_code)]

pub mod cache;

pub mod creds;

#[cfg(feature = "fuse_tests")]
56 changes: 19 additions & 37 deletions mountpoint-s3/tests/fuse_tests/cache_test.rs
@@ -1,15 +1,14 @@
use crate::common::fuse::s3_session::{create_crt_client, create_test_client};
use crate::common::fuse::{create_fuse_session, TestClient};
use crate::common::s3::{get_express_bucket, get_standard_bucket, get_test_prefix, get_test_region};
use crate::common::cache::CacheTestWrapper;
use crate::common::fuse::create_fuse_session;
use crate::common::fuse::s3_session::create_crt_client;
use crate::common::s3::{get_express_bucket, get_standard_bucket, get_test_bucket_and_prefix};
use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig, ExpressDataCache};
use mountpoint_s3::fs::CacheConfig;
use mountpoint_s3::prefetch::caching_prefetch;
use mountpoint_s3::S3FilesystemConfig;
use mountpoint_s3_client::S3CrtClient;
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use std::fs;
use std::thread::sleep;
use std::sync::Arc;
use std::time::Duration;
use test_case::test_case;

@@ -24,7 +23,7 @@ fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usiz
    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
    let bucket_name = get_standard_bucket();
    let express_bucket_name = get_express_bucket();
    let cache = ExpressDataCache::new(&express_bucket_name, client.clone(), &bucket_name, CACHE_BLOCK_SIZE);
    let cache = ExpressDataCache::new(client.clone(), Default::default(), &bucket_name, &express_bucket_name);

    cache_write_read_base(
        client,
@@ -70,30 +69,20 @@ fn cache_write_read_base(
) where
    Cache: DataCache + Send + Sync + 'static,
{
    let region = get_test_region();
    let bucket = get_standard_bucket();
    let prefix = get_test_prefix(test_name);
    let (bucket, prefix) = get_test_bucket_and_prefix(test_name);

    // Mount a bucket
    let filesystem_config = S3FilesystemConfig {
        cache_config: CacheConfig {
            serve_lookup_from_cache: true,
            file_ttl: Duration::from_secs(3600),
            dir_ttl: Duration::from_secs(3600),
            ..Default::default()
        },
        ..Default::default()
    };
    let mount_point = tempfile::tempdir().unwrap();
    let runtime = client.event_loop_group();
    let prefetcher = caching_prefetch(cache, runtime, Default::default());
    let cache = CacheTestWrapper::new(Arc::new(cache));
    let prefetcher = caching_prefetch(cache.clone(), runtime, Default::default());
    let _session = create_fuse_session(
        client,
        prefetcher,
        &bucket,
        &prefix,
        mount_point.path(),
        filesystem_config,
        Default::default(),
    );

    // Write an object, no caching happens yet
Expand All @@ -106,27 +95,20 @@ fn cache_write_read_base<Cache>(
    let read = fs::read(&path).expect("read should succeed");
    assert_eq!(read, written);

    // Cache population is async, 3 seconds should be enough for it to finish
    sleep(Duration::from_secs(3));

    // Ensure data may not be served from the source bucket.
    //
    // NOTE, that we assume that the metadata cache will hold an entry for the removed object
    // at the point of the next read. This assumption must be valid since there are no other
    // FS operations done before the read. Currently, an entry in the metadata cache may be
    // invalidated by TTL expiry or a READDIR(PLUS) call.
    let test_client = create_test_client(&region, &bucket, &prefix);
    test_client.remove_object(&key).expect("remove must succeed");
    assert!(
        !test_client.contains_key(&key).expect("head object must succeed"),
        "object should not exist in the source bucket"
    );
    // Cache population is async, wait for it to happen
    cache.wait_for_put(Duration::from_secs(10));

    // Second read should be from the cache
    let cache_hits_before_read = cache.get_block_hit_count();
    let read = fs::read(&path).expect("read from the cache should succeed");
    assert_eq!(read, written);
    assert!(
        cache.get_block_hit_count() > cache_hits_before_read,
        "read should result in a cache hit"
    );
}

/// Generates random data of the specified size
fn random_binary_data(size_in_bytes: usize) -> Vec<u8> {
    let seed = rand::thread_rng().gen();
    let mut rng = ChaChaRng::seed_from_u64(seed);
@@ -135,7 +117,7 @@ fn random_binary_data(size_in_bytes: usize) -> Vec<u8> {
    data
}

// Creates a random key which has a size of at least `min_size_in_bytes`
/// Creates a random key which has a size of at least `min_size_in_bytes`
fn get_object_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize) -> String {
    let random_suffix: u64 = rand::thread_rng().gen();
    let last_key_part = format!("{key_suffix}{random_suffix}"); // part of the key after all the "/"
