diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs index dc6182314..1d4489d80 100644 --- a/mountpoint-s3/src/cli.rs +++ b/mountpoint-s3/src/cli.rs @@ -451,7 +451,7 @@ pub fn main(client_builder: ClientBuilder) -> an where ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>, Client: ObjectClient + Send + Sync + 'static, - Runtime: Spawn + Send + Sync + 'static, + Runtime: Spawn + Clone + Send + Sync + 'static, { let args = CliArgs::parse(); let successful_mount_msg = format!( @@ -700,7 +700,7 @@ fn mount(args: CliArgs, client_builder: ClientBu where ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>, Client: ObjectClient + Send + Sync + 'static, - Runtime: Spawn + Send + Sync + 'static, + Runtime: Spawn + Clone + Send + Sync + 'static, { tracing::info!("mount-s3 {}", build_info::FULL_VERSION); tracing::debug!("{:?}", args); diff --git a/mountpoint-s3/src/data_cache.rs b/mountpoint-s3/src/data_cache.rs index c762f1998..e3685e0ba 100644 --- a/mountpoint-s3/src/data_cache.rs +++ b/mountpoint-s3/src/data_cache.rs @@ -8,6 +8,7 @@ mod cache_directory; mod disk_data_cache; mod in_memory_data_cache; +use async_trait::async_trait; use thiserror::Error; pub use crate::checksums::ChecksummedBytes; @@ -39,11 +40,12 @@ pub type DataCacheResult = Result; /// /// TODO: Deletion and eviction of cache entries. /// TODO: Some version information (ETag) independent from [ObjectId] to allow smarter eviction? +#[async_trait] pub trait DataCache { /// Get block of data from the cache for the given [ObjectId] and [BlockIndex], if available. /// /// Operation may fail due to errors, or return [None] if the block was not available in the cache. - fn get_block( + async fn get_block( &self, cache_key: &ObjectId, block_idx: BlockIndex, @@ -51,7 +53,7 @@ pub trait DataCache { ) -> DataCacheResult>; /// Put block of data to the cache for the given [ObjectId] and [BlockIndex]. - fn put_block( + async fn put_block( &self, cache_key: ObjectId, block_idx: BlockIndex, diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 89eb1bc2f..cca4406a4 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -6,6 +6,7 @@ use std::os::unix::fs::{DirBuilderExt, OpenOptionsExt}; use std::path::{Path, PathBuf}; use std::time::Instant; +use async_trait::async_trait; use bytes::Bytes; use linked_hash_map::LinkedHashMap; use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c}; @@ -350,8 +351,9 @@ fn hash_cache_key_raw(cache_key: &ObjectId) -> [u8; 32] { hasher.finalize().into() } +#[async_trait] impl DataCache for DiskDataCache { - fn get_block( + async fn get_block( &self, cache_key: &ObjectId, block_idx: BlockIndex, @@ -396,7 +398,7 @@ impl DataCache for DiskDataCache { } } - fn put_block( + async fn put_block( &self, cache_key: ObjectId, block_idx: BlockIndex, @@ -531,6 +533,7 @@ mod tests { use super::*; + use futures::StreamExt as _; use mountpoint_s3_client::types::ETag; use rand::{Rng, SeedableRng}; use rand_chacha::ChaCha20Rng; @@ -622,8 +625,8 @@ mod tests { assert_eq!(expected, results); } - #[test] - fn test_put_get() { + #[tokio::test] + async fn test_put_get() { let data_1 = ChecksummedBytes::new("Foo".into()); let data_2 = ChecksummedBytes::new("Bar".into()); let data_3 = ChecksummedBytes::new("Baz".into()); @@ -643,7 +646,10 @@ mod tests { ETag::for_tests(), ); - let block = cache.get_block(&cache_key_1, 0, 0).expect("cache should be accessible"); + let block = cache + .get_block(&cache_key_1, 0, 0) + .await + .expect("cache should be accessible"); assert!( block.is_none(), "no entry should be available to return but got {:?}", @@ -653,9 +659,11 @@ mod tests { // PUT and GET, OK? cache .put_block(cache_key_1.clone(), 0, 0, data_1.clone()) + .await .expect("cache should be accessible"); let entry = cache .get_block(&cache_key_1, 0, 0) + .await .expect("cache should be accessible") .expect("cache entry should be returned"); assert_eq!( @@ -666,9 +674,11 @@ mod tests { // PUT AND GET a second file, OK? cache .put_block(cache_key_2.clone(), 0, 0, data_2.clone()) + .await .expect("cache should be accessible"); let entry = cache .get_block(&cache_key_2, 0, 0) + .await .expect("cache should be accessible") .expect("cache entry should be returned"); assert_eq!( @@ -679,9 +689,11 @@ mod tests { // PUT AND GET a second block in a cache entry, OK? cache .put_block(cache_key_1.clone(), 1, block_size, data_3.clone()) + .await .expect("cache should be accessible"); let entry = cache .get_block(&cache_key_1, 1, block_size) + .await .expect("cache should be accessible") .expect("cache entry should be returned"); assert_eq!( @@ -692,6 +704,7 @@ mod tests { // Entry 1's first block still intact let entry = cache .get_block(&cache_key_1, 0, 0) + .await .expect("cache should be accessible") .expect("cache entry should be returned"); assert_eq!( @@ -700,8 +713,8 @@ mod tests { ); } - #[test] - fn test_checksummed_bytes_slice() { + #[tokio::test] + async fn test_checksummed_bytes_slice() { let data = ChecksummedBytes::new("0123456789".into()); let slice = data.slice(1..5); @@ -717,9 +730,11 @@ mod tests { cache .put_block(cache_key.clone(), 0, 0, slice.clone()) + .await .expect("cache should be accessible"); let entry = cache .get_block(&cache_key, 0, 0) + .await .expect("cache should be accessible") .expect("cache entry should be returned"); assert_eq!( @@ -729,8 +744,8 @@ mod tests { ); } - #[test] - fn test_eviction() { + #[tokio::test] + async fn test_eviction() { const BLOCK_SIZE: usize = 100 * 1024; const LARGE_OBJECT_SIZE: usize = 1024 * 1024; const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2; @@ -744,7 +759,7 @@ mod tests { ChecksummedBytes::new(body.into()) } - fn is_block_in_cache( + async fn is_block_in_cache( cache: &DiskDataCache, cache_key: &ObjectId, block_idx: u64, @@ -752,6 +767,7 @@ mod tests { ) -> bool { if let Some(retrieved) = cache .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64) + .await .expect("cache should be accessible") { assert_eq!( @@ -799,6 +815,7 @@ mod tests { (block_idx * BLOCK_SIZE) as u64, bytes.clone(), ) + .await .unwrap(); } @@ -811,25 +828,24 @@ mod tests { (block_idx * BLOCK_SIZE) as u64, bytes.clone(), ) + .await .unwrap(); } - let count_small_object_blocks_in_cache = small_object_blocks - .iter() - .enumerate() + let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate()) .filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes)) - .count(); + .count() + .await; assert_eq!( count_small_object_blocks_in_cache, small_object_blocks.len(), "All blocks for small object should still be in the cache" ); - let count_large_object_blocks_in_cache = large_object_blocks - .iter() - .enumerate() + let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate()) .filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes)) - .count(); + .count() + .await; assert!( count_large_object_blocks_in_cache < large_object_blocks.len(), "Some blocks for the large object should have been evicted" diff --git a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs index 47db7f735..18c6335c3 100644 --- a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs +++ b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use std::default::Default; +use async_trait::async_trait; + use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult}; use crate::object::ObjectId; use crate::sync::RwLock; @@ -29,8 +31,9 @@ impl InMemoryDataCache { } } +#[async_trait] impl DataCache for InMemoryDataCache { - fn get_block( + async fn get_block( &self, cache_key: &ObjectId, block_idx: BlockIndex, @@ -44,7 +47,7 @@ impl DataCache for InMemoryDataCache { Ok(block_data) } - fn put_block( + async fn put_block( &self, cache_key: ObjectId, block_idx: BlockIndex, @@ -72,8 +75,8 @@ mod tests { use bytes::Bytes; use mountpoint_s3_client::types::ETag; - #[test] - fn test_put_get() { + #[tokio::test] + async fn test_put_get() { let data_1 = Bytes::from_static(b"Hello world"); let data_1 = ChecksummedBytes::new(data_1.clone()); let data_2 = Bytes::from_static(b"Foo bar"); @@ -86,7 +89,7 @@ mod tests { let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests()); let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests()); - let block = cache.get_block(&cache_key_1, 0, 0).expect("cache is accessible"); + let block = cache.get_block(&cache_key_1, 0, 0).await.expect("cache is accessible"); assert!( block.is_none(), "no entry should be available to return but got {:?}", @@ -96,9 +99,11 @@ mod tests { // PUT and GET, OK? cache .put_block(cache_key_1.clone(), 0, 0, data_1.clone()) + .await .expect("cache is accessible"); let entry = cache .get_block(&cache_key_1, 0, 0) + .await .expect("cache is accessible") .expect("cache entry should be returned"); assert_eq!( @@ -109,9 +114,11 @@ mod tests { // PUT AND GET a second file, OK? cache .put_block(cache_key_2.clone(), 0, 0, data_2.clone()) + .await .expect("cache is accessible"); let entry = cache .get_block(&cache_key_2, 0, 0) + .await .expect("cache is accessible") .expect("cache entry should be returned"); assert_eq!( @@ -122,9 +129,11 @@ mod tests { // PUT AND GET a second block in a cache entry, OK? cache .put_block(cache_key_1.clone(), 1, block_size, data_3.clone()) + .await .expect("cache is accessible"); let entry = cache .get_block(&cache_key_1, 1, block_size) + .await .expect("cache is accessible") .expect("cache entry should be returned"); assert_eq!( @@ -135,6 +144,7 @@ mod tests { // Entry 1's first block still intact let entry = cache .get_block(&cache_key_1, 0, 0) + .await .expect("cache is accessible") .expect("cache entry should be returned"); assert_eq!( diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index db89afe93..3238e6677 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -137,7 +137,7 @@ pub fn caching_prefetch( ) -> CachingPrefetcher where Cache: DataCache + Send + Sync + 'static, - Runtime: Spawn + Send + Sync + 'static, + Runtime: Spawn + Clone + Send + Sync + 'static, { let part_stream = CachingPartStream::new(runtime, cache); Prefetcher::new(part_stream, prefetcher_config) diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index a42121417..38caf3c4a 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -39,7 +39,7 @@ impl CachingPartStream { impl ObjectPartStream for CachingPartStream where Cache: DataCache + Send + Sync + 'static, - Runtime: Spawn, + Runtime: Spawn + Clone + Send + Sync + 'static, { fn spawn_get_object_request( &self, @@ -62,7 +62,13 @@ where trace!(?range, "spawning request"); let request_task = { - let request = CachingRequest::new(client.clone(), self.cache.clone(), backpressure_limiter, config); + let request = CachingRequest::new( + client.clone(), + self.cache.clone(), + self.runtime.clone(), + backpressure_limiter, + config, + ); let span = debug_span!("prefetch", ?range); request.get_from_cache(range, part_queue_producer).instrument(span) }; @@ -74,27 +80,31 @@ where } #[derive(Debug)] -struct CachingRequest { +struct CachingRequest { client: Client, cache: Arc, + runtime: Runtime, backpressure_limiter: BackpressureLimiter, config: RequestTaskConfig, } -impl CachingRequest +impl CachingRequest where Client: ObjectClient + Clone + Send + Sync + 'static, - Cache: DataCache + Send + Sync, + Cache: DataCache + Send + Sync + 'static, + Runtime: Spawn + Clone, { fn new( client: Client, cache: Arc, + runtime: Runtime, backpressure_limiter: BackpressureLimiter, config: RequestTaskConfig, ) -> Self { Self { client, cache, + runtime, backpressure_limiter, config, } @@ -128,7 +138,7 @@ where // already likely negligible. let mut block_offset = block_range.start * block_size; for block_index in block_range.clone() { - match self.cache.get_block(cache_key, block_index, block_offset) { + match self.cache.get_block(cache_key, block_index, block_offset).await { Ok(Some(block)) => { trace!(?cache_key, ?range, block_index, "cache hit"); // Cache blocks always contain bytes in the request range @@ -210,8 +220,8 @@ where original_range: range, block_index: block_range.start, block_offset: block_range.start * block_size, - buffer: ChecksummedBytes::default(), cache: self.cache.clone(), + runtime: self.runtime.clone(), }; part_composer.try_compose_parts(request_stream).await; } @@ -228,20 +238,21 @@ where } } -struct CachingPartComposer { +struct CachingPartComposer { part_queue_producer: PartQueueProducer, cache_key: ObjectId, original_range: RequestRange, block_index: u64, block_offset: u64, - buffer: ChecksummedBytes, cache: Arc, + runtime: Runtime, } -impl CachingPartComposer +impl CachingPartComposer where E: std::error::Error + Send + Sync, - Cache: DataCache + Send + Sync, + Cache: DataCache + Send + Sync + 'static, + Runtime: Spawn, { async fn try_compose_parts(&mut self, request_stream: impl Stream>) { if let Err(e) = self.compose_parts(request_stream).await { @@ -257,15 +268,16 @@ where ) -> Result<(), PrefetchReadError> { let key = self.cache_key.key(); let block_size = self.cache.block_size(); + let mut buffer = ChecksummedBytes::default(); pin_mut!(request_stream); while let Some(next) = request_stream.next().await { assert!( - self.buffer.len() < block_size as usize, + buffer.len() < block_size as usize, "buffer should be flushed when we get a full block" ); let (offset, body) = next?; - let expected_offset = self.block_offset + self.buffer.len() as u64; + let expected_offset = self.block_offset + buffer.len() as u64; if offset != expected_offset { warn!(key, offset, expected_offset, "wrong offset for GetObject body part"); return Err(PrefetchReadError::GetRequestReturnedWrongOffset { @@ -278,7 +290,7 @@ where let mut body: Bytes = body.into(); let mut offset = offset; while !body.is_empty() { - let remaining = (block_size as usize).saturating_sub(self.buffer.len()).min(body.len()); + let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len()); let chunk: ChecksummedBytes = body.split_to(remaining).into(); // We need to return some bytes to the part queue even before we can fill an entire caching block because @@ -297,64 +309,51 @@ where self.part_queue_producer.push(Ok(part)); } offset += chunk.len() as u64; - self.buffer + buffer .extend(chunk) .inspect_err(|e| warn!(key, error=?e, "integrity check for body part failed"))?; - if self.buffer.len() < block_size as usize { + if buffer.len() < block_size as usize { break; } // We have a full block: write it to the cache, send it to the queue, and flush the buffer. - update_cache( - self.cache.as_ref(), - &self.buffer, - self.block_index, - self.block_offset, - &self.cache_key, - ); + self.update_cache(buffer, self.block_index, self.block_offset, &self.cache_key); self.block_index += 1; self.block_offset += block_size; - self.buffer = ChecksummedBytes::default(); + buffer = ChecksummedBytes::default(); } } - if !self.buffer.is_empty() { + if !buffer.is_empty() { // If we still have data in the buffer, this must be the last block for this object, // which can be smaller than block_size (and ends at the end of the object). assert_eq!( - self.block_offset as usize + self.buffer.len(), + self.block_offset as usize + buffer.len(), self.original_range.object_size(), "a partial block is only allowed at the end of the object" ); // Write the last block to the cache. - update_cache( - self.cache.as_ref(), - &self.buffer, - self.block_index, - self.block_offset, - &self.cache_key, - ); + self.update_cache(buffer, self.block_index, self.block_offset, &self.cache_key); } Ok(()) } -} -fn update_cache( - cache: &Cache, - block: &ChecksummedBytes, - block_index: u64, - block_offset: u64, - object_id: &ObjectId, -) { - // TODO: consider updating the cache asynchronously - let start = Instant::now(); - match cache.put_block(object_id.clone(), block_index, block_offset, block.clone()) { - Ok(()) => {} - Err(error) => { - warn!(key=?object_id, block_index, ?error, "failed to update cache"); - } - }; - metrics::histogram!("prefetch.cache_update_duration_us").record(start.elapsed().as_micros() as f64); + fn update_cache(&self, block: ChecksummedBytes, block_index: u64, block_offset: u64, object_id: &ObjectId) { + let object_id = object_id.clone(); + let cache = self.cache.clone(); + self.runtime + .spawn(async move { + let start = Instant::now(); + if let Err(error) = cache + .put_block(object_id.clone(), block_index, block_offset, block) + .await + { + warn!(key=?object_id, block_index, ?error, "failed to update cache"); + } + metrics::histogram!("prefetch.cache_update_duration_us").record(start.elapsed().as_micros() as f64); + }) + .unwrap(); + } } /// Creates a Part that can be streamed to the prefetcher if the given bytes