
Make data_cache async
Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>
passaro committed Sep 19, 2024
1 parent 9040066 commit 40d1434
Showing 4 changed files with 81 additions and 47 deletions.
6 changes: 4 additions & 2 deletions mountpoint-s3/src/data_cache.rs
@@ -8,6 +8,7 @@ mod cache_directory;
 mod disk_data_cache;
 mod in_memory_data_cache;
 
+use async_trait::async_trait;
 use thiserror::Error;
 
 pub use crate::checksums::ChecksummedBytes;
@@ -39,19 +40,20 @@ pub type DataCacheResult<Value> = Result<Value, DataCacheError>;
 ///
 /// TODO: Deletion and eviction of cache entries.
 /// TODO: Some version information (ETag) independent from [ObjectId] to allow smarter eviction?
+#[async_trait]
 pub trait DataCache {
     /// Get block of data from the cache for the given [ObjectId] and [BlockIndex], if available.
     ///
     /// Operation may fail due to errors, or return [None] if the block was not available in the cache.
-    fn get_block(
+    async fn get_block(
         &self,
         cache_key: &ObjectId,
         block_idx: BlockIndex,
         block_offset: u64,
     ) -> DataCacheResult<Option<ChecksummedBytes>>;
 
     /// Put block of data to the cache for the given [ObjectId] and [BlockIndex].
-    fn put_block(
+    async fn put_block(
         &self,
         cache_key: ObjectId,
         block_idx: BlockIndex,
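
Note: with the async-trait crate, `async` trait methods are desugared into methods returning boxed futures, so callers just `.await` them. A minimal caller sketch, not part of this commit (`read_block` is a hypothetical helper using the trait above):

    // Hypothetical helper: fetch block 0 of an object through the async trait.
    async fn read_block<C: DataCache>(
        cache: &C,
        id: &ObjectId,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        // Block N lives at offset N * block_size, so block 0 is at offset 0.
        cache.get_block(id, 0, 0).await
    }
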
52 changes: 34 additions & 18 deletions mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -6,6 +6,7 @@ use std::os::unix::fs::{DirBuilderExt, OpenOptionsExt};
 use std::path::{Path, PathBuf};
 use std::time::Instant;
 
+use async_trait::async_trait;
 use bytes::Bytes;
 use linked_hash_map::LinkedHashMap;
 use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
@@ -350,8 +351,9 @@ fn hash_cache_key_raw(cache_key: &ObjectId) -> [u8; 32] {
     hasher.finalize().into()
 }
 
+#[async_trait]
 impl DataCache for DiskDataCache {
-    fn get_block(
+    async fn get_block(
         &self,
         cache_key: &ObjectId,
         block_idx: BlockIndex,
@@ -396,7 +398,7 @@ impl DataCache for DiskDataCache {
         }
     }
 
-    fn put_block(
+    async fn put_block(
         &self,
         cache_key: ObjectId,
         block_idx: BlockIndex,
@@ -531,6 +533,7 @@ mod tests {
 
     use super::*;
 
+    use futures::StreamExt as _;
     use mountpoint_s3_client::types::ETag;
     use rand::{Rng, SeedableRng};
     use rand_chacha::ChaCha20Rng;
@@ -622,8 +625,8 @@ mod tests {
         assert_eq!(expected, results);
     }
 
-    #[test]
-    fn test_put_get() {
+    #[tokio::test]
+    async fn test_put_get() {
         let data_1 = ChecksummedBytes::new("Foo".into());
         let data_2 = ChecksummedBytes::new("Bar".into());
         let data_3 = ChecksummedBytes::new("Baz".into());
@@ -643,7 +646,10 @@
             ETag::for_tests(),
         );
 
-        let block = cache.get_block(&cache_key_1, 0, 0).expect("cache should be accessible");
+        let block = cache
+            .get_block(&cache_key_1, 0, 0)
+            .await
+            .expect("cache should be accessible");
         assert!(
             block.is_none(),
             "no entry should be available to return but got {:?}",
@@ -653,9 +659,11 @@
         // PUT and GET, OK?
         cache
             .put_block(cache_key_1.clone(), 0, 0, data_1.clone())
+            .await
             .expect("cache should be accessible");
         let entry = cache
             .get_block(&cache_key_1, 0, 0)
+            .await
             .expect("cache should be accessible")
             .expect("cache entry should be returned");
         assert_eq!(
@@ -666,9 +674,11 @@
         // PUT AND GET a second file, OK?
         cache
             .put_block(cache_key_2.clone(), 0, 0, data_2.clone())
+            .await
             .expect("cache should be accessible");
         let entry = cache
             .get_block(&cache_key_2, 0, 0)
+            .await
             .expect("cache should be accessible")
             .expect("cache entry should be returned");
         assert_eq!(
@@ -679,9 +689,11 @@
         // PUT AND GET a second block in a cache entry, OK?
         cache
             .put_block(cache_key_1.clone(), 1, block_size, data_3.clone())
+            .await
             .expect("cache should be accessible");
         let entry = cache
             .get_block(&cache_key_1, 1, block_size)
+            .await
             .expect("cache should be accessible")
             .expect("cache entry should be returned");
         assert_eq!(
@@ -692,6 +704,7 @@
         // Entry 1's first block still intact
         let entry = cache
             .get_block(&cache_key_1, 0, 0)
+            .await
             .expect("cache should be accessible")
             .expect("cache entry should be returned");
         assert_eq!(
@@ -700,8 +713,8 @@
         );
     }
 
-    #[test]
-    fn test_checksummed_bytes_slice() {
+    #[tokio::test]
+    async fn test_checksummed_bytes_slice() {
         let data = ChecksummedBytes::new("0123456789".into());
         let slice = data.slice(1..5);
 
@@ -717,9 +730,11 @@
 
         cache
             .put_block(cache_key.clone(), 0, 0, slice.clone())
+            .await
             .expect("cache should be accessible");
         let entry = cache
             .get_block(&cache_key, 0, 0)
+            .await
             .expect("cache should be accessible")
             .expect("cache entry should be returned");
         assert_eq!(
@@ -729,8 +744,8 @@
         );
     }
 
-    #[test]
-    fn test_eviction() {
+    #[tokio::test]
+    async fn test_eviction() {
         const BLOCK_SIZE: usize = 100 * 1024;
         const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
         const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
@@ -744,14 +759,15 @@
             ChecksummedBytes::new(body.into())
         }
 
-        fn is_block_in_cache(
+        async fn is_block_in_cache(
             cache: &DiskDataCache,
             cache_key: &ObjectId,
             block_idx: u64,
             expected_bytes: &ChecksummedBytes,
         ) -> bool {
             if let Some(retrieved) = cache
                 .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64)
+                .await
                 .expect("cache should be accessible")
             {
                 assert_eq!(
@@ -799,6 +815,7 @@
                     (block_idx * BLOCK_SIZE) as u64,
                     bytes.clone(),
                 )
+                .await
                 .unwrap();
         }
 
@@ -811,25 +828,24 @@
                     (block_idx * BLOCK_SIZE) as u64,
                     bytes.clone(),
                 )
+                .await
                 .unwrap();
         }
 
-        let count_small_object_blocks_in_cache = small_object_blocks
-            .iter()
-            .enumerate()
+        let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate())
             .filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes))
-            .count();
+            .count()
+            .await;
         assert_eq!(
             count_small_object_blocks_in_cache,
             small_object_blocks.len(),
             "All blocks for small object should still be in the cache"
         );
 
-        let count_large_object_blocks_in_cache = large_object_blocks
-            .iter()
-            .enumerate()
+        let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate())
             .filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes))
-            .count();
+            .count()
+            .await;
         assert!(
             count_large_object_blocks_in_cache < large_object_blocks.len(),
             "Some blocks for the large object should have been evicted"
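
Note: the test_eviction change above is needed because Iterator::filter cannot await the now-async is_block_in_cache, while futures::StreamExt::filter takes a closure returning a Future<Output = bool>. A standalone sketch of the same count-with-async-predicate pattern, with illustrative names and assuming the futures crate:

    use futures::StreamExt as _;

    // Count items that match an async predicate by driving a stream.
    async fn count_matching(values: Vec<u64>) -> usize {
        futures::stream::iter(values)
            .filter(|v| {
                let keep = *v % 2 == 0; // stand-in for an async cache lookup
                async move { keep }
            })
            .count()
            .await
    }
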
32 changes: 21 additions & 11 deletions mountpoint-s3/src/data_cache/in_memory_data_cache.rs
@@ -3,13 +3,15 @@
 use std::collections::HashMap;
 use std::default::Default;
 
+use async_trait::async_trait;
+
 use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
 use crate::object::ObjectId;
-use crate::sync::RwLock;
+use crate::sync::AsyncRwLock;
 
 /// Simple in-memory (RAM) implementation of [DataCache]. Recommended for use in testing only.
 pub struct InMemoryDataCache {
-    data: RwLock<HashMap<ObjectId, HashMap<BlockIndex, ChecksummedBytes>>>,
+    data: AsyncRwLock<HashMap<ObjectId, HashMap<BlockIndex, ChecksummedBytes>>>,
     block_size: u64,
 }
 
@@ -23,14 +25,15 @@ impl InMemoryDataCache {
     }
 
     /// Get number of caching blocks for the given cache key.
-    pub fn block_count(&self, cache_key: &ObjectId) -> usize {
-        let data = self.data.read().unwrap();
+    pub async fn block_count(&self, cache_key: &ObjectId) -> usize {
+        let data = self.data.read().await;
         data.get(cache_key).map_or(0, |cache| cache.len())
     }
 }
 
+#[async_trait]
 impl DataCache for InMemoryDataCache {
-    fn get_block(
+    async fn get_block(
         &self,
         cache_key: &ObjectId,
         block_idx: BlockIndex,
@@ -39,12 +42,12 @@ impl DataCache for InMemoryDataCache {
         if block_offset != block_idx * self.block_size {
             return Err(DataCacheError::InvalidBlockOffset);
         }
-        let data = self.data.read().unwrap();
+        let data = self.data.read().await;
         let block_data = data.get(cache_key).and_then(|blocks| blocks.get(&block_idx)).cloned();
         Ok(block_data)
     }
 
-    fn put_block(
+    async fn put_block(
         &self,
         cache_key: ObjectId,
         block_idx: BlockIndex,
@@ -54,7 +57,7 @@ impl DataCache for InMemoryDataCache {
         if block_offset != block_idx * self.block_size {
             return Err(DataCacheError::InvalidBlockOffset);
         }
-        let mut data = self.data.write().unwrap();
+        let mut data = self.data.write().await;
         let blocks = data.entry(cache_key).or_default();
         blocks.insert(block_idx, bytes);
         Ok(())
Expand All @@ -72,8 +75,8 @@ mod tests {
use bytes::Bytes;
use mountpoint_s3_client::types::ETag;

#[test]
fn test_put_get() {
#[tokio::test]
async fn test_put_get() {
let data_1 = Bytes::from_static(b"Hello world");
let data_1 = ChecksummedBytes::new(data_1.clone());
let data_2 = Bytes::from_static(b"Foo bar");
Expand All @@ -86,7 +89,7 @@ mod tests {
let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests());

let block = cache.get_block(&cache_key_1, 0, 0).expect("cache is accessible");
let block = cache.get_block(&cache_key_1, 0, 0).await.expect("cache is accessible");
assert!(
block.is_none(),
"no entry should be available to return but got {:?}",
Expand All @@ -96,9 +99,11 @@ mod tests {
// PUT and GET, OK?
cache
.put_block(cache_key_1.clone(), 0, 0, data_1.clone())
.await
.expect("cache is accessible");
let entry = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -109,9 +114,11 @@ mod tests {
// PUT AND GET a second file, OK?
cache
.put_block(cache_key_2.clone(), 0, 0, data_2.clone())
.await
.expect("cache is accessible");
let entry = cache
.get_block(&cache_key_2, 0, 0)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -122,9 +129,11 @@ mod tests {
// PUT AND GET a second block in a cache entry, OK?
cache
.put_block(cache_key_1.clone(), 1, block_size, data_3.clone())
.await
.expect("cache is accessible");
let entry = cache
.get_block(&cache_key_1, 1, block_size)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand All @@ -135,6 +144,7 @@ mod tests {
// Entry 1's first block still intact
let entry = cache
.get_block(&cache_key_1, 0, 0)
.await
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
Expand Down
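
Note: the RwLock-to-AsyncRwLock swap matters because a std::sync::RwLock guard blocks the executor thread and is not Send, so it cannot be held across an .await. An async lock suspends the task instead. A cut-down sketch of the pattern, assuming AsyncRwLock behaves like tokio::sync::RwLock (the crate's actual alias may differ), with hypothetical names:

    use std::collections::HashMap;
    use tokio::sync::RwLock;

    // Minimal in-memory map guarded by an async lock.
    struct TinyCache {
        data: RwLock<HashMap<u64, Vec<u8>>>,
    }

    impl TinyCache {
        async fn get(&self, key: u64) -> Option<Vec<u8>> {
            // read().await yields to the runtime instead of blocking the thread.
            self.data.read().await.get(&key).cloned()
        }

        async fn put(&self, key: u64, value: Vec<u8>) {
            self.data.write().await.insert(key, value);
        }
    }
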
