Remove Client from MemoryLimiter, document this structure
Signed-off-by: Vladislav Volodkin <vladvolodkin@gmail.com>
vladem committed Sep 6, 2024
1 parent 8231d2d commit ac9bd79
Showing 9 changed files with 89 additions and 102 deletions.
2 changes: 1 addition & 1 deletion mountpoint-s3/examples/prefetch_benchmark.rs
@@ -79,7 +79,7 @@ fn main() {
config = config.part_size(part_size);
}
let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client"));
- let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
+ let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));

for i in 0..iterations.unwrap_or(1) {
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
4 changes: 2 additions & 2 deletions mountpoint-s3/src/fs.rs
@@ -536,7 +536,7 @@ where
{
config: S3FilesystemConfig,
client: Arc<Client>,
- mem_limiter: Arc<MemoryLimiter<Client>>,
+ mem_limiter: Arc<MemoryLimiter>,
superblock: Superblock,
prefetcher: Prefetcher,
uploader: Uploader<Client>,
@@ -569,7 +569,7 @@ where
let superblock = Superblock::new(bucket, prefix, superblock_config);

let client = Arc::new(client);
- let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit));
+ let mem_limiter = Arc::new(MemoryLimiter::new(config.mem_limit));

let uploader = Uploader::new(
client.clone(),
59 changes: 31 additions & 28 deletions mountpoint-s3/src/mem_limiter.rs
@@ -1,14 +1,38 @@
- use std::sync::{atomic::Ordering, Arc};
+ use std::sync::atomic::Ordering;

use humansize::make_format;
use metrics::atomics::AtomicU64;
use tracing::debug;

- use mountpoint_s3_client::ObjectClient;

+/// `MemoryLimiter` tracks memory used by Mountpoint and decides whether a new memory reservation request can be accepted.
+/// Currently the only metric we take into account is the memory reserved by prefetcher instances for the data requested
+/// from, or already fetched by, the CRT client. A single instance of this struct is shared among all of the prefetchers
+/// (file handles).
+///
+/// Each file handle, upon creation, makes an initial reservation request with a minimal read window size of `1MiB + 128KiB`.
+/// This is accepted unconditionally, since we want to allow any file handle to make progress even if that means going over
+/// the memory limit. Additional reservations for a file handle arise when data is being read from FUSE **faster** than it
+/// arrives from the client (PartQueueStall). Those reservations may be rejected if there is no available memory.
+///
+/// Release of the reserved memory happens on one of the following events:
+/// 1) the prefetcher is destroyed (the `PartQueue` holding the data should be dropped and the CRT request cancelled before this release)
+/// 2) the prefetcher's read window is scaled down (we wait for the previously requested data to be consumed)
+/// 3) the prefetcher is approaching the end of the request, in which case we can be sure that the full reservation won't be needed.
+///
+/// The following is a visualisation of a single prefetcher instance's data stream:
+///
+///  backwards_seek_start next_read_offset            in_part_queue                   window_end_offset            preferred_window_end_offset
+///  │                    │                           │                               │                            │
+/// ─┼────────────────────┼───────────────────────────┼───────────────────────────────┼────────────────────────────┼───────────►
+///  │                    ├───────────────────────────┤                               │                            │
+///  └────────────────────┤   certainly used memory   └───────────────────────────────┤                            │
+///  memory not accounted │                          in CRT buffer, or callback queue └────────────────────────────┤
+///                       │                          (usage may be less than reserved)   will be used after the    │
+///                       │                                                                  window increase       │
+///                       └────────────────────────────────────────────────────────────────────────────────────────┘
+///                                        preferred_read_window_size (reserved in MemoryLimiter)
+///
#[derive(Debug)]
- pub struct MemoryLimiter<Client: ObjectClient> {
-     client: Arc<Client>,
+ pub struct MemoryLimiter {
mem_limit: u64,
/// Reserved memory for data we have requested via the request task but which
/// may not have arrived yet.
@@ -17,8 +41,8 @@ pub struct MemoryLimiter<Client: ObjectClient> {
additional_mem_reserved: u64,
}

- impl<Client: ObjectClient> MemoryLimiter<Client> {
-     pub fn new(client: Arc<Client>, mem_limit: u64) -> Self {
+ impl MemoryLimiter {
+     pub fn new(mem_limit: u64) -> Self {
let min_reserved = 128 * 1024 * 1024;
let reserved_mem = (mem_limit / 8).max(min_reserved);
let formatter = make_format(humansize::BINARY);
@@ -28,7 +52,6 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
formatter(reserved_mem)
);
Self {
-     client,
mem_limit,
prefetcher_mem_reserved: AtomicU64::new(0),
additional_mem_reserved: reserved_mem,
@@ -83,24 +106,4 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
.saturating_sub(fs_mem_usage)
.saturating_sub(self.additional_mem_reserved)
}

- pub fn log_total_usage(&self) {
-     let formatter = make_format(humansize::BINARY);
-     let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
-
-     let mut total_usage = prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
-     if let Some(client_stats) = self.client.mem_usage_stats() {
-         let effective_client_mem_usage = client_stats.mem_used.max(client_stats.mem_reserved);
-         total_usage = total_usage.saturating_add(effective_client_mem_usage);
-
-         debug!(
-             total_usage = formatter(total_usage),
-             client_mem_used = formatter(client_stats.mem_used),
-             client_mem_reserved = formatter(client_stats.mem_reserved),
-             prefetcher_mem_reserved = formatter(prefetcher_mem_reserved),
-             additional_mem_reserved = formatter(self.additional_mem_reserved),
-             "total memory usage"
-         );
-     }
- }
}
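
For orientation, here is a minimal, self-contained sketch of the accounting this file implements, written against the new client-free shape. `new` and `reserve` mirror calls visible elsewhere in this diff; `try_reserve`, `release`, and all method bodies are illustrative assumptions, not the crate's actual code:

    use std::sync::atomic::{AtomicU64, Ordering};

    pub struct Limiter {
        mem_limit: u64,
        prefetcher_mem_reserved: AtomicU64,
        additional_mem_reserved: u64,
    }

    impl Limiter {
        pub fn new(mem_limit: u64) -> Self {
            // With mem_limit = 512 MiB: max(512 / 8 = 64 MiB, 128 MiB) = 128 MiB held back.
            let reserved_mem = (mem_limit / 8).max(128 * 1024 * 1024);
            Self {
                mem_limit,
                prefetcher_mem_reserved: AtomicU64::new(0),
                additional_mem_reserved: reserved_mem,
            }
        }

        /// Unconditional reservation: always granted, so a file handle can make
        /// progress even if that takes us over the limit.
        pub fn reserve(&self, size: u64) {
            self.prefetcher_mem_reserved.fetch_add(size, Ordering::SeqCst);
        }

        /// Conditional reservation (hypothetical name): rejected when granting
        /// it would exceed the configured limit.
        pub fn try_reserve(&self, size: u64) -> bool {
            let mut current = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
            loop {
                let proposed = current + size;
                if proposed + self.additional_mem_reserved > self.mem_limit {
                    return false;
                }
                match self.prefetcher_mem_reserved.compare_exchange_weak(
                    current, proposed, Ordering::SeqCst, Ordering::SeqCst,
                ) {
                    Ok(_) => return true,
                    Err(actual) => current = actual,
                }
            }
        }

        /// Paired with the three release events listed in the doc comment above.
        pub fn release(&self, size: u64) {
            self.prefetcher_mem_reserved.fetch_sub(size, Ordering::SeqCst);
        }
    }

With the `client` field gone, the limiter accounts only for what prefetchers explicitly reserve; the CRT client's own usage, which the removed `log_total_usage` used to fold in, is no longer this structure's concern.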
31 changes: 15 additions & 16 deletions mountpoint-s3/src/prefetch.rs
@@ -70,7 +70,7 @@ pub trait Prefetch {
fn prefetch<Client>(
&self,
client: Arc<Client>,
- mem_limiter: Arc<MemoryLimiter<Client>>,
+ mem_limiter: Arc<MemoryLimiter>,
bucket: String,
object_id: ObjectId,
size: u64,
@@ -203,7 +203,7 @@ where
fn prefetch<Client>(
&self,
client: Arc<Client>,
- mem_limiter: Arc<MemoryLimiter<Client>>,
+ mem_limiter: Arc<MemoryLimiter>,
bucket: String,
object_id: ObjectId,
size: u64,
@@ -229,9 +229,9 @@ where
pub struct PrefetchGetObject<Stream: ObjectPartStream, Client: ObjectClient> {
client: Arc<Client>,
part_stream: Arc<Stream>,
- mem_limiter: Arc<MemoryLimiter<Client>>,
+ mem_limiter: Arc<MemoryLimiter>,
config: PrefetcherConfig,
- backpressure_task: Option<RequestTask<Client::ClientError, Client>>,
+ backpressure_task: Option<RequestTask<Client::ClientError>>,
// Invariant: the offset of the last byte in this window is always
// self.next_sequential_read_offset - 1.
backward_seek_window: SeekWindow,
@@ -283,7 +283,7 @@ where
fn new(
client: Arc<Client>,
part_stream: Arc<Stream>,
- mem_limiter: Arc<MemoryLimiter<Client>>,
+ mem_limiter: Arc<MemoryLimiter>,
config: PrefetcherConfig,
bucket: String,
object_id: ObjectId,
@@ -384,7 +384,7 @@ where
/// We will be using flow-control window to control how much data we want to download into the prefetcher.
fn spawn_read_backpressure_request(
&mut self,
- ) -> Result<RequestTask<Client::ClientError, Client>, PrefetchReadError<Client::ClientError>> {
+ ) -> Result<RequestTask<Client::ClientError>, PrefetchReadError<Client::ClientError>> {
let start = self.next_sequential_read_offset;
let object_size = self.size as usize;
let read_part_size = self.client.read_part_size().unwrap_or(8 * 1024 * 1024);
@@ -511,7 +511,6 @@ impl<Stream: ObjectPartStream, Client: ObjectClient> PrefetchGetObject<Stream, Client>
impl<Stream: ObjectPartStream, Client: ObjectClient> Drop for PrefetchGetObject<Stream, Client> {
fn drop(&mut self) {
self.record_contiguous_read_metric();
- self.mem_limiter.log_total_usage();
}
}

@@ -580,7 +579,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
- let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
+ let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
let etag = object.etag();

@@ -675,7 +674,7 @@ mod tests {
Stream: ObjectPartStream + Send + Sync + 'static,
{
let client = Arc::new(MockClient::new(client_config));
- let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
+ let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
let read_size = 1 * MB;
let object_size = 8 * MB;
let object = MockObject::ramp(0xaa, object_size, ETag::for_tests());
@@ -782,7 +781,7 @@ mod tests {
HashMap::new(),
HashMap::new(),
));
- let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
+ let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);

let prefetcher_config = PrefetcherConfig {
max_read_window_size: test_config.max_read_window_size,
@@ -907,7 +906,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
- let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
+ let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let etag = object.etag();

@@ -1091,7 +1090,7 @@ mod tests {
HashMap::new(),
HashMap::new(),
));
- let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
+ let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);

let prefetcher = Prefetcher::new(default_stream(), Default::default());
let mem_limiter = Arc::new(mem_limiter);
@@ -1144,7 +1143,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
- let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
+ let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
let etag = object.etag();

@@ -1186,7 +1185,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
- let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
+ let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
let etag = object.etag();

@@ -1248,7 +1247,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
- let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
+ let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let file_etag = object.etag();

@@ -1314,7 +1313,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
- let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
+ let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let file_etag = object.etag();

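
The effect on callers is visible in the tests above: the client and the limiter are now constructed independently. Below is a hedged sketch of the new call shape, assembled from calls that appear in this diff (`MockClient`, `default_stream`, `Prefetcher::new`, the `Prefetch::prefetch` signature); the `ObjectId` construction and the placeholder bucket, key, and size values are assumptions:

    // The limiter no longer carries the client's type, so the two are built separately.
    let client = Arc::new(MockClient::new(Default::default()));
    let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));

    let object_size: u64 = 8 * 1024 * 1024;
    let prefetcher = Prefetcher::new(default_stream(), Default::default());
    let request = prefetcher.prefetch(
        client,
        mem_limiter,
        "test-bucket".to_owned(),
        ObjectId::new("hello".to_owned(), ETag::for_tests()),
        object_size,
    );

Note that `prefetch` is still generic over `Client`; only the limiter argument lost its type parameter.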
15 changes: 7 additions & 8 deletions mountpoint-s3/src/prefetch/backpressure_controller.rs
@@ -3,7 +3,6 @@ use std::sync::Arc;

use async_channel::{unbounded, Receiver, Sender};
use humansize::make_format;
- use mountpoint_s3_client::ObjectClient;
use tracing::{debug, trace};

use crate::mem_limiter::MemoryLimiter;
@@ -33,7 +32,7 @@ pub struct BackpressureConfig {
}

#[derive(Debug)]
- pub struct BackpressureController<Client: ObjectClient> {
+ pub struct BackpressureController {
read_window_updater: Sender<usize>,
preferred_read_window_size: usize,
min_read_window_size: usize,
@@ -49,7 +48,7 @@ pub struct BackpressureController {
/// data up to this offset *exclusively*.
request_end_offset: u64,
read_part_size: usize,
- mem_limiter: Arc<MemoryLimiter<Client>>,
+ mem_limiter: Arc<MemoryLimiter>,
}

#[derive(Debug)]
@@ -75,10 +74,10 @@ pub struct BackpressureLimiter {
/// [BackpressureController] will be given to the consumer side of the object stream.
/// It can be used anywhere to set preferred read window size for the stream and tell the
/// producer when its read window should be increased.
- pub fn new_backpressure_controller<Client: ObjectClient>(
+ pub fn new_backpressure_controller(
config: BackpressureConfig,
- mem_limiter: Arc<MemoryLimiter<Client>>,
- ) -> (BackpressureController<Client>, BackpressureLimiter) {
+ mem_limiter: Arc<MemoryLimiter>,
+ ) -> (BackpressureController, BackpressureLimiter) {
let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64;
let (read_window_updater, read_window_incrementing_queue) = unbounded();
mem_limiter.reserve(config.initial_read_window_size as u64);
@@ -102,7 +101,7 @@ pub fn new_backpressure_controller(
(controller, limiter)
}

- impl<Client: ObjectClient> BackpressureController<Client> {
+ impl BackpressureController {
pub fn read_window_end_offset(&self) -> u64 {
self.read_window_end_offset
}
@@ -283,7 +282,7 @@ impl<Client: ObjectClient> BackpressureController<Client> {
}
}

- impl<Client: ObjectClient> Drop for BackpressureController<Client> {
+ impl Drop for BackpressureController {
fn drop(&mut self) {
// When approaching request end we have less memory still reserved than `self.preferred_read_window_size`.
debug_assert!(self.request_end_offset >= self.next_read_offset);
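
Finally, a sketch of wiring the de-generified constructor to the shared limiter. Only `request_range` and `initial_read_window_size` are `BackpressureConfig` fields visible in this diff; the remaining fields and the `..Default::default()` fill are assumptions:

    let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
    let object_size: u64 = 8 * 1024 * 1024;
    let config = BackpressureConfig {
        // Matches the 1MiB + 128KiB initial window described in mem_limiter.rs.
        initial_read_window_size: 1024 * 1024 + 128 * 1024,
        request_range: 0..object_size,
        ..Default::default()
    };

    // new_backpressure_controller reserves initial_read_window_size against the
    // limiter up front; BackpressureController's Drop releases what remains.
    let (controller, limiter) = new_backpressure_controller(config, mem_limiter.clone());

The `Client` type parameter existed on `BackpressureController` only to name the limiter's generic; with `MemoryLimiter` no longer generic over a client, the parameter, and the `ObjectClient` import, can simply go.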