Skip to content

Commit

Permalink
Scale up atomically, scale down after data was consumed
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Volodkin <vladvolodkin@gmail.com>
  • Loading branch information
vladem committed Sep 5, 2024
1 parent a193202 commit 84126f4
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 84 deletions.
71 changes: 37 additions & 34 deletions mountpoint-s3/src/mem_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ use mountpoint_s3_client::ObjectClient;
pub struct MemoryLimiter<Client: ObjectClient> {
client: Arc<Client>,
mem_limit: u64,
/// Actual allocated memory for data in the part queue
prefetcher_mem_used: AtomicU64,
/// Reserved memory for data we have requested via the request task but may not
/// arrives yet.
/// Reserved memory for data we have requested via the request task but may not
/// have arrived yet.
prefetcher_mem_reserved: AtomicU64,
/// Additional reserved memory for other non-buffer usage like storing metadata
additional_mem_reserved: u64,
Expand All @@ -32,59 +30,65 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
Self {
client,
mem_limit,
prefetcher_mem_used: AtomicU64::new(0),
prefetcher_mem_reserved: AtomicU64::new(0),
additional_mem_reserved: reserved_mem,
}
}

/// Commit the actual memory used. We only record data from the prefetcher for now.
///
/// Counts `size` bytes as actually held in the part queue (as opposed to merely
/// reserved) and mirrors the new total in the `prefetch.bytes_in_queue` gauge.
pub fn allocate(&self, size: u64) {
self.prefetcher_mem_used.fetch_add(size, Ordering::SeqCst);
metrics::gauge!("prefetch.bytes_in_queue").increment(size as f64);
}

/// Free the actual memory used.
///
/// Inverse of `allocate`: subtracts `size` bytes from the in-queue count and
/// decrements the `prefetch.bytes_in_queue` gauge accordingly.
pub fn free(&self, size: u64) {
self.prefetcher_mem_used.fetch_sub(size, Ordering::SeqCst);
metrics::gauge!("prefetch.bytes_in_queue").decrement(size as f64);
}

/// Reserve the memory for future uses. Always succeeds, even if it means going
/// beyond the configured memory limit; use `try_reserve` for a bounded variant.
pub fn reserve(&self, size: u64) {
self.prefetcher_mem_reserved.fetch_add(size, Ordering::SeqCst);
metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
}

/// Reserve the memory for future uses. If there is not enough memory returns `false`.
///
/// The reservation succeeds only when, after adding `size`, total usage would still
/// leave at least `min_available` bytes of headroom under the configured limit.
/// Uses a CAS loop so concurrent reservations never exceed that bound together.
pub fn try_reserve(&self, size: u64, min_available: u64) -> bool {
    loop {
        let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
        let new_prefetcher_mem_reserved = prefetcher_mem_reserved.saturating_add(size);
        let total_mem_usage = prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
        let new_total_mem_usage = new_prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
        // saturating_sub: a plain `-` would panic (debug) or wrap (release) when
        // min_available > mem_limit; saturating to 0 correctly rejects any reservation.
        if new_total_mem_usage > self.mem_limit.saturating_sub(min_available) {
            debug!(
                "not enough memory to reserve, current usage: {}, new (if scaled up): {}, allowed diff: {}",
                total_mem_usage, new_total_mem_usage, min_available,
            );
            return false;
        }
        match self.prefetcher_mem_reserved.compare_exchange_weak(
            prefetcher_mem_reserved,
            new_prefetcher_mem_reserved,
            Ordering::SeqCst,
            Ordering::SeqCst,
        ) {
            Ok(_) => {
                metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
                return true;
            }
            Err(_) => continue, // another thread updated the atomic before us, trying again
        }
    }
}

/// Release the reserved memory.
///
/// Hands back `size` bytes previously taken via `reserve`/`try_reserve` and
/// keeps the `prefetch.bytes_reserved` gauge in step with the counter.
pub fn release(&self, size: u64) {
    // fetch_sub returns the prior value; we don't need it, only the decrement.
    let _previously_reserved = self.prefetcher_mem_reserved.fetch_sub(size, Ordering::SeqCst);
    metrics::gauge!("prefetch.bytes_reserved").decrement(size as f64);
}

pub fn available_mem(&self) -> u64 {
let fs_mem_usage = self
.prefetcher_mem_used
.load(Ordering::SeqCst)
.max(self.prefetcher_mem_reserved.load(Ordering::SeqCst));
let mut available_mem = self
.mem_limit
let fs_mem_usage = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
self.mem_limit
.saturating_sub(fs_mem_usage)
.saturating_sub(self.additional_mem_reserved);
if let Some(client_stats) = self.client.mem_usage_stats() {
let client_mem_usage = client_stats.mem_used.max(client_stats.mem_reserved);
available_mem = available_mem.saturating_sub(client_mem_usage);
}
available_mem
.saturating_sub(self.additional_mem_reserved)
}

pub fn log_total_usage(&self) {
let formatter = make_format(humansize::BINARY);
let prefetcher_mem_used = self.prefetcher_mem_used.load(Ordering::SeqCst);
let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);

let effective_mem_used = prefetcher_mem_used.max(prefetcher_mem_reserved);
let mut total_usage = effective_mem_used.saturating_add(self.additional_mem_reserved);
let mut total_usage = prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
if let Some(client_stats) = self.client.mem_usage_stats() {
let effective_client_mem_usage = client_stats.mem_used.max(client_stats.mem_reserved);
total_usage = total_usage.saturating_add(effective_client_mem_usage);
Expand All @@ -93,7 +97,6 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
total_usage = formatter(total_usage),
client_mem_used = formatter(client_stats.mem_used),
client_mem_reserved = formatter(client_stats.mem_reserved),
prefetcher_mem_used = formatter(prefetcher_mem_used),
prefetcher_mem_reserved = formatter(prefetcher_mem_reserved),
additional_mem_reserved = formatter(self.additional_mem_reserved),
"total memory usage"
Expand Down
Loading

0 comments on commit 84126f4

Please sign in to comment.