diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs index dc6182314..1d4489d80 100644 --- a/mountpoint-s3/src/cli.rs +++ b/mountpoint-s3/src/cli.rs @@ -451,7 +451,7 @@ pub fn main(client_builder: ClientBuilder) -> an where ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>, Client: ObjectClient + Send + Sync + 'static, - Runtime: Spawn + Send + Sync + 'static, + Runtime: Spawn + Clone + Send + Sync + 'static, { let args = CliArgs::parse(); let successful_mount_msg = format!( @@ -700,7 +700,7 @@ fn mount(args: CliArgs, client_builder: ClientBu where ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>, Client: ObjectClient + Send + Sync + 'static, - Runtime: Spawn + Send + Sync + 'static, + Runtime: Spawn + Clone + Send + Sync + 'static, { tracing::info!("mount-s3 {}", build_info::FULL_VERSION); tracing::debug!("{:?}", args); diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index db89afe93..3238e6677 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -137,7 +137,7 @@ pub fn caching_prefetch( ) -> CachingPrefetcher where Cache: DataCache + Send + Sync + 'static, - Runtime: Spawn + Send + Sync + 'static, + Runtime: Spawn + Clone + Send + Sync + 'static, { let part_stream = CachingPartStream::new(runtime, cache); Prefetcher::new(part_stream, prefetcher_config) diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index 6af0a729e..76a4d0847 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -39,7 +39,7 @@ impl CachingPartStream { impl ObjectPartStream for CachingPartStream where Cache: DataCache + Send + Sync + 'static, - Runtime: Spawn, + Runtime: Spawn + Clone + Send + Sync + 'static, { fn spawn_get_object_request( &self, @@ -62,7 +62,13 @@ where trace!(?range, "spawning request"); let request_task = { - let request = CachingRequest::new(client.clone(), self.cache.clone(), backpressure_limiter, config); + let request = CachingRequest::new( + client.clone(), + self.cache.clone(), + self.runtime.clone(), + backpressure_limiter, + config, + ); let span = debug_span!("prefetch", ?range); request.get_from_cache(range, part_queue_producer).instrument(span) }; @@ -74,27 +80,31 @@ where } #[derive(Debug)] -struct CachingRequest { +struct CachingRequest { client: Client, cache: Arc, + runtime: Runtime, backpressure_limiter: BackpressureLimiter, config: RequestTaskConfig, } -impl CachingRequest +impl CachingRequest where Client: ObjectClient + Clone + Send + Sync + 'static, - Cache: DataCache + Send + Sync, + Cache: DataCache + Send + Sync + 'static, + Runtime: Spawn + Clone, { fn new( client: Client, cache: Arc, + runtime: Runtime, backpressure_limiter: BackpressureLimiter, config: RequestTaskConfig, ) -> Self { Self { client, cache, + runtime, backpressure_limiter, config, } @@ -210,8 +220,8 @@ where original_range: range, block_index: block_range.start, block_offset: block_range.start * block_size, - buffer: ChecksummedBytes::default(), cache: self.cache.clone(), + runtime: self.runtime.clone(), }; part_composer.try_compose_parts(request_stream).await; } @@ -228,20 +238,21 @@ where } } -struct CachingPartComposer { +struct CachingPartComposer { part_queue_producer: PartQueueProducer, cache_key: ObjectId, original_range: RequestRange, block_index: u64, block_offset: u64, - buffer: ChecksummedBytes, cache: Arc, + runtime: Runtime, } -impl CachingPartComposer +impl CachingPartComposer where E: std::error::Error + Send + Sync, - Cache: DataCache + Send + Sync, + Cache: DataCache + Send + Sync + 'static, + Runtime: Spawn, { async fn try_compose_parts(&mut self, request_stream: impl Stream>) { if let Err(e) = self.compose_parts(request_stream).await { @@ -257,15 +268,16 @@ where ) -> Result<(), PrefetchReadError> { let key = self.cache_key.key(); let block_size = self.cache.block_size(); + let mut buffer = ChecksummedBytes::default(); pin_mut!(request_stream); while let Some(next) = request_stream.next().await { assert!( - self.buffer.len() < block_size as usize, + buffer.len() < block_size as usize, "buffer should be flushed when we get a full block" ); let (offset, body) = next?; - let expected_offset = self.block_offset + self.buffer.len() as u64; + let expected_offset = self.block_offset + buffer.len() as u64; if offset != expected_offset { warn!(key, offset, expected_offset, "wrong offset for GetObject body part"); return Err(PrefetchReadError::GetRequestReturnedWrongOffset { @@ -278,7 +290,7 @@ where let mut body: Bytes = body.into(); let mut offset = offset; while !body.is_empty() { - let remaining = (block_size as usize).saturating_sub(self.buffer.len()).min(body.len()); + let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len()); let chunk: ChecksummedBytes = body.split_to(remaining).into(); // We need to return some bytes to the part queue even before we can fill an entire caching block because @@ -297,66 +309,51 @@ where self.part_queue_producer.push(Ok(part)); } offset += chunk.len() as u64; - self.buffer + buffer .extend(chunk) .inspect_err(|e| warn!(key, error=?e, "integrity check for body part failed"))?; - if self.buffer.len() < block_size as usize { + if buffer.len() < block_size as usize { break; } // We have a full block: write it to the cache, send it to the queue, and flush the buffer. - update_cache( - self.cache.as_ref(), - &self.buffer, - self.block_index, - self.block_offset, - &self.cache_key, - ) - .await; + self.update_cache(buffer, self.block_index, self.block_offset, &self.cache_key); self.block_index += 1; self.block_offset += block_size; - self.buffer = ChecksummedBytes::default(); + buffer = ChecksummedBytes::default(); } } - if !self.buffer.is_empty() { + if !buffer.is_empty() { // If we still have data in the buffer, this must be the last block for this object, // which can be smaller than block_size (and ends at the end of the object). assert_eq!( - self.block_offset as usize + self.buffer.len(), + self.block_offset as usize + buffer.len(), self.original_range.object_size(), "a partial block is only allowed at the end of the object" ); // Write the last block to the cache. - update_cache( - self.cache.as_ref(), - &self.buffer, - self.block_index, - self.block_offset, - &self.cache_key, - ) - .await; + self.update_cache(buffer, self.block_index, self.block_offset, &self.cache_key); } Ok(()) } -} -async fn update_cache( - cache: &Cache, - block: &ChecksummedBytes, - block_index: u64, - block_offset: u64, - object_id: &ObjectId, -) { - // TODO: consider updating the cache asynchronously - let start = Instant::now(); - if let Err(error) = cache - .put_block(object_id.clone(), block_index, block_offset, block.clone()) - .await - { - warn!(key=?object_id, block_index, ?error, "failed to update cache"); + fn update_cache(&self, block: ChecksummedBytes, block_index: u64, block_offset: u64, object_id: &ObjectId) { + let object_id = object_id.clone(); + let cache = self.cache.clone(); + self.runtime + .spawn(async move { + let start = Instant::now(); + if let Err(error) = cache + .put_block(object_id.clone(), block_index, block_offset, block) + .await + { + warn!(key=?object_id, block_index, ?error, "failed to update cache"); + } + metrics::histogram!("prefetch.cache_update_duration_us").record(start.elapsed().as_micros() as f64); + }) + .unwrap(); } - metrics::histogram!("prefetch.cache_update_duration_us").record(start.elapsed().as_micros() as f64); } /// Creates a Part that can be streamed to the prefetcher if the given bytes