Adjust read window based on used memory #991

Closed · wants to merge 4 commits
10 changes: 10 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions mountpoint-s3/Cargo.toml
@@ -44,6 +44,7 @@ tracing = { version = "0.1.35", features = ["log"] }
 tracing-log = "0.2.0"
 tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
 async-stream = "0.3.5"
+humansize = "2.1.3"
 
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = { version = "0.16.0", default-features = false }
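(humansize is the one new dependency; the memory limiter added below uses its make_format(humansize::BINARY) helper to render byte counts in its debug logging.)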
6 changes: 5 additions & 1 deletion mountpoint-s3/examples/prefetch_benchmark.rs
@@ -4,6 +4,8 @@ use std::time::Instant;
 
 use clap::{Arg, Command};
 use futures::executor::{block_on, ThreadPool};
+use mountpoint_s3::mem_limiter::MemoryLimiter;
+use mountpoint_s3::object::ObjectId;
 use mountpoint_s3::prefetch::{default_prefetch, Prefetch, PrefetchResult};
 use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
 use mountpoint_s3_client::types::ETag;
@@ -77,6 +79,7 @@ fn main() {
         config = config.part_size(part_size);
     }
     let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client"));
+    let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
 
     for i in 0..iterations.unwrap_or(1) {
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
@@ -85,7 +88,8 @@
 
         let start = Instant::now();
 
-        let mut request = manager.prefetch(client.clone(), bucket, key, size, ETag::for_tests());
+        let object_id = ObjectId::new(key.clone(), ETag::for_tests());
+        let mut request = manager.prefetch(client.clone(), mem_limiter.clone(), bucket.clone(), object_id, size);
         block_on(async {
             loop {
                 let offset = received_size.load(Ordering::SeqCst);
22 changes: 21 additions & 1 deletion mountpoint-s3/src/cli.rs
@@ -25,6 +25,7 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
 use nix::sys::signal::Signal;
 use nix::unistd::ForkResult;
 use regex::Regex;
+use sysinfo::{RefreshKind, System};
 
 use crate::build_info;
 use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ManagedCacheDir};
@@ -157,6 +158,15 @@ pub struct CliArgs {
     )]
     pub max_threads: u64,
 
+    #[clap(
+        long,
+        help = "Maximum memory usage target [default: 95% of total system memory with a minimum of 512 MiB]",
+        value_name = "MiB",
+        value_parser = value_parser!(u64).range(512..),
+        help_heading = CLIENT_OPTIONS_HEADER
+    )]
+    pub max_memory_target: Option<u64>,
+
     #[clap(
         long,
         help = "Part size for multi-part GET and PUT in bytes",
@@ -393,12 +403,14 @@
             let mut filter = if self.debug {
                 String::from("debug")
             } else {
-                String::from("warn")
+                String::from("info")
             };
             let crt_verbosity = if self.debug_crt { "debug" } else { "off" };
             filter.push_str(&format!(",{}={}", AWSCRT_LOG_TARGET, crt_verbosity));
             if self.log_metrics {
                 filter.push_str(&format!(",{}=info", metrics::TARGET_NAME));
+            } else {
+                filter.push_str(&format!(",{}=off", metrics::TARGET_NAME));
             }
             filter
         };
@@ -738,6 +750,14 @@ where
     filesystem_config.allow_overwrite = args.allow_overwrite;
     filesystem_config.s3_personality = s3_personality;
     filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse, args.sse_kms_key_id);
+    filesystem_config.mem_limit = if let Some(max_mem_target) = args.max_memory_target {
+        max_mem_target * 1024 * 1024
+    } else {
+        const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024;
+        let sys = System::new_with_specifics(RefreshKind::everything());
+        let default_mem_target = (sys.total_memory() as f64 * 0.95) as u64;
+        default_mem_target.max(MINIMUM_MEM_LIMIT)
+    };
 
     // Written in this awkward way to force us to update it if we add new checksum types
     filesystem_config.use_upload_checksums = match args.upload_checksums {
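With --max-memory-target unset, the default works out to 95% of total system memory, clamped below by 512 MiB: on a 16 GiB host, for example, that is about 0.95 × 16384 MiB ≈ 15565 MiB, and the 512 MiB floor only takes effect on hosts with less than roughly 539 MiB of RAM.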
19 changes: 16 additions & 3 deletions mountpoint-s3/src/fs.rs
@@ -21,6 +21,8 @@ use crate::inode::{
     Inode, InodeError, InodeKind, LookedUp, ReadHandle, ReaddirHandle, Superblock, SuperblockConfig, WriteHandle,
 };
 use crate::logging;
+use crate::mem_limiter::MemoryLimiter;
+use crate::object::ObjectId;
 use crate::prefetch::{Prefetch, PrefetchResult};
 use crate::prefix::Prefix;
 use crate::s3::S3Personality;
@@ -151,9 +153,14 @@
             None => return Err(err!(libc::EBADF, "no E-Tag for inode {}", lookup.inode.ino())),
             Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"),
         };
-        let request = fs
-            .prefetcher
-            .prefetch(fs.client.clone(), &fs.bucket, &full_key, object_size, etag.clone());
+        let object_id = ObjectId::new(full_key, etag);
+        let request = fs.prefetcher.prefetch(
+            fs.client.clone(),
+            fs.mem_limiter.clone(),
+            fs.bucket.clone(),
+            object_id,
+            object_size,
+        );
         let handle = FileHandleState::Read { handle, request };
         metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0);
         Ok(handle)
@@ -393,6 +400,8 @@ pub struct S3FilesystemConfig {
     pub server_side_encryption: ServerSideEncryption,
     /// Use additional checksums for uploads
    pub use_upload_checksums: bool,
+    /// Memory limit
+    pub mem_limit: u64,
 }
 
 impl Default for S3FilesystemConfig {
@@ -413,6 +422,7 @@ impl Default for S3FilesystemConfig {
             s3_personality: S3Personality::default(),
             server_side_encryption: Default::default(),
             use_upload_checksums: true,
+            mem_limit: 512 * 1024 * 1024,
         }
     }
 }
@@ -526,6 +536,7 @@
 {
     config: S3FilesystemConfig,
     client: Arc<Client>,
+    mem_limiter: Arc<MemoryLimiter>,
     superblock: Superblock,
     prefetcher: Prefetcher,
     uploader: Uploader<Client>,
@@ -558,6 +569,7 @@
         let superblock = Superblock::new(bucket, prefix, superblock_config);
 
         let client = Arc::new(client);
+        let mem_limiter = Arc::new(MemoryLimiter::new(config.mem_limit));
 
         let uploader = Uploader::new(
             client.clone(),
@@ -569,6 +581,7 @@
         Self {
            config,
            client,
+            mem_limiter,
             superblock,
             prefetcher,
             uploader,
3 changes: 2 additions & 1 deletion mountpoint-s3/src/lib.rs
@@ -7,8 +7,9 @@ pub mod fs;
 pub mod fuse;
 mod inode;
 pub mod logging;
+pub mod mem_limiter;
 pub mod metrics;
-mod object;
+pub mod object;
 pub mod prefetch;
 pub mod prefix;
 pub mod s3;
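Making the object module public is what allows external callers, such as the prefetch_benchmark example above, to construct the ObjectId values the prefetch API now takes.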
109 changes: 109 additions & 0 deletions mountpoint-s3/src/mem_limiter.rs
@@ -0,0 +1,109 @@
use std::sync::atomic::Ordering;

use humansize::make_format;
use metrics::atomics::AtomicU64;
use tracing::debug;

/// `MemoryLimiter` tracks memory used by Mountpoint and decides whether a new memory reservation request can be
/// accepted. Currently, the only usage we take into account is the memory reserved by prefetcher instances for data
/// requested from, or fetched by, the CRT client. A single instance of this struct is shared among all of the
/// prefetchers (file handles).
///
/// Each file handle, upon creation, makes an initial reservation request with a minimal read window size of
/// `1MiB + 128KiB`. This is accepted unconditionally, since we want to allow any file handle to make progress even
/// if that means going over the memory limit. Additional reservations for a file handle arise when data is being
/// read from FUSE **faster** than it arrives from the client (`PartQueueStall`). Those reservations may be rejected
/// if there is no available memory.
///
/// Reserved memory is released on one of the following events:
/// 1) the prefetcher is destroyed (the `PartQueue` holding the data should be dropped and the CRT request cancelled
///    before this release),
/// 2) the prefetcher's read window is scaled down (we wait for the previously requested data to be consumed),
/// 3) the prefetcher approaches the end of the request, in which case we can be sure the full reservation won't be
///    needed.
///
/// The following is a visualisation of a single prefetcher instance's data stream:
///
/// backwards_seek_start next_read_offset in_part_queue window_end_offset preferred_window_end_offset
/// │ │ │ │ │
/// ─┼────────────────────┼───────────────────────────┼───────────────────────────────┼────────────────────────────┼───────────-►
/// │ ├───────────────────────────┤ │ │
/// └────────────────────┤ certainly used memory └───────────────────────────────┤ │
/// memory not accounted │ in CRT buffer, or callback queue └────────────────────────────┤
/// │ (usage may be less than reserved) will be used after the │
/// │ window increase │
/// └────────────────────────────────────────────────────────────────────────────────────────┘
/// preferred_read_window_size (reserved in MemoryLimiter)
///
#[derive(Debug)]
pub struct MemoryLimiter {
    mem_limit: u64,
    /// Memory reserved for data that we have requested via the request task but that
    /// may not have arrived yet.
    prefetcher_mem_reserved: AtomicU64,
    /// Additional reserved memory for other non-buffer usage, like storing metadata.
    additional_mem_reserved: u64,
}

impl MemoryLimiter {
    pub fn new(mem_limit: u64) -> Self {
        let min_reserved = 128 * 1024 * 1024;
        let reserved_mem = (mem_limit / 8).max(min_reserved);
        let formatter = make_format(humansize::BINARY);
        debug!(
            "target memory usage is {} with {} reserved memory",
            formatter(mem_limit),
            formatter(reserved_mem)
        );
        Self {
            mem_limit,
            prefetcher_mem_reserved: AtomicU64::new(0),
            additional_mem_reserved: reserved_mem,
        }
    }

    /// Reserve memory for future use. This always succeeds, even if it means going
    /// beyond the configured memory limit.
    pub fn reserve(&self, size: u64) {
        self.prefetcher_mem_reserved.fetch_add(size, Ordering::SeqCst);
        metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
    }

    /// Try to reserve memory for future use. Returns `false` if there is not enough
    /// available memory.
    pub fn try_reserve(&self, size: u64, min_available: u64) -> bool {
        loop {
            let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
            let new_prefetcher_mem_reserved = prefetcher_mem_reserved.saturating_add(size);
            let total_mem_usage = prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
            let new_total_mem_usage = new_prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
            if new_total_mem_usage > self.mem_limit - min_available {
                debug!(
                    "not enough memory to reserve, current usage: {}, new (if scaled up): {}, allowed diff: {}",
                    total_mem_usage, new_total_mem_usage, min_available,
                );
                return false;
            }
            match self.prefetcher_mem_reserved.compare_exchange_weak(
                prefetcher_mem_reserved,
                new_prefetcher_mem_reserved,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => {
                    metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
                    return true;
                }
                Err(_) => continue, // another thread updated the atomic before us; try again
            }
        }
    }

    /// Release the reserved memory.
    pub fn release(&self, size: u64) {
        self.prefetcher_mem_reserved.fetch_sub(size, Ordering::SeqCst);
        metrics::gauge!("prefetch.bytes_reserved").decrement(size as f64);
    }

    pub fn available_mem(&self) -> u64 {
        let fs_mem_usage = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
        self.mem_limit
            .saturating_sub(fs_mem_usage)
            .saturating_sub(self.additional_mem_reserved)
    }
}
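For reference, a minimal sketch of how a caller might drive this API, based only on the methods added in this diff; the 1 GiB budget and the reservation sizes are made-up illustrative values, not anything the PR prescribes:

use std::sync::Arc;

use mountpoint_s3::mem_limiter::MemoryLimiter;

fn main() {
    // Hypothetical 1 GiB budget; new() sets aside max(1 GiB / 8, 128 MiB) = 128 MiB
    // of it as additional reserved memory for non-buffer usage.
    let limiter = Arc::new(MemoryLimiter::new(1024 * 1024 * 1024));

    // Initial read window: reserve() always succeeds, even past the limit.
    let initial_window: u64 = 1024 * 1024 + 128 * 1024; // 1 MiB + 128 KiB
    limiter.reserve(initial_window);

    // Scaling the window up may be refused: ask for 256 MiB more, requiring that
    // at least 64 MiB of headroom remains free after the reservation.
    let scale_up: u64 = 256 * 1024 * 1024;
    if limiter.try_reserve(scale_up, 64 * 1024 * 1024) {
        // ...grow the read window; release on scale-down or when the prefetcher drops.
        limiter.release(scale_up);
    }

    println!("available: {} bytes", limiter.available_mem());
    limiter.release(initial_window);
}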