diff --git a/Cargo.lock b/Cargo.lock index dcb7ab7bb..70da732e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2051,6 +2051,15 @@ dependencies = [ "url", ] +[[package]] +name = "humansize" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" +dependencies = [ + "libm", +] + [[package]] name = "humantime" version = "2.1.0" @@ -2476,6 +2485,7 @@ dependencies = [ "hdrhistogram", "hex", "httpmock", + "humansize", "lazy_static", "libc", "linked-hash-map", diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index cc6f566c1..40e31ae11 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -44,6 +44,7 @@ tracing = { version = "0.1.35", features = ["log"] } tracing-log = "0.2.0" tracing-subscriber = { version = "0.3.14", features = ["env-filter"] } async-stream = "0.3.5" +humansize = "2.1.3" [target.'cfg(target_os = "linux")'.dependencies] procfs = { version = "0.16.0", default-features = false } diff --git a/mountpoint-s3/examples/prefetch_benchmark.rs b/mountpoint-s3/examples/prefetch_benchmark.rs index 737eb3d2e..77397d194 100644 --- a/mountpoint-s3/examples/prefetch_benchmark.rs +++ b/mountpoint-s3/examples/prefetch_benchmark.rs @@ -4,6 +4,8 @@ use std::time::Instant; use clap::{Arg, Command}; use futures::executor::{block_on, ThreadPool}; +use mountpoint_s3::mem_limiter::MemoryLimiter; +use mountpoint_s3::object::ObjectId; use mountpoint_s3::prefetch::{default_prefetch, Prefetch, PrefetchResult}; use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; use mountpoint_s3_client::types::ETag; @@ -77,6 +79,7 @@ fn main() { config = config.part_size(part_size); } let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client")); + let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024)); for i in 0..iterations.unwrap_or(1) { let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); @@ -85,7 
+88,8 @@ fn main() { let start = Instant::now(); - let mut request = manager.prefetch(client.clone(), bucket, key, size, ETag::for_tests()); + let object_id = ObjectId::new(key.clone(), ETag::for_tests()); + let mut request = manager.prefetch(client.clone(), mem_limiter.clone(), bucket.clone(), object_id, size); block_on(async { loop { let offset = received_size.load(Ordering::SeqCst); diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs index beca27628..3668922cf 100644 --- a/mountpoint-s3/src/cli.rs +++ b/mountpoint-s3/src/cli.rs @@ -25,6 +25,7 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup; use nix::sys::signal::Signal; use nix::unistd::ForkResult; use regex::Regex; +use sysinfo::{RefreshKind, System}; use crate::build_info; use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ManagedCacheDir}; @@ -157,6 +158,15 @@ pub struct CliArgs { )] pub max_threads: u64, + #[clap( + long, + help = "Maximum memory usage target [default: 95% of total system memory with a minimum of 512 MiB]", + value_name = "MiB", + value_parser = value_parser!(u64).range(512..), + help_heading = CLIENT_OPTIONS_HEADER + )] + pub max_memory_target: Option, + #[clap( long, help = "Part size for multi-part GET and PUT in bytes", @@ -393,12 +403,14 @@ impl CliArgs { let mut filter = if self.debug { String::from("debug") } else { - String::from("warn") + String::from("info") }; let crt_verbosity = if self.debug_crt { "debug" } else { "off" }; filter.push_str(&format!(",{}={}", AWSCRT_LOG_TARGET, crt_verbosity)); if self.log_metrics { filter.push_str(&format!(",{}=info", metrics::TARGET_NAME)); + } else { + filter.push_str(&format!(",{}=off", metrics::TARGET_NAME)); } filter }; @@ -738,6 +750,14 @@ where filesystem_config.allow_overwrite = args.allow_overwrite; filesystem_config.s3_personality = s3_personality; filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse, args.sse_kms_key_id); + filesystem_config.mem_limit = if let 
Some(max_mem_target) = args.max_memory_target { + max_mem_target * 1024 * 1024 + } else { + const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024; + let sys = System::new_with_specifics(RefreshKind::everything()); + let default_mem_target = (sys.total_memory() as f64 * 0.95) as u64; + default_mem_target.max(MINIMUM_MEM_LIMIT) + }; // Written in this awkward way to force us to update it if we add new checksum types filesystem_config.use_upload_checksums = match args.upload_checksums { diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index b03563cd2..61e316e71 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -21,6 +21,8 @@ use crate::inode::{ Inode, InodeError, InodeKind, LookedUp, ReadHandle, ReaddirHandle, Superblock, SuperblockConfig, WriteHandle, }; use crate::logging; +use crate::mem_limiter::MemoryLimiter; +use crate::object::ObjectId; use crate::prefetch::{Prefetch, PrefetchResult}; use crate::prefix::Prefix; use crate::s3::S3Personality; @@ -151,9 +153,14 @@ where None => return Err(err!(libc::EBADF, "no E-Tag for inode {}", lookup.inode.ino())), Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"), }; - let request = fs - .prefetcher - .prefetch(fs.client.clone(), &fs.bucket, &full_key, object_size, etag.clone()); + let object_id = ObjectId::new(full_key, etag); + let request = fs.prefetcher.prefetch( + fs.client.clone(), + fs.mem_limiter.clone(), + fs.bucket.clone(), + object_id, + object_size, + ); let handle = FileHandleState::Read { handle, request }; metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0); Ok(handle) @@ -393,6 +400,8 @@ pub struct S3FilesystemConfig { pub server_side_encryption: ServerSideEncryption, /// Use additional checksums for uploads pub use_upload_checksums: bool, + /// Memory limit + pub mem_limit: u64, } impl Default for S3FilesystemConfig { @@ -413,6 +422,7 @@ impl Default for S3FilesystemConfig { s3_personality: S3Personality::default(), server_side_encryption: 
Default::default(), use_upload_checksums: true, + mem_limit: 512 * 1024 * 1024, } } } @@ -526,6 +536,7 @@ where { config: S3FilesystemConfig, client: Arc, + mem_limiter: Arc, superblock: Superblock, prefetcher: Prefetcher, uploader: Uploader, @@ -558,6 +569,7 @@ where let superblock = Superblock::new(bucket, prefix, superblock_config); let client = Arc::new(client); + let mem_limiter = Arc::new(MemoryLimiter::new(config.mem_limit)); let uploader = Uploader::new( client.clone(), @@ -569,6 +581,7 @@ where Self { config, client, + mem_limiter, superblock, prefetcher, uploader, diff --git a/mountpoint-s3/src/lib.rs b/mountpoint-s3/src/lib.rs index 084cd3cc9..e0475a0eb 100644 --- a/mountpoint-s3/src/lib.rs +++ b/mountpoint-s3/src/lib.rs @@ -7,8 +7,9 @@ pub mod fs; pub mod fuse; mod inode; pub mod logging; +pub mod mem_limiter; pub mod metrics; -mod object; +pub mod object; pub mod prefetch; pub mod prefix; pub mod s3; diff --git a/mountpoint-s3/src/mem_limiter.rs b/mountpoint-s3/src/mem_limiter.rs new file mode 100644 index 000000000..bd1810fcd --- /dev/null +++ b/mountpoint-s3/src/mem_limiter.rs @@ -0,0 +1,109 @@ +use std::sync::atomic::Ordering; + +use humansize::make_format; +use metrics::atomics::AtomicU64; +use tracing::debug; + +/// `MemoryLimiter` tracks memory used by Mountpoint and makes decisions if a new memory reservation request can be accepted. +/// Currently the only metric which we take into account is the memory reserved by prefetcher instances for the data requested or +/// fetched from CRT client. Single instance of this struct is shared among all of the prefetchers (file handles). +/// +/// Each file handle upon creation makes an initial reservation request with a minimal read window size of `1MiB + 128KiB`. This +/// is accepted unconditionally since we want to allow any file handle to make progress even if that means going over the memory +/// limit. 
Additional reservations for a file handle arise when data is being read from fuse **faster** than it arrives from the +/// client (PartQueueStall). Those reservations may be rejected if there is no available memory. +/// +/// Release of the reserved memory happens on one of the following events: +/// 1) prefetcher is destroyed (`PartQueue` holding the data should be dropped and the CRT request cancelled before this release) +/// 2) prefetcher's read window is scaled down (we wait for the previously requested data to be consumed) +/// 3) prefetcher is approaching the end of the request, in which case we can be sure that reservation in full won't be needed. +/// +/// Following is the visualisation of a single prefetcher instance's data stream: +/// +/// backwards_seek_start next_read_offset in_part_queue window_end_offset preferred_window_end_offset +/// │ │ │ │ │ +/// ─┼────────────────────┼───────────────────────────┼───────────────────────────────┼────────────────────────────┼───────────-► +/// │ ├───────────────────────────┤ │ │ +/// └────────────────────┤ certainly used memory └───────────────────────────────┤ │ +/// memory not accounted │ in CRT buffer, or callback queue └────────────────────────────┤ +/// │ (usage may be less than reserved) will be used after the │ +/// │ window increase │ +/// └────────────────────────────────────────────────────────────────────────────────────────┘ +/// preferred_read_window_size (reserved in MemoryLimiter) +/// +#[derive(Debug)] +pub struct MemoryLimiter { + mem_limit: u64, + /// Reserved memory for data we had requested via the request task but may not + /// arrived yet. 
+ prefetcher_mem_reserved: AtomicU64, + /// Additional reserved memory for other non-buffer usage like storing metadata + additional_mem_reserved: u64, +} + +impl MemoryLimiter { + pub fn new(mem_limit: u64) -> Self { + let min_reserved = 128 * 1024 * 1024; + let reserved_mem = (mem_limit / 8).max(min_reserved); + let formatter = make_format(humansize::BINARY); + debug!( + "target memory usage is {} with {} reserved memory", + formatter(mem_limit), + formatter(reserved_mem) + ); + Self { + mem_limit, + prefetcher_mem_reserved: AtomicU64::new(0), + additional_mem_reserved: reserved_mem, + } + } + + /// Reserve the memory for future uses. Always succeeds, even if it means going beyond + /// the configured memory limit. + pub fn reserve(&self, size: u64) { + self.prefetcher_mem_reserved.fetch_add(size, Ordering::SeqCst); + metrics::gauge!("prefetch.bytes_reserved").increment(size as f64); + } + + /// Reserve the memory for future uses. If there is not enough memory returns `false`. + pub fn try_reserve(&self, size: u64, min_available: u64) -> bool { + loop { + let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst); + let new_prefetcher_mem_reserved = prefetcher_mem_reserved.saturating_add(size); + let total_mem_usage = prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved); + let new_total_mem_usage = new_prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved); + if new_total_mem_usage > self.mem_limit - min_available { + debug!( + "not enough memory to reserve, current usage: {}, new (if scaled up): {}, allowed diff: {}", + total_mem_usage, new_total_mem_usage, min_available, + ); + return false; + } + match self.prefetcher_mem_reserved.compare_exchange_weak( + prefetcher_mem_reserved, + new_prefetcher_mem_reserved, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => { + metrics::gauge!("prefetch.bytes_reserved").increment(size as f64); + return true; + } + Err(_) => continue, // another thread updated the 
atomic before us, trying again + } + } + } + + /// Release the reserved memory. + pub fn release(&self, size: u64) { + self.prefetcher_mem_reserved.fetch_sub(size, Ordering::SeqCst); + metrics::gauge!("prefetch.bytes_reserved").decrement(size as f64); + } + + pub fn available_mem(&self) -> u64 { + let fs_mem_usage = self.prefetcher_mem_reserved.load(Ordering::SeqCst); + self.mem_limit + .saturating_sub(fs_mem_usage) + .saturating_sub(self.additional_mem_reserved) + } +} diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index 984c48ea6..4e1866bad 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -46,15 +46,15 @@ use async_trait::async_trait; use futures::task::Spawn; use metrics::{counter, histogram}; use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; -use mountpoint_s3_client::types::ETag; use mountpoint_s3_client::ObjectClient; use part::PartOperationError; use part_stream::RequestTaskConfig; use thiserror::Error; -use tracing::trace; +use tracing::{debug, trace}; use crate::checksums::{ChecksummedBytes, IntegrityError}; use crate::data_cache::DataCache; +use crate::mem_limiter::MemoryLimiter; use crate::object::ObjectId; use crate::prefetch::caching_stream::CachingPartStream; use crate::prefetch::part_stream::{ClientPartStream, ObjectPartStream, RequestRange}; @@ -70,10 +70,10 @@ pub trait Prefetch { fn prefetch( &self, client: Arc, - bucket: &str, - key: &str, + mem_limiter: Arc, + bucket: String, + object_id: ObjectId, size: u64, - etag: ETag, ) -> Self::PrefetchResult where Client: ObjectClient + Send + Sync + 'static; @@ -164,7 +164,7 @@ impl Default for PrefetcherConfig { fn default() -> Self { Self { max_read_window_size: 2 * 1024 * 1024 * 1024, - sequential_prefetch_multiplier: 8, + sequential_prefetch_multiplier: 2, read_timeout: Duration::from_secs(60), // We want these large enough to tolerate a single out-of-order Linux readahead, which // is at most 256KiB backwards and then 
512KiB forwards. For forwards seeks, we're also @@ -203,10 +203,10 @@ where fn prefetch( &self, client: Arc, - bucket: &str, - key: &str, + mem_limiter: Arc, + bucket: String, + object_id: ObjectId, size: u64, - etag: ETag, ) -> Self::PrefetchResult where Client: ObjectClient + Send + Sync + 'static, @@ -214,11 +214,11 @@ where PrefetchGetObject::new( client.clone(), self.part_stream.clone(), + mem_limiter, self.config, bucket, - key, + object_id, size, - etag, ) } } @@ -229,6 +229,7 @@ where pub struct PrefetchGetObject { client: Arc, part_stream: Arc, + mem_limiter: Arc, config: PrefetcherConfig, backpressure_task: Option>, // Invariant: the offset of the last byte in this window is always @@ -282,15 +283,16 @@ where fn new( client: Arc, part_stream: Arc, + mem_limiter: Arc, config: PrefetcherConfig, - bucket: &str, - key: &str, + bucket: String, + object_id: ObjectId, size: u64, - etag: ETag, ) -> Self { PrefetchGetObject { client, part_stream, + mem_limiter, config, backpressure_task: None, backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize), @@ -298,8 +300,8 @@ where sequential_read_start_offset: 0, next_sequential_read_offset: 0, next_request_offset: 0, - bucket: bucket.to_owned(), - object_id: ObjectId::new(key.to_owned(), etag), + bucket, + object_id, size, } } @@ -331,7 +333,7 @@ where if self.try_seek(offset).await? 
{ trace!("seek succeeded"); } else { - trace!( + debug!( expected = self.next_sequential_read_offset, actual = offset, "out-of-order read, resetting prefetch" @@ -385,6 +387,7 @@ where ) -> Result, PrefetchReadError> { let start = self.next_sequential_read_offset; let object_size = self.size as usize; + let read_part_size = self.client.read_part_size().unwrap_or(8 * 1024 * 1024); let range = RequestRange::new(object_size, start, object_size); // The prefetcher now relies on backpressure mechanism so it must be enabled @@ -403,12 +406,15 @@ where bucket: self.bucket.clone(), object_id: self.object_id.clone(), range, + read_part_size, preferred_part_size: self.preferred_part_size, initial_read_window_size, max_read_window_size: self.config.max_read_window_size, read_window_size_multiplier: self.config.sequential_prefetch_multiplier, }; - Ok(self.part_stream.spawn_get_object_request(&self.client, config)) + Ok(self + .part_stream + .spawn_get_object_request(self.client.clone(), config, self.mem_limiter.clone())) } /// Reset this prefetch request to a new offset, clearing any existing tasks queued. 
@@ -521,6 +527,7 @@ mod tests { use mountpoint_s3_client::error::GetObjectError; use mountpoint_s3_client::failure_client::{countdown_failure_client, RequestFailureMap}; use mountpoint_s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject}; + use mountpoint_s3_client::types::ETag; use proptest::proptest; use proptest::strategy::{Just, Strategy}; use proptest_derive::Arbitrary; @@ -572,6 +579,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024); let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests()); let etag = object.etag(); @@ -586,7 +594,8 @@ mod tests { }; let prefetcher = Prefetcher::new(part_stream, prefetcher_config); - let mut request = prefetcher.prefetch(client, "test-bucket", "hello", size, etag); + let object_id = ObjectId::new("hello".to_owned(), etag); + let mut request = prefetcher.prefetch(client, mem_limiter.into(), "test-bucket".to_owned(), object_id, size); let mut next_offset = 0; loop { @@ -664,7 +673,8 @@ mod tests { ) where Stream: ObjectPartStream + Send + Sync + 'static, { - let client = MockClient::new(client_config); + let client = Arc::new(MockClient::new(client_config)); + let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024); let read_size = 1 * MB; let object_size = 8 * MB; let object = MockObject::ramp(0xaa, object_size, ETag::for_tests()); @@ -677,7 +687,14 @@ mod tests { }; let prefetcher = Prefetcher::new(part_stream, prefetcher_config); - let mut request = prefetcher.prefetch(Arc::new(client), "test-bucket", "hello", object_size as u64, etag); + let object_id = ObjectId::new("hello".to_owned(), etag); + let mut request = prefetcher.prefetch( + client, + mem_limiter.into(), + "test-bucket".to_owned(), + object_id, + object_size as u64, + ); let result = block_on(request.read(0, read_size)); assert!(matches!(result, Err(PrefetchReadError::BackpressurePreconditionFailed))); } @@ 
-757,7 +774,14 @@ mod tests { client.add_object("hello", object); - let client = countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new(), HashMap::new()); + let client = Arc::new(countdown_failure_client( + client, + get_failures, + HashMap::new(), + HashMap::new(), + HashMap::new(), + )); + let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024); let prefetcher_config = PrefetcherConfig { max_read_window_size: test_config.max_read_window_size, @@ -766,7 +790,8 @@ mod tests { }; let prefetcher = Prefetcher::new(part_stream, prefetcher_config); - let mut request = prefetcher.prefetch(Arc::new(client), "test-bucket", "hello", size, etag); + let object_id = ObjectId::new("hello".to_owned(), etag); + let mut request = prefetcher.prefetch(client, mem_limiter.into(), "test-bucket".to_owned(), object_id, size); let mut next_offset = 0; loop { @@ -881,6 +906,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024); let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests()); let etag = object.etag(); @@ -895,7 +921,14 @@ mod tests { }; let prefetcher = Prefetcher::new(part_stream, prefetcher_config); - let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, etag); + let object_id = ObjectId::new("hello".to_owned(), etag); + let mut request = prefetcher.prefetch( + client, + mem_limiter.into(), + "test-bucket".to_owned(), + object_id, + object_size, + ); for (offset, length) in reads { assert!(offset < object_size); @@ -1057,10 +1090,19 @@ mod tests { HashMap::new(), HashMap::new(), )); + let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024); let prefetcher = Prefetcher::new(default_stream(), Default::default()); + let mem_limiter = Arc::new(mem_limiter); block_on(async { - let mut request = prefetcher.prefetch(client, "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); + let object_id = 
ObjectId::new("hello".to_owned(), etag.clone()); + let mut request = prefetcher.prefetch( + client, + mem_limiter, + "test-bucket".to_owned(), + object_id, + OBJECT_SIZE as u64, + ); // The first read should trigger the prefetcher to try and get the whole object (in 2 parts). _ = request.read(0, 1).await.expect("first read should succeed"); @@ -1101,6 +1143,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024)); let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); let etag = object.etag(); @@ -1110,8 +1153,14 @@ mod tests { // Try every possible seek from first_read_size for offset in first_read_size + 1..OBJECT_SIZE { - let mut request = - prefetcher.prefetch(client.clone(), "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); + let object_id = ObjectId::new("hello".to_owned(), etag.clone()); + let mut request = prefetcher.prefetch( + client.clone(), + mem_limiter.clone(), + "test-bucket".to_owned(), + object_id, + OBJECT_SIZE as u64, + ); if first_read_size > 0 { let _first_read = block_on(request.read(0, first_read_size)).unwrap(); } @@ -1136,6 +1185,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024)); let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); let etag = object.etag(); @@ -1145,8 +1195,14 @@ mod tests { // Try every possible seek from first_read_size for offset in 0..first_read_size { - let mut request = - prefetcher.prefetch(client.clone(), "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); + let object_id = ObjectId::new("hello".to_owned(), etag.clone()); + let mut request = prefetcher.prefetch( + client.clone(), + mem_limiter.clone(), + "test-bucket".to_owned(), + object_id, + OBJECT_SIZE as u64, + ); if first_read_size > 0 { let _first_read = block_on(request.read(0, first_read_size)).unwrap(); } @@ 
-1191,6 +1247,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024); let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests()); let file_etag = object.etag(); @@ -1205,7 +1262,14 @@ mod tests { }; let prefetcher = Prefetcher::new(ClientPartStream::new(ShuttleRuntime), prefetcher_config); - let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, file_etag); + let object_id = ObjectId::new("hello".to_owned(), file_etag); + let mut request = prefetcher.prefetch( + client, + mem_limiter.into(), + "test-bucket".to_owned(), + object_id, + object_size, + ); let mut next_offset = 0; loop { @@ -1249,6 +1313,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024); let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests()); let file_etag = object.etag(); @@ -1263,7 +1328,14 @@ mod tests { }; let prefetcher = Prefetcher::new(ClientPartStream::new(ShuttleRuntime), prefetcher_config); - let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, file_etag); + let object_id = ObjectId::new("hello".to_owned(), file_etag); + let mut request = prefetcher.prefetch( + client, + mem_limiter.into(), + "test-bucket".to_owned(), + object_id, + object_size, + ); let num_reads = rng.gen_range(10usize..50); for _ in 0..num_reads { diff --git a/mountpoint-s3/src/prefetch/backpressure_controller.rs b/mountpoint-s3/src/prefetch/backpressure_controller.rs index 269bf459d..7fadacb99 100644 --- a/mountpoint-s3/src/prefetch/backpressure_controller.rs +++ b/mountpoint-s3/src/prefetch/backpressure_controller.rs @@ -1,7 +1,11 @@ use std::ops::Range; +use std::sync::Arc; use async_channel::{unbounded, Receiver, Sender}; -use tracing::trace; +use humansize::make_format; +use tracing::{debug, trace}; + +use 
crate::mem_limiter::MemoryLimiter; use super::PrefetchReadError; @@ -16,26 +20,35 @@ pub enum BackpressureFeedbackEvent { pub struct BackpressureConfig { /// Backpressure's initial read window size pub initial_read_window_size: usize, + /// Minimum read window size that the backpressure controller is allowed to scale down to + pub min_read_window_size: usize, /// Maximum read window size that the backpressure controller is allowed to scale up to pub max_read_window_size: usize, /// Factor to increase the read window size by when the part queue is stalled pub read_window_size_multiplier: usize, /// Request range to apply backpressure pub request_range: Range, + pub read_part_size: usize, } #[derive(Debug)] pub struct BackpressureController { read_window_updater: Sender, preferred_read_window_size: usize, + min_read_window_size: usize, max_read_window_size: usize, read_window_size_multiplier: usize, /// Upper bound of the current read window. The request can return data up to this /// offset *exclusively*. This value must be advanced to continue fetching new data. read_window_end_offset: u64, + /// Next offset of the data to be read. It is used for tracking how many bytes of + /// data has been read out of the stream. + next_read_offset: u64, /// End offset for the request we want to apply backpressure. The request can return /// data up to this offset *exclusively*. request_end_offset: u64, + read_part_size: usize, + mem_limiter: Arc, } #[derive(Debug)] @@ -61,16 +74,24 @@ pub struct BackpressureLimiter { /// [BackpressureController] will be given to the consumer side of the object stream. /// It can be used anywhere to set preferred read window size for the stream and tell the /// producer when its read window should be increased. 
-pub fn new_backpressure_controller(config: BackpressureConfig) -> (BackpressureController, BackpressureLimiter) { +pub fn new_backpressure_controller( + config: BackpressureConfig, + mem_limiter: Arc, +) -> (BackpressureController, BackpressureLimiter) { let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64; let (read_window_updater, read_window_incrementing_queue) = unbounded(); + mem_limiter.reserve(config.initial_read_window_size as u64); let controller = BackpressureController { read_window_updater, preferred_read_window_size: config.initial_read_window_size, + min_read_window_size: config.min_read_window_size, max_read_window_size: config.max_read_window_size, read_window_size_multiplier: config.read_window_size_multiplier, read_window_end_offset, + next_read_offset: config.request_range.start, request_end_offset: config.request_range.end, + read_part_size: config.read_part_size, + mem_limiter, }; let limiter = BackpressureLimiter { read_window_incrementing_queue, @@ -91,26 +112,72 @@ impl BackpressureController { match event { // Note, that this may come from a backwards seek, so offsets observed by this method are not necessarily ascending BackpressureFeedbackEvent::DataRead { offset, length } => { + // Step 2. of scale down, including the case when we're approaching the request end. See `self.scale_down` for the logic. 
let next_read_offset = offset + length as u64; + // We don't update `self.next_read_offset` if this feedback arrived from read after a backwards seek + if next_read_offset > self.next_read_offset { + self.next_read_offset = next_read_offset; + } + if self.next_read_offset >= self.request_end_offset { + self.next_read_offset = self.request_end_offset; + } let remaining_window = self.read_window_end_offset.saturating_sub(next_read_offset) as usize; + + let preffered_window_end_offset = self + .next_read_offset + .saturating_add(self.preferred_read_window_size as u64); + let over_reserved = self.read_window_end_offset.saturating_sub(preffered_window_end_offset); + if over_reserved > 0 { + self.mem_limiter.release((length as u64).min(over_reserved)); + } + if self.request_end_offset < preffered_window_end_offset { + // We won't need the full `preffered_window_end_offset` as we're approaching the request's end. + self.mem_limiter.release(length as u64); + } + // Increment the read window only if the remaining window reaches some threshold i.e. half of it left. if remaining_window < (self.preferred_read_window_size / 2) && self.read_window_end_offset < self.request_end_offset { + // If there is not enough available memory in the system, we'll try to reduce the read window of the current request. + // We define "not enough memory" as a situation where no new request with a minimum window may fit in the limit. + // + // Scaling down is best effort, meaning that there is no guarantee that after this action such a + // request will fit in memory. This may not be the case if during the scale down a new memory reservation was made by + // another request. + // + // We reduce the frequency of scale downs by only performing it when sufficient amount of data (half of read_window) + // was read. 
+ let mut available_mem = self.mem_limiter.available_mem(); + let mut new_read_window_size = self.preferred_read_window_size; // new_preferred_read_window_size is just too wordy + while available_mem < self.min_read_window_size as u64 && self.read_window_size_multiplier > 1 { + let scaled_down = new_read_window_size / self.read_window_size_multiplier; + if scaled_down < self.min_read_window_size { + break; + } + available_mem += (new_read_window_size - scaled_down) as u64; + new_read_window_size = scaled_down; + } + if new_read_window_size != self.preferred_read_window_size { + self.scale_down(new_read_window_size); + } + let new_read_window_end_offset = next_read_offset .saturating_add(self.preferred_read_window_size as u64) .min(self.request_end_offset); - debug_assert!(self.read_window_end_offset < new_read_window_end_offset); - let to_increase = new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize; - trace!( - preferred_read_window_size = self.preferred_read_window_size, - next_read_offset, - read_window_end_offset = self.read_window_end_offset, - to_increase, - "incrementing read window" - ); - self.increment_read_window(to_increase).await; - self.read_window_end_offset = new_read_window_end_offset; + + if self.read_window_end_offset < new_read_window_end_offset { + let to_increase = + new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize; + trace!( + preferred_read_window_size = self.preferred_read_window_size, + next_read_offset = self.next_read_offset, + read_window_end_offset = self.read_window_end_offset, + to_increase, + "incrementing read window" + ); + self.increment_read_window(to_increase).await; + } } } BackpressureFeedbackEvent::PartQueueStall => self.try_scaling_up(), @@ -119,28 +186,111 @@ impl BackpressureController { } // Send an increment read window request to the stream producer - async fn increment_read_window(&self, len: usize) { + async fn increment_read_window(&mut self, len: usize) { 
// This should not block since the channel is unbounded + let _ = self + .read_window_updater + .send(len) + .await + .inspect_err(|_| trace!("read window incrementing queue is already closed")); + self.read_window_end_offset += len as u64; } // Try scaling up preferred read window size with a multiplier configured at initialization. + // Scaling up fails silently if there is not enough free memory to perform it. fn try_scaling_up(&mut self) { if self.preferred_read_window_size < self.max_read_window_size { + let new_read_window_size = self.preferred_read_window_size * self.read_window_size_multiplier; + // Also align the new read window size to the client part size let new_read_window_size = - (self.preferred_read_window_size * self.read_window_size_multiplier).min(self.max_read_window_size); - trace!( - current_size = self.preferred_read_window_size, - new_size = new_read_window_size, - "scaling up preferred read window" - ); - self.preferred_read_window_size = new_read_window_size; + align(new_read_window_size, self.read_part_size, false).min(self.max_read_window_size); + + // Only scale up when there is enough memory + let to_increase = (new_read_window_size - self.preferred_read_window_size) as u64; + if self + .mem_limiter + .try_reserve(to_increase, self.min_read_window_size as u64) + { + let formatter = make_format(humansize::BINARY); + debug!( + current_size = formatter(self.preferred_read_window_size), + new_size = formatter(new_read_window_size), + "scaled up preferred read window" + ); + self.preferred_read_window_size = new_read_window_size; + metrics::histogram!("prefetch.window_after_increase_mib") + .record((self.preferred_read_window_size / 1024 / 1024) as f64); + } } } + + pub fn scale_down(&mut self, new_read_window_size: usize) { + /* + Scaling down is performed in 2 steps, one in this method and another on read. Note that `window_end_offset` is the value + which is set in CRT and it may not be decreased. This function implements step 1. + + 0. 
Before scale down: + + read_until window_end_offset preferred_window_end_offset + │ │ │ + ────┼───────────────────────────────────────────────────────────┼───────────────┼─────────────────────────────────► + │ │ + └───────────────────────────────────────────────────────────────────────────┘ + preferred_read_window_size + + 1. Scaling down (`new_read_window_size` is applied): + + read_until preferred_window_end_offset window_end_offset preferred_window_end_offset_old + │ │ │ │ + ────┼──────────────────────────────────────┼────────────────────┼───────────────┼─────────────────────────────────► + │ ├───────────────┘ + └────────────────────┘ released immediately + over_reserved + + 2. Part read: + + read_until(old) read_until preferred_window_end_offset window_end_offset + │ │ │ │ + ────┼────────────┼─────────────────────────────────────┼────────┼─────────────────────────────────────────────────► + └────────────┘ └────────┘ + released on read: over_reserved (new) + 1. if over_reserved > 0 + 2. min(part.size(), over_reserved) is to deduct + */ + // Align the new read window size to the client part size + let new_read_window_size = + align(new_read_window_size, self.read_part_size, false).max(self.min_read_window_size); + + let formatter = make_format(humansize::BINARY); + debug!( + current_size = formatter(self.preferred_read_window_size), + new_size = formatter(new_read_window_size), + "scaling down read window" + ); + let preferred_window_end_offset_old = self + .next_read_offset + .saturating_add(self.preferred_read_window_size as u64); + let preferred_window_end_offset = self.next_read_offset.saturating_add(new_read_window_size as u64); + // In most cases we'll keep memory reserved for `self.read_window_end_offset`, but if the new + // `preferred_window_end_offset` is greater, we'll reserve for it instead. 
+ let reserve_until_offset = self.read_window_end_offset.max(preferred_window_end_offset); + let to_release = preferred_window_end_offset_old.saturating_sub(reserve_until_offset); + self.mem_limiter.release(to_release); + self.preferred_read_window_size = new_read_window_size; + metrics::histogram!("prefetch.window_after_decrease_mib") + .record((self.preferred_read_window_size / 1024 / 1024) as f64); + } +} + +impl Drop for BackpressureController { + fn drop(&mut self) { + // When approaching request end we have less memory still reserved than `self.preferred_read_window_size`. + debug_assert!(self.request_end_offset >= self.next_read_offset); + let remaining_in_request = self.request_end_offset.saturating_sub(self.next_read_offset); + + self.mem_limiter + .release((self.preferred_read_window_size as u64).min(remaining_in_request)); + } } impl BackpressureLimiter { @@ -184,3 +334,18 @@ impl BackpressureLimiter { Ok(Some(self.read_window_end_offset)) } } + +/// Try to align the given read window size to the part boundaries. +/// The `trim_only` flag controls whether the range is only trimmed down to +/// part boundaries or is allowed to grow wider. 
+fn align(read_window_size: usize, part_size: usize, trim_only: bool) -> usize { + let part_alignment = part_size; + let remainder = read_window_size % part_alignment; + if trim_only || remainder == 0 { + // trim it to the previous part boundary + read_window_size - remainder + } else { + // extend it to the next part boundary + read_window_size + (part_alignment - remainder) + } +} diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index a42121417..01c3f3452 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -9,6 +9,7 @@ use tracing::{debug_span, trace, warn, Instrument}; use crate::checksums::ChecksummedBytes; use crate::data_cache::{BlockIndex, DataCache}; +use crate::mem_limiter::MemoryLimiter; use crate::object::ObjectId; use crate::prefetch::backpressure_controller::{new_backpressure_controller, BackpressureConfig, BackpressureLimiter}; use crate::prefetch::part::Part; @@ -43,26 +44,30 @@ where { fn spawn_get_object_request( &self, - client: &Client, + client: Arc, config: RequestTaskConfig, + mem_limiter: Arc, ) -> RequestTask<::ClientError> where - Client: ObjectClient + Clone + Send + Sync + 'static, + Client: ObjectClient + Send + Sync + 'static, { let range = config.range; let backpressure_config = BackpressureConfig { initial_read_window_size: config.initial_read_window_size, + min_read_window_size: config.read_part_size, max_read_window_size: config.max_read_window_size, read_window_size_multiplier: config.read_window_size_multiplier, request_range: range.into(), + read_part_size: config.read_part_size, }; - let (backpressure_controller, backpressure_limiter) = new_backpressure_controller(backpressure_config); + let (backpressure_controller, backpressure_limiter) = + new_backpressure_controller(backpressure_config, mem_limiter.clone()); let (part_queue, part_queue_producer) = unbounded_part_queue(); trace!(?range, "spawning request"); let 
request_task = { - let request = CachingRequest::new(client.clone(), self.cache.clone(), backpressure_limiter, config); + let request = CachingRequest::new(client, self.cache.clone(), backpressure_limiter, config); let span = debug_span!("prefetch", ?range); request.get_from_cache(range, part_queue_producer).instrument(span) }; @@ -75,7 +80,7 @@ where #[derive(Debug)] struct CachingRequest { - client: Client, + client: Arc, cache: Arc, backpressure_limiter: BackpressureLimiter, config: RequestTaskConfig, @@ -83,11 +88,11 @@ struct CachingRequest { impl CachingRequest where - Client: ObjectClient + Clone + Send + Sync + 'static, + Client: ObjectClient + Send + Sync + 'static, Cache: DataCache + Send + Sync, { fn new( - client: Client, + client: Arc, cache: Arc, backpressure_limiter: BackpressureLimiter, config: RequestTaskConfig, @@ -388,7 +393,7 @@ mod tests { }; use test_case::test_case; - use crate::{data_cache::InMemoryDataCache, object::ObjectId}; + use crate::{data_cache::InMemoryDataCache, mem_limiter::MemoryLimiter, object::ObjectId}; use super::*; @@ -429,6 +434,7 @@ mod tests { ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); + let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024)); mock_client.add_object(key, object.clone()); let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); @@ -444,12 +450,13 @@ mod tests { bucket: bucket.to_owned(), object_id: id.clone(), range, + read_part_size: client_part_size, preferred_part_size: 256 * KB, initial_read_window_size, max_read_window_size, read_window_size_multiplier, }; - let request_task = stream.spawn_get_object_request(&mock_client, config); + let request_task = stream.spawn_get_object_request(mock_client.clone(), config, mem_limiter.clone()); compare_read(&id, &object, request_task); get_object_counter.count() }; @@ -469,12 +476,13 @@ mod tests { bucket: bucket.to_owned(), object_id: id.clone(), range, + read_part_size: client_part_size, 
preferred_part_size: 256 * KB, initial_read_window_size, max_read_window_size, read_window_size_multiplier, }; - let request_task = stream.spawn_get_object_request(&mock_client, config); + let request_task = stream.spawn_get_object_request(mock_client.clone(), config, mem_limiter.clone()); compare_read(&id, &object, request_task); get_object_counter.count() }; @@ -507,6 +515,7 @@ mod tests { ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); + let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024)); mock_client.add_object(key, object.clone()); let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); @@ -518,12 +527,13 @@ mod tests { bucket: bucket.to_owned(), object_id: id.clone(), range: RequestRange::new(object_size, offset as u64, preferred_size), + read_part_size: client_part_size, preferred_part_size: 256 * KB, initial_read_window_size, max_read_window_size, read_window_size_multiplier, }; - let request_task = stream.spawn_get_object_request(&mock_client, config); + let request_task = stream.spawn_get_object_request(mock_client.clone(), config, mem_limiter.clone()); compare_read(&id, &object, request_task); } } diff --git a/mountpoint-s3/src/prefetch/part_stream.rs b/mountpoint-s3/src/prefetch/part_stream.rs index a01b03668..9dd9a0944 100644 --- a/mountpoint-s3/src/prefetch/part_stream.rs +++ b/mountpoint-s3/src/prefetch/part_stream.rs @@ -4,10 +4,12 @@ use futures::task::{Spawn, SpawnExt}; use futures::{pin_mut, Stream, StreamExt}; use mountpoint_s3_client::{types::GetObjectRequest, ObjectClient}; use std::marker::{Send, Sync}; +use std::sync::Arc; use std::{fmt::Debug, ops::Range}; use tracing::{debug_span, error, trace, Instrument}; use crate::checksums::ChecksummedBytes; +use crate::mem_limiter::MemoryLimiter; use crate::object::ObjectId; use crate::prefetch::backpressure_controller::{new_backpressure_controller, BackpressureConfig}; use crate::prefetch::part::Part; @@ -24,11 +26,12 @@ pub trait 
ObjectPartStream { /// size for the parts, but implementations are allowed to ignore it. fn spawn_get_object_request( &self, - client: &Client, + client: Arc, config: RequestTaskConfig, + mem_limiter: Arc, ) -> RequestTask where - Client: ObjectClient + Clone + Send + Sync + 'static; + Client: ObjectClient + Send + Sync + 'static; } #[derive(Clone, Debug)] @@ -37,6 +40,7 @@ pub struct RequestTaskConfig { pub bucket: String, pub object_id: ObjectId, pub range: RequestRange, + pub read_part_size: usize, pub preferred_part_size: usize, pub initial_read_window_size: usize, pub max_read_window_size: usize, @@ -179,11 +183,12 @@ where { fn spawn_get_object_request( &self, - client: &Client, + client: Arc, config: RequestTaskConfig, + mem_limiter: Arc, ) -> RequestTask where - Client: ObjectClient + Clone + Send + Sync + 'static, + Client: ObjectClient + Send + Sync + 'static, { assert!(config.preferred_part_size > 0); @@ -191,16 +196,20 @@ where let backpressure_config = BackpressureConfig { initial_read_window_size: config.initial_read_window_size, + // We don't want to completely block the stream so let's use + // the read part size as minimum read window. 
+ min_read_window_size: config.read_part_size, max_read_window_size: config.max_read_window_size, read_window_size_multiplier: config.read_window_size_multiplier, request_range: range.into(), + read_part_size: config.read_part_size, }; - let (backpressure_controller, mut backpressure_limiter) = new_backpressure_controller(backpressure_config); + let (backpressure_controller, mut backpressure_limiter) = + new_backpressure_controller(backpressure_config, mem_limiter.clone()); let (part_queue, part_queue_producer) = unbounded_part_queue(); trace!(?range, "spawning request"); let span = debug_span!("prefetch", ?range); - let client = client.clone(); let task_handle = self .runtime .spawn_with_handle( @@ -236,7 +245,10 @@ struct ClientPartComposer { preferred_part_size: usize, } -impl ClientPartComposer { +impl ClientPartComposer +where + E: std::error::Error + Send + Sync, +{ async fn try_compose_parts(&self, request_stream: impl Stream>) { if let Err(e) = self.compose_parts(request_stream).await { trace!(error=?e, "part stream task failed");