Make cache writes async (#1029)
* Make data_cache async

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Write cache block in background

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Revert unnecessary switch to async lock

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

---------

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>
passaro authored Sep 24, 2024
1 parent ed8d96b commit d62413d
Showing 6 changed files with 105 additions and 78 deletions.
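Taken together, the changes below make the DataCache trait async and move cache writes off the read path: a block is handed to the runtime as a background task instead of being written synchronously. The spawning site lives in the prefetcher's caching part stream, whose diff is not included in this view, so the following is only a minimal sketch of the idea under the new Spawn + Clone runtime bound; the helper name and error handling are illustrative, not the actual implementation.

use std::sync::Arc;

use futures::task::{Spawn, SpawnExt};

// Hypothetical helper: enqueue a cache write without blocking the reader.
fn write_block_in_background<Cache, Runtime>(
    runtime: Runtime, // callers pass an owned clone, hence the new `Clone` bound
    cache: Arc<Cache>,
    cache_key: ObjectId,
    block_idx: BlockIndex,
    block_offset: u64,
    bytes: ChecksummedBytes,
) where
    Cache: DataCache + Send + Sync + 'static,
    Runtime: Spawn + Clone + Send + Sync + 'static,
{
    let task = async move {
        // A failed cache write must not fail the read that produced the
        // block, so the error is only logged.
        if let Err(error) = cache.put_block(cache_key, block_idx, block_offset, bytes).await {
            tracing::warn!(?error, "failed to write block to cache");
        }
    };
    let _ = runtime.spawn(task);
}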
4 changes: 2 additions & 2 deletions mountpoint-s3/src/cli.rs
@@ -451,7 +451,7 @@ pub fn main<ClientBuilder, Client, Runtime>(client_builder: ClientBuilder) -> an
where
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
Client: ObjectClient + Send + Sync + 'static,
-Runtime: Spawn + Send + Sync + 'static,
+Runtime: Spawn + Clone + Send + Sync + 'static,
{
let args = CliArgs::parse();
let successful_mount_msg = format!(
@@ -700,7 +700,7 @@ fn mount<ClientBuilder, Client, Runtime>(args: CliArgs, client_builder: ClientBu
where
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
Client: ObjectClient + Send + Sync + 'static,
-Runtime: Spawn + Send + Sync + 'static,
+Runtime: Spawn + Clone + Send + Sync + 'static,
{
tracing::info!("mount-s3 {}", build_info::FULL_VERSION);
tracing::debug!("{:?}", args);
6 changes: 4 additions & 2 deletions mountpoint-s3/src/data_cache.rs
@@ -8,6 +8,7 @@ mod cache_directory;
mod disk_data_cache;
mod in_memory_data_cache;

use async_trait::async_trait;
use thiserror::Error;

pub use crate::checksums::ChecksummedBytes;
@@ -39,19 +40,20 @@ pub type DataCacheResult<Value> = Result<Value, DataCacheError>;
///
/// TODO: Deletion and eviction of cache entries.
/// TODO: Some version information (ETag) independent from [ObjectId] to allow smarter eviction?
#[async_trait]
pub trait DataCache {
/// Get block of data from the cache for the given [ObjectId] and [BlockIndex], if available.
///
/// Operation may fail due to errors, or return [None] if the block was not available in the cache.
-fn get_block(
+async fn get_block(
&self,
cache_key: &ObjectId,
block_idx: BlockIndex,
block_offset: u64,
) -> DataCacheResult<Option<ChecksummedBytes>>;

/// Put block of data to the cache for the given [ObjectId] and [BlockIndex].
-fn put_block(
+async fn put_block(
&self,
cache_key: ObjectId,
block_idx: BlockIndex,
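The move to async methods leans on the async-trait crate because, at the time, Rust traits could not contain native async fns and remain usable as trait objects. Roughly, the macro rewrites each async method into an ordinary method that returns a boxed Send future; a simplified sketch of the expansion for get_block (the real expansion uses generated lifetimes, so this is approximate):

use std::future::Future;
use std::pin::Pin;

// Approximate expansion: the async method becomes a plain method returning a
// boxed, type-erased future, which keeps the trait object-safe.
pub trait DataCacheExpanded {
    fn get_block<'a>(
        &'a self,
        cache_key: &'a ObjectId,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> Pin<Box<dyn Future<Output = DataCacheResult<Option<ChecksummedBytes>>> + Send + 'a>>;
}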
52 changes: 34 additions & 18 deletions mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -6,6 +6,7 @@ use std::os::unix::fs::{DirBuilderExt, OpenOptionsExt};
use std::path::{Path, PathBuf};
use std::time::Instant;

use async_trait::async_trait;
use bytes::Bytes;
use linked_hash_map::LinkedHashMap;
use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
@@ -350,8 +351,9 @@ fn hash_cache_key_raw(cache_key: &ObjectId) -> [u8; 32] {
hasher.finalize().into()
}

#[async_trait]
impl DataCache for DiskDataCache {
-fn get_block(
+async fn get_block(
&self,
cache_key: &ObjectId,
block_idx: BlockIndex,
@@ -396,7 +398,7 @@ impl DataCache for DiskDataCache {
}
}

-fn put_block(
+async fn put_block(
&self,
cache_key: ObjectId,
block_idx: BlockIndex,
@@ -531,6 +533,7 @@ mod tests {

use super::*;

use futures::StreamExt as _;
use mountpoint_s3_client::types::ETag;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;
@@ -622,8 +625,8 @@ mod tests {
assert_eq!(expected, results);
}

-#[test]
-fn test_put_get() {
+#[tokio::test]
+async fn test_put_get() {
let data_1 = ChecksummedBytes::new("Foo".into());
let data_2 = ChecksummedBytes::new("Bar".into());
let data_3 = ChecksummedBytes::new("Baz".into());
@@ -643,7 +646,10 @@
ETag::for_tests(),
);

-let block = cache.get_block(&cache_key_1, 0, 0).expect("cache should be accessible");
+let block = cache
+.get_block(&cache_key_1, 0, 0)
+.await
+.expect("cache should be accessible");
assert!(
block.is_none(),
"no entry should be available to return but got {:?}",
@@ -653,9 +659,11 @@
// PUT and GET, OK?
cache
.put_block(cache_key_1.clone(), 0, 0, data_1.clone())
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -666,9 +674,11 @@
// PUT AND GET a second file, OK?
cache
.put_block(cache_key_2.clone(), 0, 0, data_2.clone())
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_2, 0, 0)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -679,9 +689,11 @@
// PUT AND GET a second block in a cache entry, OK?
cache
.put_block(cache_key_1.clone(), 1, block_size, data_3.clone())
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key_1, 1, block_size)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -692,6 +704,7 @@
// Entry 1's first block still intact
let entry = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -700,8 +713,8 @@ mod tests {
);
}

-#[test]
-fn test_checksummed_bytes_slice() {
+#[tokio::test]
+async fn test_checksummed_bytes_slice() {
let data = ChecksummedBytes::new("0123456789".into());
let slice = data.slice(1..5);

@@ -717,9 +730,11 @@

cache
.put_block(cache_key.clone(), 0, 0, slice.clone())
.await
.expect("cache should be accessible");
let entry = cache
.get_block(&cache_key, 0, 0)
.await
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -729,8 +744,8 @@ mod tests {
);
}

-#[test]
-fn test_eviction() {
+#[tokio::test]
+async fn test_eviction() {
const BLOCK_SIZE: usize = 100 * 1024;
const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
@@ -744,14 +759,15 @@
ChecksummedBytes::new(body.into())
}

-fn is_block_in_cache(
+async fn is_block_in_cache(
cache: &DiskDataCache,
cache_key: &ObjectId,
block_idx: u64,
expected_bytes: &ChecksummedBytes,
) -> bool {
if let Some(retrieved) = cache
.get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64)
.await
.expect("cache should be accessible")
{
assert_eq!(
@@ -799,6 +815,7 @@
(block_idx * BLOCK_SIZE) as u64,
bytes.clone(),
)
.await
.unwrap();
}

@@ -811,25 +828,24 @@
(block_idx * BLOCK_SIZE) as u64,
bytes.clone(),
)
.await
.unwrap();
}

-let count_small_object_blocks_in_cache = small_object_blocks
-.iter()
-.enumerate()
+let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate())
.filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes))
-.count();
+.count()
+.await;
assert_eq!(
count_small_object_blocks_in_cache,
small_object_blocks.len(),
"All blocks for small object should still be in the cache"
);

-let count_large_object_blocks_in_cache = large_object_blocks
-.iter()
-.enumerate()
+let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate())
.filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes))
-.count();
+.count()
+.await;
assert!(
count_large_object_blocks_in_cache < large_object_blocks.len(),
"Some blocks for the large object should have been evicted"
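One consequence shows up in the eviction test above: Iterator::filter cannot await the now-async is_block_in_cache, so the test lifts the iterator into a futures Stream, whose filter combinator accepts a predicate that returns a Future. A self-contained sketch of that counting pattern, with a stand-in predicate:

use futures::StreamExt;

// Stand-in for an async predicate like `is_block_in_cache`.
async fn is_even(n: u64) -> bool {
    n % 2 == 0
}

#[tokio::main]
async fn main() {
    // stream::iter lifts an ordinary iterator into a Stream; unlike
    // Iterator::filter, StreamExt::filter can await its predicate.
    let count = futures::stream::iter(0u64..10)
        .filter(|&n| is_even(n))
        .count()
        .await;
    assert_eq!(count, 5);
}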
20 changes: 15 additions & 5 deletions mountpoint-s3/src/data_cache/in_memory_data_cache.rs
@@ -3,6 +3,8 @@
use std::collections::HashMap;
use std::default::Default;

use async_trait::async_trait;

use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
use crate::object::ObjectId;
use crate::sync::RwLock;
@@ -29,8 +31,9 @@ impl InMemoryDataCache {
}
}

#[async_trait]
impl DataCache for InMemoryDataCache {
-fn get_block(
+async fn get_block(
&self,
cache_key: &ObjectId,
block_idx: BlockIndex,
Expand All @@ -44,7 +47,7 @@ impl DataCache for InMemoryDataCache {
Ok(block_data)
}

-fn put_block(
+async fn put_block(
&self,
cache_key: ObjectId,
block_idx: BlockIndex,
@@ -72,8 +75,8 @@ mod tests {
use bytes::Bytes;
use mountpoint_s3_client::types::ETag;

-#[test]
-fn test_put_get() {
+#[tokio::test]
+async fn test_put_get() {
let data_1 = Bytes::from_static(b"Hello world");
let data_1 = ChecksummedBytes::new(data_1.clone());
let data_2 = Bytes::from_static(b"Foo bar");
@@ -86,7 +89,7 @@ mod tests {
let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());

-let block = cache.get_block(&cache_key_1, 0, 0).expect("cache is accessible");
+let block = cache.get_block(&cache_key_1, 0, 0).await.expect("cache is accessible");
assert!(
block.is_none(),
"no entry should be available to return but got {:?}",
@@ -96,9 +99,11 @@ mod tests {
// PUT and GET, OK?
cache
.put_block(cache_key_1.clone(), 0, 0, data_1.clone())
.await
.expect("cache is accessible");
let entry = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -109,9 +114,11 @@ mod tests {
// PUT AND GET a second file, OK?
cache
.put_block(cache_key_2.clone(), 0, 0, data_2.clone())
.await
.expect("cache is accessible");
let entry = cache
.get_block(&cache_key_2, 0, 0)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -122,9 +129,11 @@ mod tests {
// PUT AND GET a second block in a cache entry, OK?
cache
.put_block(cache_key_1.clone(), 1, block_size, data_3.clone())
.await
.expect("cache is accessible");
let entry = cache
.get_block(&cache_key_1, 1, block_size)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -135,6 +144,7 @@ mod tests {
// Entry 1's first block still intact
let entry = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
2 changes: 1 addition & 1 deletion mountpoint-s3/src/prefetch.rs
@@ -137,7 +137,7 @@ pub fn caching_prefetch<Cache, Runtime>(
) -> CachingPrefetcher<Cache, Runtime>
where
Cache: DataCache + Send + Sync + 'static,
-Runtime: Spawn + Send + Sync + 'static,
+Runtime: Spawn + Clone + Send + Sync + 'static,
{
let part_stream = CachingPartStream::new(runtime, cache);
Prefetcher::new(part_stream, prefetcher_config)
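The Clone bound added here propagates up through mount and main in cli.rs: the caching part stream keeps one handle to the runtime and hands an owned clone to each background cache write it spawns. A sketch of that shape, with illustrative names rather than the actual fields:

use std::sync::Arc;

// Illustrative only: a part stream owning a cloneable runtime handle.
struct CachingPartStreamSketch<Cache, Runtime: Clone> {
    cache: Arc<Cache>,
    runtime: Runtime,
}

impl<Cache, Runtime: Clone> CachingPartStreamSketch<Cache, Runtime> {
    // Each spawned cache write takes its own copy of the runtime, which is
    // exactly what `Runtime: Clone` makes possible.
    fn runtime_for_background_write(&self) -> Runtime {
        self.runtime.clone()
    }
}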