From 8fad2e50040b158d406d6d6e1b5484d0607045b5 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Fri, 27 Sep 2024 09:45:31 +0000 Subject: [PATCH] Fix scaling logic and address comments Signed-off-by: Monthon Klongklaew --- mountpoint-s3-client/src/s3_crt_client.rs | 4 +- mountpoint-s3/examples/prefetch_benchmark.rs | 19 +++- mountpoint-s3/src/cli.rs | 2 +- mountpoint-s3/src/fs.rs | 4 +- mountpoint-s3/src/mem_limiter.rs | 2 + mountpoint-s3/src/prefetch.rs | 23 ++--- .../src/prefetch/backpressure_controller.rs | 93 ++++++++++++++++++- mountpoint-s3/src/prefetch/caching_stream.rs | 18 ++-- mountpoint-s3/src/prefetch/part_queue.rs | 25 +++-- mountpoint-s3/src/prefetch/part_stream.rs | 4 +- mountpoint-s3/src/prefetch/task.rs | 12 +-- 11 files changed, 156 insertions(+), 50 deletions(-) diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index ec7bb74c5..d38742edf 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -769,7 +769,7 @@ impl S3CrtClientInner { // Buffer pool metrics let start = Instant::now(); let buffer_pool_stats = s3_client.poll_buffer_pool_usage_stats(); - metrics::histogram!("s3.client.buffer_pool.get_usage_latecy_us").record(start.elapsed().as_micros() as f64); + metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64); metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64); metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64); metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64); @@ -1213,7 +1213,7 @@ impl ObjectClient for S3CrtClient { fn mem_usage_stats(&self) -> Option { let start = Instant::now(); let crt_buffer_pool_stats = self.inner.s3_client.poll_buffer_pool_usage_stats(); - metrics::histogram!("s3.client.buffer_pool.get_usage_latecy_us").record(start.elapsed().as_micros() as f64); + metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64); Some(crt_buffer_pool_stats) } diff --git a/mountpoint-s3/examples/prefetch_benchmark.rs b/mountpoint-s3/examples/prefetch_benchmark.rs index 7532edde4..f9db56d36 100644 --- a/mountpoint-s3/examples/prefetch_benchmark.rs +++ b/mountpoint-s3/examples/prefetch_benchmark.rs @@ -13,6 +13,7 @@ use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; use mountpoint_s3_client::types::ETag; use mountpoint_s3_client::{ObjectClient, S3CrtClient}; use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter; +use sysinfo::{RefreshKind, System}; use tracing_subscriber::fmt::Subscriber; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; @@ -41,6 +42,11 @@ fn main() { .long("throughput-target-gbps") .help("Desired throughput in Gbps"), ) + .arg( + Arg::new("max-memory-target") + .long("max-memory-target") + .help("Maximum memory usage target in MiB"), + ) .arg( Arg::new("part-size") .long("part-size") @@ -65,6 +71,9 @@ fn main() { let throughput_target_gbps = matches .get_one::("throughput-target-gbps") .map(|s| s.parse::().expect("throughput target must be an f64")); + let max_memory_target = matches + .get_one::("max-memory-target") + .map(|s| s.parse::().expect("throughput target must be a u64")); let part_size = matches .get_one::("part-size") .map(|s| s.parse::().expect("part size must be a usize")); @@ -93,7 +102,15 @@ fn main() { config = config.part_size(part_size); } let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client")); - let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024)); + + let max_memory_target = if let Some(target) = max_memory_target { + target * 1024 * 1024 + } else { + // Default to 95% of total system memory + let sys = System::new_with_specifics(RefreshKind::everything()); + (sys.total_memory() as f64 * 0.95) as u64 + }; + let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), max_memory_target)); let head_object_result = block_on(client.head_object(bucket, key)).expect("HeadObject failed"); let size = head_object_result.object.size; diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs index b2451d53a..2df7f2c0c 100644 --- a/mountpoint-s3/src/cli.rs +++ b/mountpoint-s3/src/cli.rs @@ -32,6 +32,7 @@ use crate::fs::{CacheConfig, S3FilesystemConfig, ServerSideEncryption, TimeToLiv use crate::fuse::session::FuseSession; use crate::fuse::S3FuseFilesystem; use crate::logging::{init_logging, LoggingConfig}; +use crate::mem_limiter::MINIMUM_MEM_LIMIT; use crate::prefetch::{caching_prefetch, default_prefetch, Prefetch}; use crate::prefix::Prefix; use crate::s3::S3Personality; @@ -787,7 +788,6 @@ where filesystem_config.s3_personality = s3_personality; filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone()); - const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024; let sys = System::new_with_specifics(RefreshKind::everything()); let default_mem_target = (sys.total_memory() as f64 * 0.95) as u64; filesystem_config.mem_limit = default_mem_target.max(MINIMUM_MEM_LIMIT); diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index 4e2af1a78..5172e11ca 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -21,7 +21,7 @@ use crate::inode::{ Inode, InodeError, InodeKind, LookedUp, ReadHandle, ReaddirHandle, Superblock, SuperblockConfig, WriteHandle, }; use crate::logging; -use crate::mem_limiter::MemoryLimiter; +use crate::mem_limiter::{MemoryLimiter, MINIMUM_MEM_LIMIT}; use crate::object::ObjectId; use crate::prefetch::{Prefetch, PrefetchResult}; use crate::prefix::Prefix; @@ -422,7 +422,7 @@ impl Default for S3FilesystemConfig { s3_personality: S3Personality::default(), server_side_encryption: Default::default(), use_upload_checksums: true, - mem_limit: 512 * 1024 * 1024, + mem_limit: MINIMUM_MEM_LIMIT, } } } diff --git a/mountpoint-s3/src/mem_limiter.rs b/mountpoint-s3/src/mem_limiter.rs index 85ea93c5e..780071184 100644 --- a/mountpoint-s3/src/mem_limiter.rs +++ b/mountpoint-s3/src/mem_limiter.rs @@ -5,6 +5,8 @@ use metrics::atomics::AtomicU64; use mountpoint_s3_client::ObjectClient; use tracing::debug; +pub const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024; + /// `MemoryLimiter` tracks memory used by Mountpoint and makes decisions if a new memory reservation request can be accepted. /// Currently, there are two metrics we take into account: /// 1) the memory reserved by prefetcher instances for the data requested or fetched from CRT client. diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index a018b92df..d6aac7fa7 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -267,7 +267,7 @@ pub struct PrefetchGetObject { part_stream: Arc, mem_limiter: Arc>, config: PrefetcherConfig, - backpressure_task: Option>, + backpressure_task: Option>, // Invariant: the offset of the last byte in this window is always // self.next_sequential_read_offset - 1. backward_seek_window: SeekWindow, @@ -420,7 +420,7 @@ where /// We will be using flow-control window to control how much data we want to download into the prefetcher. fn spawn_read_backpressure_request( &mut self, - ) -> Result, PrefetchReadError> { + ) -> Result, PrefetchReadError> { let start = self.next_sequential_read_offset; let object_size = self.size as usize; let read_part_size = self.client.read_part_size().unwrap_or(8 * 1024 * 1024); @@ -560,6 +560,7 @@ mod tests { #![allow(clippy::identity_op)] use crate::data_cache::InMemoryDataCache; + use crate::mem_limiter::MINIMUM_MEM_LIMIT; use super::caching_stream::CachingPartStream; use super::*; @@ -619,7 +620,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); - let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests()); let etag = object.etag(); @@ -714,7 +715,7 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let client = Arc::new(MockClient::new(client_config)); - let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let read_size = 1 * MB; let object_size = 8 * MB; let object = MockObject::ramp(0xaa, object_size, ETag::for_tests()); @@ -821,7 +822,7 @@ mod tests { HashMap::new(), HashMap::new(), )); - let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let prefetcher_config = PrefetcherConfig { max_read_window_size: test_config.max_read_window_size, @@ -946,7 +947,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); - let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests()); let etag = object.etag(); @@ -1130,7 +1131,7 @@ mod tests { HashMap::new(), HashMap::new(), )); - let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let prefetcher = Prefetcher::new(default_stream(), Default::default()); let mem_limiter = Arc::new(mem_limiter); @@ -1183,7 +1184,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); - let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024)); + let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT)); let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); let etag = object.etag(); @@ -1225,7 +1226,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); - let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024)); + let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT)); let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); let etag = object.etag(); @@ -1287,7 +1288,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); - let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests()); let file_etag = object.etag(); @@ -1353,7 +1354,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); - let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests()); let file_etag = object.etag(); diff --git a/mountpoint-s3/src/prefetch/backpressure_controller.rs b/mountpoint-s3/src/prefetch/backpressure_controller.rs index 28dc8c893..86db54031 100644 --- a/mountpoint-s3/src/prefetch/backpressure_controller.rs +++ b/mountpoint-s3/src/prefetch/backpressure_controller.rs @@ -179,7 +179,9 @@ impl BackpressureController { // Scaling up fails silently if there is no enough free memory to perform it. fn scale_up(&mut self) { if self.preferred_read_window_size < self.max_read_window_size { - let new_read_window_size = self.preferred_read_window_size * self.read_window_size_multiplier; + let new_read_window_size = (self.preferred_read_window_size * self.read_window_size_multiplier) + .max(self.min_read_window_size) + .min(self.max_read_window_size); // Only scale up when there is enough memory. We don't have to reserve the memory here // because only `preferred_read_window_size` is increased but the actual read window will // be updated later on `DataRead` event (where we do reserve memory). @@ -203,7 +205,9 @@ impl BackpressureController { fn scale_down(&mut self) { if self.preferred_read_window_size > self.min_read_window_size { assert!(self.read_window_size_multiplier > 1); - let new_read_window_size = self.preferred_read_window_size / self.read_window_size_multiplier; + let new_read_window_size = (self.preferred_read_window_size / self.read_window_size_multiplier) + .max(self.min_read_window_size) + .min(self.max_read_window_size); let formatter = make_format(humansize::BINARY); debug!( current_size = formatter(self.preferred_read_window_size), @@ -267,3 +271,88 @@ impl BackpressureLimiter { Ok(Some(self.read_window_end_offset)) } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig}; + use test_case::test_case; + + use crate::mem_limiter::MemoryLimiter; + + use super::{new_backpressure_controller, BackpressureConfig, BackpressureController, BackpressureLimiter}; + + #[test_case(1024 * 1024 + 128 * 1024, 2)] // real config + #[test_case(3 * 1024 * 1024, 4)] + #[test_case(8 * 1024 * 1024, 8)] + #[test_case(2 * 1024 * 1024 * 1024, 2)] + fn test_read_window_scale_up(initial_read_window_size: usize, read_window_size_multiplier: usize) { + let request_range = 0..(5 * 1024 * 1024 * 1024); + let backpressure_config = BackpressureConfig { + initial_read_window_size, + min_read_window_size: 8 * 1024 * 1024, + max_read_window_size: 2 * 1024 * 1024 * 1024, + read_window_size_multiplier, + request_range, + }; + + let (mut backpressure_controller, _backpressure_limiter) = + new_backpressure_controller_for_test(backpressure_config); + while backpressure_controller.preferred_read_window_size < backpressure_controller.max_read_window_size { + backpressure_controller.scale_up(); + assert!(backpressure_controller.preferred_read_window_size >= backpressure_controller.min_read_window_size); + assert!(backpressure_controller.preferred_read_window_size <= backpressure_controller.max_read_window_size); + } + assert_eq!( + backpressure_controller.preferred_read_window_size, backpressure_controller.max_read_window_size, + "should have scaled up to max read window size" + ); + } + + #[test_case(2 * 1024 * 1024 * 1024, 2)] + #[test_case(15 * 1024 * 1024 * 1024, 2)] + #[test_case(2 * 1024 * 1024 * 1024, 8)] + #[test_case(8 * 1024 * 1024, 8)] + fn test_read_window_scale_down(initial_read_window_size: usize, read_window_size_multiplier: usize) { + let request_range = 0..(5 * 1024 * 1024 * 1024); + let backpressure_config = BackpressureConfig { + initial_read_window_size, + min_read_window_size: 8 * 1024 * 1024, + max_read_window_size: 2 * 1024 * 1024 * 1024, + read_window_size_multiplier, + request_range, + }; + + let (mut backpressure_controller, _backpressure_limiter) = + new_backpressure_controller_for_test(backpressure_config); + while backpressure_controller.preferred_read_window_size > backpressure_controller.min_read_window_size { + backpressure_controller.scale_down(); + assert!(backpressure_controller.preferred_read_window_size <= backpressure_controller.max_read_window_size); + assert!(backpressure_controller.preferred_read_window_size >= backpressure_controller.min_read_window_size); + } + assert_eq!( + backpressure_controller.preferred_read_window_size, backpressure_controller.min_read_window_size, + "should have scaled down to min read window size" + ); + } + + fn new_backpressure_controller_for_test( + backpressure_config: BackpressureConfig, + ) -> (BackpressureController, BackpressureLimiter) { + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size: 8 * 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: backpressure_config.initial_read_window_size, + ..Default::default() + }; + + let client = MockClient::new(config); + let mem_limiter = Arc::new(MemoryLimiter::new( + client, + backpressure_config.max_read_window_size as u64, + )); + new_backpressure_controller(backpressure_config, mem_limiter.clone()) + } +} diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index e664d7b03..7d7d044c8 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -47,7 +47,7 @@ where client: &Client, config: RequestTaskConfig, mem_limiter: Arc>, - ) -> RequestTask<::ClientError, Client> + ) -> RequestTask where Client: ObjectClient + Clone + Send + Sync + 'static, { @@ -391,7 +391,11 @@ mod tests { }; use test_case::test_case; - use crate::{data_cache::InMemoryDataCache, mem_limiter::MemoryLimiter, object::ObjectId}; + use crate::{ + data_cache::InMemoryDataCache, + mem_limiter::{MemoryLimiter, MINIMUM_MEM_LIMIT}, + object::ObjectId, + }; use super::*; @@ -432,7 +436,7 @@ mod tests { ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); - let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), 512 * 1024 * 1024)); + let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), MINIMUM_MEM_LIMIT)); mock_client.add_object(key, object.clone()); let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); @@ -513,7 +517,7 @@ mod tests { ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); - let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), 512 * 1024 * 1024)); + let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), MINIMUM_MEM_LIMIT)); mock_client.add_object(key, object.clone()); let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); @@ -537,11 +541,7 @@ mod tests { } } - fn compare_read( - id: &ObjectId, - object: &MockObject, - mut request_task: RequestTask, - ) { + fn compare_read(id: &ObjectId, object: &MockObject, mut request_task: RequestTask) { let mut offset = request_task.start_offset(); let mut remaining = request_task.total_size(); while remaining > 0 { diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs index aa1a1d49f..3f8b2e16d 100644 --- a/mountpoint-s3/src/prefetch/part_queue.rs +++ b/mountpoint-s3/src/prefetch/part_queue.rs @@ -13,12 +13,12 @@ use crate::sync::Arc; /// A queue of [Part]s where the first part can be partially read from if the reader doesn't want /// the entire part in one shot. #[derive(Debug)] -pub struct PartQueue { +pub struct PartQueue { /// The auxiliary queue that supports pushing parts to the front of the part queue in order to /// allow partial reads and backwards seeks. front_queue: Vec, /// The main queue that receives parts from [super::ObjectPartStream] - receiver: Receiver>>, + receiver: Receiver>>, failed: bool, /// The total number of bytes sent to the underlying queue of `self.receiver` bytes_received: Arc, @@ -34,9 +34,9 @@ pub struct PartQueueProducer { } /// Creates an unbounded [PartQueue] and its related [PartQueueProducer]. -pub fn unbounded_part_queue( +pub fn unbounded_part_queue( mem_limiter: Arc>, -) -> (PartQueue, PartQueueProducer) { +) -> (PartQueue, PartQueueProducer) { let (sender, receiver) = unbounded(); let bytes_counter = Arc::new(AtomicUsize::new(0)); let part_queue = PartQueue { @@ -53,14 +53,14 @@ pub fn unbounded_part_queue( (part_queue, part_queue_producer) } -impl PartQueue { +impl PartQueue { /// Read up to `length` bytes from the queue at the current offset. This function always returns /// a contiguous [Bytes], and so may return fewer than `length` bytes if it would need to copy /// or reallocate to make the return value contiguous. This function blocks only if the queue is /// empty. /// /// If this method returns an Err, the PartQueue must never be accessed again. - pub async fn read(&mut self, length: usize) -> Result> { + pub async fn read(&mut self, length: usize) -> Result> { assert!(!self.failed, "cannot use a PartQueue after failure"); // Read from the auxiliary queue first if it's not empty @@ -98,7 +98,7 @@ impl PartQueue Result<(), PrefetchReadError> { + pub async fn push_front(&mut self, part: Part) -> Result<(), PrefetchReadError> { assert!(!self.failed, "cannot use a PartQueue after failure"); metrics::gauge!("prefetch.bytes_in_queue").increment(part.len() as f64); @@ -130,7 +130,7 @@ impl PartQueueProducer { } } -impl Drop for PartQueue { +impl Drop for PartQueue { fn drop(&mut self) { // close the channel and drain remaining parts from the main queue self.receiver.close(); @@ -151,6 +151,7 @@ impl Drop for PartQueue { #[cfg(test)] mod tests { use crate::checksums::ChecksummedBytes; + use crate::mem_limiter::MINIMUM_MEM_LIMIT; use crate::object::ObjectId; use super::*; @@ -161,7 +162,6 @@ mod tests { use mountpoint_s3_client::types::ETag; use proptest::proptest; use proptest_derive::Arbitrary; - use thiserror::Error; #[derive(Debug, Arbitrary)] enum Op { @@ -170,14 +170,11 @@ mod tests { Push(#[proptest(strategy = "1usize..8192")] usize), } - #[derive(Debug, Error)] - enum DummyError {} - async fn run_test(ops: Vec) { let client = MockClient::new(Default::default()); - let mem_limiter = MemoryLimiter::new(client, 512 * 1024 * 1024); + let mem_limiter = MemoryLimiter::new(client, MINIMUM_MEM_LIMIT); let part_id = ObjectId::new("key".to_owned(), ETag::for_tests()); - let (mut part_queue, part_queue_producer) = unbounded_part_queue::(mem_limiter.into()); + let (mut part_queue, part_queue_producer) = unbounded_part_queue::(mem_limiter.into()); let mut current_offset = 0; let mut current_length = 0; for op in ops { diff --git a/mountpoint-s3/src/prefetch/part_stream.rs b/mountpoint-s3/src/prefetch/part_stream.rs index 7fad902f6..b5eb32e01 100644 --- a/mountpoint-s3/src/prefetch/part_stream.rs +++ b/mountpoint-s3/src/prefetch/part_stream.rs @@ -29,7 +29,7 @@ pub trait ObjectPartStream { client: &Client, config: RequestTaskConfig, mem_limiter: Arc>, - ) -> RequestTask + ) -> RequestTask where Client: ObjectClient + Clone + Send + Sync + 'static; } @@ -186,7 +186,7 @@ where client: &Client, config: RequestTaskConfig, mem_limiter: Arc>, - ) -> RequestTask + ) -> RequestTask where Client: ObjectClient + Clone + Send + Sync + 'static, { diff --git a/mountpoint-s3/src/prefetch/task.rs b/mountpoint-s3/src/prefetch/task.rs index bf47190fe..9ff2718ee 100644 --- a/mountpoint-s3/src/prefetch/task.rs +++ b/mountpoint-s3/src/prefetch/task.rs @@ -11,21 +11,21 @@ use super::part_stream::RequestRange; /// A single GetObject request submitted to the S3 client #[derive(Debug)] -pub struct RequestTask { +pub struct RequestTask { /// Handle on the task/future. The future is cancelled when handle is dropped. This is None if /// the request is fake (created by seeking backwards in the stream) _task_handle: RemoteHandle<()>, remaining: usize, range: RequestRange, - part_queue: PartQueue, + part_queue: PartQueue, backpressure_controller: BackpressureController, } -impl RequestTask { +impl RequestTask { pub fn from_handle( task_handle: RemoteHandle<()>, range: RequestRange, - part_queue: PartQueue, + part_queue: PartQueue, backpressure_controller: BackpressureController, ) -> Self { Self { @@ -38,7 +38,7 @@ impl RequestTask) -> Result<(), PrefetchReadError> { + pub async fn push_front(&mut self, parts: Vec) -> Result<(), PrefetchReadError> { // Iterate backwards to push each part to the front of the part queue for part in parts.into_iter().rev() { self.remaining += part.len(); @@ -47,7 +47,7 @@ impl RequestTask Result> { + pub async fn read(&mut self, length: usize) -> Result> { let part = self.part_queue.read(length).await?; debug_assert!(part.len() <= self.remaining); self.remaining -= part.len();