
Commit

Disallow prefetcher_mem_reserved going over the limit, scale down in 2 steps

Signed-off-by: Vladislav Volodkin <vladvolodkin@gmail.com>
vladem committed Sep 3, 2024
1 parent b0ad94d commit 857d040
Showing 3 changed files with 156 additions and 79 deletions.
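Before the diffs, a minimal standalone sketch of the reservation model this commit moves to: `reserve` always succeeds (and may push the total past the limit), `try_reserve` succeeds only while the total stays under the limit, and `release` hands reservations back. It mirrors the new mem_limiter.rs shown below, but the generic client parameter, client statistics, metrics, and logging are dropped, and the type name, sizes, and `main` driver here are illustrative only.

use std::sync::atomic::{AtomicU64, Ordering};

// Simplified stand-in for the limiter: reservations only, no client stats.
struct Limiter {
    mem_limit: u64,
    reserved: AtomicU64,
}

impl Limiter {
    fn new(mem_limit: u64) -> Self {
        Self { mem_limit, reserved: AtomicU64::new(0) }
    }

    // Unconditional reservation: may push the total past `mem_limit`.
    fn reserve(&self, size: u64) {
        self.reserved.fetch_add(size, Ordering::SeqCst);
    }

    // Conditional reservation: applied only if the new total fits under the limit.
    fn try_reserve(&self, size: u64) -> bool {
        loop {
            let current = self.reserved.load(Ordering::SeqCst);
            let new_total = current.saturating_add(size);
            if new_total > self.mem_limit {
                return false;
            }
            // Retry if another thread changed the counter in between.
            if self
                .reserved
                .compare_exchange_weak(current, new_total, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
            {
                return true;
            }
        }
    }

    fn release(&self, size: u64) {
        self.reserved.fetch_sub(size, Ordering::SeqCst);
    }
}

fn main() {
    let limiter = Limiter::new(8 * 1024 * 1024);
    assert!(limiter.try_reserve(6 * 1024 * 1024)); // fits under the 8 MiB limit
    assert!(!limiter.try_reserve(4 * 1024 * 1024)); // rejected: would exceed the limit
    limiter.reserve(4 * 1024 * 1024); // always succeeds, even over the limit
    limiter.release(10 * 1024 * 1024);
}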
69 changes: 35 additions & 34 deletions mountpoint-s3/src/mem_limiter.rs
@@ -10,10 +10,8 @@ use mountpoint_s3_client::ObjectClient;
pub struct MemoryLimiter<Client: ObjectClient> {
client: Arc<Client>,
mem_limit: u64,
/// Actual allocated memory for data in the part queue
prefetcher_mem_used: AtomicU64,
/// Reserved memory for data we have requested via the request task but may not
/// arrives yet.
/// Reserved memory for data we had requested via the request task but may not
/// have arrived yet.
prefetcher_mem_reserved: AtomicU64,
/// Additional reserved memory for other non-buffer usage like storing metadata
additional_mem_reserved: u64,
@@ -32,59 +30,63 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
Self {
client,
mem_limit,
prefetcher_mem_used: AtomicU64::new(0),
prefetcher_mem_reserved: AtomicU64::new(0),
additional_mem_reserved: reserved_mem,
}
}

/// Commit the actual memory used. We only record data from the prefetcher for now.
pub fn allocate(&self, size: u64) {
self.prefetcher_mem_used.fetch_add(size, Ordering::SeqCst);
metrics::gauge!("prefetch.bytes_in_queue").increment(size as f64);
}

/// Free the actual memory used.
pub fn free(&self, size: u64) {
self.prefetcher_mem_used.fetch_sub(size, Ordering::SeqCst);
metrics::gauge!("prefetch.bytes_in_queue").decrement(size as f64);
}

/// Reserve the memory for future uses.
/// Reserve the memory for future uses. Always succeeds, even if it means going beyond
/// the configured memory limit.
pub fn reserve(&self, size: u64) {
self.prefetcher_mem_reserved.fetch_add(size, Ordering::SeqCst);
metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
}

/// Reserve the memory for future uses. If there is not enough memory returns `false`.
pub fn try_reserve(&self, size: u64) -> bool {
loop {
let fs_mem_usage = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
let new_usage = fs_mem_usage.saturating_add(size);
if new_usage > self.mem_limit {
debug!(
"not enough memory to reserve, current usage: {}, new (if scaled up): {}",
fs_mem_usage, new_usage,
);
return false;
}
match self.prefetcher_mem_reserved.compare_exchange_weak(
fs_mem_usage,
new_usage,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
return true;
}
Err(_) => continue, // another thread updated the atomic before us, trying again
}
}
}

/// Release the reserved memory.
pub fn release(&self, size: u64) {
self.prefetcher_mem_reserved.fetch_sub(size, Ordering::SeqCst);
metrics::gauge!("prefetch.bytes_reserved").decrement(size as f64);
}

pub fn available_mem(&self) -> u64 {
let fs_mem_usage = self
.prefetcher_mem_used
.load(Ordering::SeqCst)
.max(self.prefetcher_mem_reserved.load(Ordering::SeqCst));
let mut available_mem = self
.mem_limit
let fs_mem_usage = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
self.mem_limit
.saturating_sub(fs_mem_usage)
.saturating_sub(self.additional_mem_reserved);
if let Some(client_stats) = self.client.mem_usage_stats() {
let client_mem_usage = client_stats.mem_used.max(client_stats.mem_reserved);
available_mem = available_mem.saturating_sub(client_mem_usage);
}
available_mem
.saturating_sub(self.additional_mem_reserved)
}

pub fn log_total_usage(&self) {
let formatter = make_format(humansize::BINARY);
let prefetcher_mem_used = self.prefetcher_mem_used.load(Ordering::SeqCst);
let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);

let effective_mem_used = prefetcher_mem_used.max(prefetcher_mem_reserved);
let mut total_usage = effective_mem_used.saturating_add(self.additional_mem_reserved);
let mut total_usage = prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
if let Some(client_stats) = self.client.mem_usage_stats() {
let effective_client_mem_usage = client_stats.mem_used.max(client_stats.mem_reserved);
total_usage = total_usage.saturating_add(effective_client_mem_usage);
@@ -93,7 +95,6 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
total_usage = formatter(total_usage),
client_mem_used = formatter(client_stats.mem_used),
client_mem_reserved = formatter(client_stats.mem_reserved),
prefetcher_mem_used = formatter(prefetcher_mem_used),
prefetcher_mem_reserved = formatter(prefetcher_mem_reserved),
additional_mem_reserved = formatter(self.additional_mem_reserved),
"total memory usage"
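The next file teaches the backpressure controller to pick a smaller read window when memory runs low: it keeps dividing the preferred window by the configured multiplier while even a minimum-sized window would not fit in the available memory, then clamps to the minimum. A minimal sketch of just that arithmetic, with illustrative names and without the offset tracking, limiter calls, metrics, and part-size alignment of the real controller:

// Sketch of the window-shrinking decision; names are illustrative.
fn choose_read_window_size(
    preferred: usize,
    minimum: usize,
    multiplier: usize,
    available_mem: u64,
) -> usize {
    let mut new_size = preferred;
    // Shrink only while even a minimum-sized new request could not fit in memory.
    while new_size > minimum && available_mem < minimum as u64 && multiplier > 1 {
        new_size /= multiplier;
    }
    new_size.max(minimum)
}

fn main() {
    // 8 MiB preferred window, 1 MiB minimum, doubling multiplier, 512 KiB free:
    // the window is scaled all the way down to the minimum.
    assert_eq!(choose_read_window_size(8 << 20, 1 << 20, 2, 512 << 10), 1 << 20);
    // With 2 MiB free (more than the minimum window), nothing changes.
    assert_eq!(choose_read_window_size(8 << 20, 1 << 20, 2, 2 << 20), 8 << 20);
}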
150 changes: 113 additions & 37 deletions mountpoint-s3/src/prefetch/backpressure_controller.rs
@@ -112,34 +112,66 @@ impl<Client: ObjectClient> BackpressureController<Client> {
pub async fn send_feedback<E>(&mut self, event: BackpressureFeedbackEvent) -> Result<(), PrefetchReadError<E>> {
match event {
BackpressureFeedbackEvent::DataRead(length) => {
// Step 2 of the scale down, including the case when we're approaching the request end. See `self.scale_down` for the logic.
self.next_read_offset += length as u64;
if self.next_read_offset >= self.request_end_offset {
self.next_read_offset = self.request_end_offset;
}

let preferred_window_end_offset = self
.next_read_offset
.saturating_add(self.preferred_read_window_size as u64);
let over_reserved = self.read_window_end_offset.saturating_sub(preferred_window_end_offset);
if over_reserved > 0 {
self.mem_limiter.release((length as u64).min(over_reserved));
}
if self.request_end_offset < preferred_window_end_offset {
// We won't need the full `preferred_window_end_offset` as we're approaching the request's end.
self.mem_limiter.release(length as u64);
}

// Increment the read window only once the remaining window falls below a threshold, i.e. half of it is left.
while self.remaining_window() < (self.preferred_read_window_size / 2)
if self.remaining_window() < (self.preferred_read_window_size / 2)
&& self.read_window_end_offset < self.request_end_offset
{
// If there is not enough available memory in the system, we'll try to reduce the read window of the current request.
// We define "not enough memory" as a situation where no new request with a minimum window can fit in the limit.
//
// Scaling down is best effort: there is no guarantee that after this action such a
// request will fit in memory. This may not be the case if, during the scale down, a new memory
// reservation was made by another request.
//
// We reduce the frequency of scale downs by only performing one when a sufficient amount of data
// (half of the read window) has been read.
let available_mem = self.mem_limiter.available_mem();
// If the preferred read window size is still large and available memory is getting low we will try to scale it down.
if self.preferred_read_window_size > self.min_read_window_size
&& available_mem < self.preferred_read_window_size as u64
let mut new_read_window_size = self.preferred_read_window_size; // new_preferred_read_window_size is just too wordy
while new_read_window_size > self.min_read_window_size
&& available_mem < self.min_read_window_size as u64
&& self.read_window_size_multiplier > 1
{
self.try_scaling_down();
continue;
new_read_window_size /= self.read_window_size_multiplier;
}
new_read_window_size = new_read_window_size.max(self.min_read_window_size);
if new_read_window_size != self.preferred_read_window_size {
self.scale_down(new_read_window_size);
}

let new_read_window_end_offset = self
.next_read_offset
.saturating_add(self.preferred_read_window_size as u64)
.min(self.request_end_offset);
debug_assert!(self.read_window_end_offset < new_read_window_end_offset);
let to_increase = new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize;
trace!(
preferred_read_window_size = self.preferred_read_window_size,
next_read_offset = self.next_read_offset,
read_window_end_offset = self.read_window_end_offset,
to_increase,
"incrementing read window"
);
self.increment_read_window(to_increase).await;
if self.read_window_end_offset < new_read_window_end_offset {
let to_increase =
new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize;
trace!(
preferred_read_window_size = self.preferred_read_window_size,
next_read_offset = self.next_read_offset,
read_window_end_offset = self.read_window_end_offset,
to_increase,
"incrementing read window"
);
self.increment_read_window(to_increase).await;
}
self.read_window_end_offset = new_read_window_end_offset;
}
}
@@ -163,6 +195,7 @@ impl<Client: ObjectClient> BackpressureController<Client> {
}

// Try scaling up preferred read window size with a multiplier configured at initialization.
// Scaling up fails silently if there is not enough free memory to perform it.
fn try_scaling_up(&mut self) {
if self.preferred_read_window_size < self.max_read_window_size {
let new_read_window_size = self.preferred_read_window_size * self.read_window_size_multiplier;
@@ -172,43 +205,86 @@

// Only scale up when there is enough memory
let to_increase = (new_read_window_size - self.preferred_read_window_size) as u64;
if to_increase <= self.mem_limiter.available_mem() {
if self.mem_limiter.try_reserve(to_increase) {
let formatter = make_format(humansize::BINARY);
debug!(
current_size = formatter(self.preferred_read_window_size),
new_size = formatter(new_read_window_size),
"scaling up preferred read window"
"scaled up preferred read window"
);
self.mem_limiter.release(self.preferred_read_window_size as u64);
self.mem_limiter.reserve(new_read_window_size as u64);
self.preferred_read_window_size = new_read_window_size;
metrics::histogram!("prefetch.window_after_increase_mib")
.record((self.preferred_read_window_size / 1024 / 1024) as f64);
}
}
}

pub fn try_scaling_down(&mut self) {
if self.preferred_read_window_size > self.min_read_window_size {
let new_read_window_size = self.preferred_read_window_size / self.read_window_size_multiplier;
// Also align the new read window size to the client part size
let new_read_window_size =
align(new_read_window_size, self.read_part_size, false).max(self.min_read_window_size);
pub fn scale_down(&mut self, new_read_window_size: usize) {
/*
Scaling down is performed in 2 steps, one in this method and another on read. Note that `window_end_offset` is the value
which is set in CRT and it may not be decreased. This function implements step 1.
let formatter = make_format(humansize::BINARY);
debug!(
current_size = formatter(self.preferred_read_window_size),
new_size = formatter(new_read_window_size),
"scaling down read window"
);
self.mem_limiter.release(self.preferred_read_window_size as u64);
self.mem_limiter.reserve(new_read_window_size as u64);
self.preferred_read_window_size = new_read_window_size;
}
0. Before scale down:

    read_until                              window_end_offset   preferred_window_end_offset
    │                                       │                   │
────┼───────────────────────────────────────┼───────────────────┼────────────────────►
    │                                                           │
    └───────────────────────────────────────────────────────────┘
                     preferred_read_window_size

1. Scaling down (`new_read_window_size` is applied):

    read_until                    preferred_window_end_offset   window_end_offset   preferred_window_end_offset_old
    │                             │                             │                   │
────┼─────────────────────────────┼─────────────────────────────┼───────────────────┼────────────────────►
                                  │                             ├───────────────────┘
                                  └─────────────────────────────┘   released immediately
                                           over_reserved

2. Part read:

    read_until(old)     read_until                    preferred_window_end_offset   window_end_offset
    │                   │                             │                             │
────┼───────────────────┼─────────────────────────────┼─────────────────────────────┼───────────────►
    └───────────────────┘                             └─────────────────────────────┘
    released on read:                                       over_reserved (new)
    1. if over_reserved > 0
    2. min(part.size(), over_reserved) is deducted
*/
// Align the new read window size to the client part size
let new_read_window_size =
align(new_read_window_size, self.read_part_size, false).max(self.min_read_window_size);

let formatter = make_format(humansize::BINARY);
debug!(
current_size = formatter(self.preferred_read_window_size),
new_size = formatter(new_read_window_size),
"scaling down read window"
);
let preferred_window_end_offset_old = self
.next_read_offset
.saturating_add(self.preferred_read_window_size as u64);
let preferred_window_end_offset = self.next_read_offset.saturating_add(new_read_window_size as u64);
// In most cases we'll keep memory reserved for `self.read_window_end_offset`, but if the new
// `preferred_window_end_offset` is greater, we'll reserve for it instead.
let reserve_until_offset = self.read_window_end_offset.max(preferred_window_end_offset);
let to_release = preferred_window_end_offset_old.saturating_sub(reserve_until_offset);
self.mem_limiter.release(to_release);
self.preferred_read_window_size = new_read_window_size;
metrics::histogram!("prefetch.window_after_decrease_mib")
.record((self.preferred_read_window_size / 1024 / 1024) as f64);
}
}

impl<Client: ObjectClient> Drop for BackpressureController<Client> {
fn drop(&mut self) {
self.mem_limiter.release(self.preferred_read_window_size as u64);
// When approaching the request end, we have less memory still reserved than `self.preferred_read_window_size`.
debug_assert!(self.request_end_offset >= self.next_read_offset);
let remaining_in_request = self.request_end_offset.saturating_sub(self.next_read_offset);

self.mem_limiter
.release((self.preferred_read_window_size as u64).min(remaining_in_request));
}
}

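Step 2 of the scale down happens in the `DataRead` branch above: once the CRT window end sits further out than the newly preferred window end, each read hands back up to that excess. A small worked sketch of that release computation, with illustrative parameter names and without the end-of-request release also shown in the diff:

// Sketch of step 2 of the scale down: release memory as data is read
// while the CRT read window is still over-reserved. Names are illustrative.
fn bytes_to_release_on_read(
    read_window_end_offset: u64,
    next_read_offset: u64,
    preferred_read_window_size: u64,
    bytes_read: u64,
) -> u64 {
    let preferred_window_end_offset = next_read_offset.saturating_add(preferred_read_window_size);
    let over_reserved = read_window_end_offset.saturating_sub(preferred_window_end_offset);
    // Release at most what was just read, and only while over-reserved.
    bytes_read.min(over_reserved)
}

fn main() {
    // The CRT window still ends at 16 MiB, but after scaling down we only want
    // 4 MiB ahead of the current read position at 8 MiB, so 4 MiB is over-reserved.
    let released = bytes_to_release_on_read(16 << 20, 8 << 20, 4 << 20, 1 << 20);
    assert_eq!(released, 1 << 20); // a 1 MiB read releases 1 MiB of the excess
}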
