Remove Client from MemoryLimiter, document this structure
Signed-off-by: Vladislav Volodkin <vladvolodkin@gmail.com>
vladem committed Sep 6, 2024
1 parent 8231d2d commit 1e5a942
Showing 14 changed files with 95 additions and 141 deletions.
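At a glance: `MemoryLimiter` is no longer generic over (or holding) an `ObjectClient`, and `mem_usage_stats` is dropped from the trait, so the limiter is now built from a byte limit alone. A minimal before/after sketch of the constructor change, abbreviated from the call sites in the diff below (illustrative only):

// Before this commit: the limiter kept an Arc to the client so it could fold the
// client's buffer-pool statistics into its usage logging.
// let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));

// After this commit: only the configured memory limit (in bytes) is needed.
let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));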
8 changes: 2 additions & 6 deletions mountpoint-s3-client/src/failure_client.rs
@@ -16,8 +16,8 @@ use pin_project::pin_project;
use crate::object_client::{
DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
MemoryUsageStats, ObjectAttribute, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams,
PutObjectRequest, PutObjectResult, UploadReview,
ObjectAttribute, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest,
PutObjectResult, UploadReview,
};
use crate::ObjectClient;

@@ -85,10 +85,6 @@ where
self.client.initial_read_window_size()
}

fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
self.client.mem_usage_stats()
}

async fn delete_object(
&self,
bucket: &str,
10 changes: 3 additions & 7 deletions mountpoint-s3-client/src/mock_client.rs
@@ -26,9 +26,9 @@ use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use crate::object_client::{
Checksum, ChecksumAlgorithm, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError,
GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError,
HeadObjectResult, ListObjectsError, ListObjectsResult, MemoryUsageStats, ObjectAttribute, ObjectClient,
ObjectClientError, ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest,
PutObjectResult, PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError,
ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult,
PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
};

mod leaky_bucket;
@@ -572,10 +572,6 @@ impl ObjectClient for MockClient {
}
}

fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
None
}

async fn delete_object(
&self,
bucket: &str,
6 changes: 1 addition & 5 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
@@ -13,7 +13,7 @@ use crate::mock_client::{MockClient, MockClientConfig, MockClientError, MockObje
use crate::object_client::{
DeleteObjectError, DeleteObjectResult, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
MemoryUsageStats, ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams,
ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams,
};
use crate::types::ETag;

@@ -113,10 +113,6 @@ impl ObjectClient for ThroughputMockClient {
self.inner.initial_read_window_size()
}

fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
self.inner.mem_usage_stats()
}

async fn delete_object(
&self,
bucket: &str,
14 changes: 0 additions & 14 deletions mountpoint-s3-client/src/object_client.rs
@@ -63,16 +63,6 @@ impl FromStr for ETag {
}
}

/// Memory usage stats for the client
pub struct MemoryUsageStats {
/// Reserved memory for the client. For [S3CrtClient], this value is a sum of primary storage
/// and secondary storage reserved memory.
pub mem_reserved: u64,
/// Actual used memory for the client. For [S3CrtClient], this value is a sum of primary
/// storage and secondary storage used memory.
pub mem_used: u64,
}

/// A generic interface to S3-like object storage services.
///
/// This trait defines the common methods that all object services implement.
@@ -99,10 +89,6 @@ pub trait ObjectClient {
/// This can be `None` if backpressure is disabled.
fn initial_read_window_size(&self) -> Option<usize>;

/// Query current memory usage stats for the client. This can be `None` if the client
/// does not record the stats.
fn mem_usage_stats(&self) -> Option<MemoryUsageStats>;

/// Delete a single object from the object store.
///
/// DeleteObject will succeed even if the object within the bucket does not exist.
7 changes: 0 additions & 7 deletions mountpoint-s3-client/src/s3_crt_client.rs
@@ -1207,13 +1207,6 @@ impl ObjectClient for S3CrtClient {
}
}

fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
let crt_buffer_pool_stats = self.inner.s3_client.poll_buffer_pool_usage_stats();
let mem_reserved = crt_buffer_pool_stats.primary_reserved + crt_buffer_pool_stats.secondary_reserved;
let mem_used = crt_buffer_pool_stats.primary_used + crt_buffer_pool_stats.secondary_used;
Some(MemoryUsageStats { mem_reserved, mem_used })
}

async fn delete_object(
&self,
bucket: &str,
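For reference, the buffer-pool statistics behind the deleted implementation above are produced by the CRT client itself, so a caller that still wants the old aggregate numbers could presumably compute them directly. A sketch based on the deleted lines, where `s3_client` stands in for the inner CRT client handle (`self.inner.s3_client` in the removed code):

// Sketch: the same aggregation the removed `mem_usage_stats` performed,
// done by the caller instead of through the ObjectClient trait.
let stats = s3_client.poll_buffer_pool_usage_stats();
let mem_reserved = stats.primary_reserved + stats.secondary_reserved;
let mem_used = stats.primary_used + stats.secondary_used;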
2 changes: 1 addition & 1 deletion mountpoint-s3/examples/prefetch_benchmark.rs
@@ -79,7 +79,7 @@ fn main() {
config = config.part_size(part_size);
}
let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client"));
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));

for i in 0..iterations.unwrap_or(1) {
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
4 changes: 2 additions & 2 deletions mountpoint-s3/src/fs.rs
@@ -536,7 +536,7 @@ where
{
config: S3FilesystemConfig,
client: Arc<Client>,
mem_limiter: Arc<MemoryLimiter<Client>>,
mem_limiter: Arc<MemoryLimiter>,
superblock: Superblock,
prefetcher: Prefetcher,
uploader: Uploader<Client>,
@@ -569,7 +569,7 @@ where
let superblock = Superblock::new(bucket, prefix, superblock_config);

let client = Arc::new(client);
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit));
let mem_limiter = Arc::new(MemoryLimiter::new(config.mem_limit));

let uploader = Uploader::new(
client.clone(),
59 changes: 31 additions & 28 deletions mountpoint-s3/src/mem_limiter.rs
@@ -1,14 +1,38 @@
use std::sync::{atomic::Ordering, Arc};
use std::sync::atomic::Ordering;

use humansize::make_format;
use metrics::atomics::AtomicU64;
use tracing::debug;

use mountpoint_s3_client::ObjectClient;

/// `MemoryLimiter` tracks memory used by Mountpoint and decides whether a new memory reservation request can be accepted.
/// Currently the only metric we take into account is the memory reserved by prefetcher instances for the data requested or
/// fetched from the CRT client. A single instance of this struct is shared among all of the prefetchers (file handles).
///
/// Each file handle upon creation makes an initial reservation request with a minimal read window size of `1MiB + 128KiB`. This
/// is accepted unconditionally since we want to allow any file handle to make progress even if that means going over the memory
/// limit. Additional reservations for a file handle arise when data is being read from fuse **faster** than it arrives from the
/// client (PartQueueStall). Those reservations may be rejected if there is no available memory.
///
/// Release of the reserved memory happens on one of the following events:
/// 1) prefetcher is destroyed (`PartQueue` holding the data should be dropped and the CRT request cancelled before this release)
/// 2) prefetcher's read window is scaled down (we wait for the previously requested data to be consumed)
/// 3) prefetcher is approaching the end of the request, in which case we can be sure that reservation in full won't be needed.
///
/// The following is a visualisation of a single prefetcher instance's data stream:
///
/// backwards_seek_start next_read_offset in_part_queue window_end_offset preferred_window_end_offset
/// │ │ │ │ │
/// ─┼────────────────────┼───────────────────────────┼───────────────────────────────┼────────────────────────────┼───────────-►
/// │ ├───────────────────────────┤ │ │
/// └────────────────────┤ certainly used memory └───────────────────────────────┤ │
/// memory not accounted │ in CRT buffer, or callback queue └────────────────────────────┤
/// │ (usage may be less than reserved) will be used after the │
/// │ window increase │
/// └────────────────────────────────────────────────────────────────────────────────────────┘
/// preferred_read_window_size (reserved in MemoryLimiter)
///
#[derive(Debug)]
pub struct MemoryLimiter<Client: ObjectClient> {
client: Arc<Client>,
pub struct MemoryLimiter {
mem_limit: u64,
/// Reserved memory for data we have requested via the request task but that may not
/// have arrived yet.
@@ -17,8 +41,8 @@ pub struct MemoryLimiter<Client: ObjectClient> {
additional_mem_reserved: u64,
}

impl<Client: ObjectClient> MemoryLimiter<Client> {
pub fn new(client: Arc<Client>, mem_limit: u64) -> Self {
impl MemoryLimiter {
pub fn new(mem_limit: u64) -> Self {
let min_reserved = 128 * 1024 * 1024;
let reserved_mem = (mem_limit / 8).max(min_reserved);
let formatter = make_format(humansize::BINARY);
@@ -28,7 +52,6 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
formatter(reserved_mem)
);
Self {
client,
mem_limit,
prefetcher_mem_reserved: AtomicU64::new(0),
additional_mem_reserved: reserved_mem,
@@ -83,24 +106,4 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
.saturating_sub(fs_mem_usage)
.saturating_sub(self.additional_mem_reserved)
}

pub fn log_total_usage(&self) {
let formatter = make_format(humansize::BINARY);
let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);

let mut total_usage = prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
if let Some(client_stats) = self.client.mem_usage_stats() {
let effective_client_mem_usage = client_stats.mem_used.max(client_stats.mem_reserved);
total_usage = total_usage.saturating_add(effective_client_mem_usage);

debug!(
total_usage = formatter(total_usage),
client_mem_used = formatter(client_stats.mem_used),
client_mem_reserved = formatter(client_stats.mem_reserved),
prefetcher_mem_reserved = formatter(prefetcher_mem_reserved),
additional_mem_reserved = formatter(self.additional_mem_reserved),
"total memory usage"
);
}
}
}
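To make the reservation rules in the doc comment added above concrete, here is a minimal self-contained sketch of that accounting. The field names follow the diff, but the `reserve`, `try_reserve`, and `release` helpers are hypothetical illustrations of the described behaviour (unconditional initial reservation, rejection when over the limit, release on the three listed events), not the crate's actual API:

use std::sync::atomic::{AtomicU64, Ordering};

// Sketch only: mirrors the fields shown in the diff; the methods are hypothetical.
pub struct MemoryLimiterSketch {
    mem_limit: u64,
    prefetcher_mem_reserved: AtomicU64,
    additional_mem_reserved: u64,
}

impl MemoryLimiterSketch {
    // Unconditional reservation, e.g. the initial `1MiB + 128KiB` window of a new
    // file handle: always granted so every handle can make progress.
    pub fn reserve(&self, size: u64) {
        self.prefetcher_mem_reserved.fetch_add(size, Ordering::SeqCst);
    }

    // Conditional reservation, e.g. when a read window wants to grow: rejected if
    // it would push tracked usage past the configured limit.
    pub fn try_reserve(&self, size: u64) -> bool {
        loop {
            let reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
            let projected = reserved
                .saturating_add(size)
                .saturating_add(self.additional_mem_reserved);
            if projected > self.mem_limit {
                return false;
            }
            // Retry if another file handle changed the counter concurrently.
            if self
                .prefetcher_mem_reserved
                .compare_exchange_weak(reserved, reserved + size, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
            {
                return true;
            }
        }
    }

    // Called when a prefetcher is dropped, its window is scaled down, or it nears
    // the end of the request (the three release events listed in the doc comment).
    pub fn release(&self, size: u64) {
        self.prefetcher_mem_reserved.fetch_sub(size, Ordering::SeqCst);
    }
}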
31 changes: 15 additions & 16 deletions mountpoint-s3/src/prefetch.rs
@@ -70,7 +70,7 @@ pub trait Prefetch {
fn prefetch<Client>(
&self,
client: Arc<Client>,
mem_limiter: Arc<MemoryLimiter<Client>>,
mem_limiter: Arc<MemoryLimiter>,
bucket: String,
object_id: ObjectId,
size: u64,
@@ -203,7 +203,7 @@
fn prefetch<Client>(
&self,
client: Arc<Client>,
mem_limiter: Arc<MemoryLimiter<Client>>,
mem_limiter: Arc<MemoryLimiter>,
bucket: String,
object_id: ObjectId,
size: u64,
@@ -229,9 +229,9 @@
pub struct PrefetchGetObject<Stream: ObjectPartStream, Client: ObjectClient> {
client: Arc<Client>,
part_stream: Arc<Stream>,
mem_limiter: Arc<MemoryLimiter<Client>>,
mem_limiter: Arc<MemoryLimiter>,
config: PrefetcherConfig,
backpressure_task: Option<RequestTask<Client::ClientError, Client>>,
backpressure_task: Option<RequestTask<Client::ClientError>>,
// Invariant: the offset of the last byte in this window is always
// self.next_sequential_read_offset - 1.
backward_seek_window: SeekWindow,
@@ -283,7 +283,7 @@
fn new(
client: Arc<Client>,
part_stream: Arc<Stream>,
mem_limiter: Arc<MemoryLimiter<Client>>,
mem_limiter: Arc<MemoryLimiter>,
config: PrefetcherConfig,
bucket: String,
object_id: ObjectId,
@@ -384,7 +384,7 @@
/// We will be using the flow-control window to control how much data we want to download into the prefetcher.
fn spawn_read_backpressure_request(
&mut self,
) -> Result<RequestTask<Client::ClientError, Client>, PrefetchReadError<Client::ClientError>> {
) -> Result<RequestTask<Client::ClientError>, PrefetchReadError<Client::ClientError>> {
let start = self.next_sequential_read_offset;
let object_size = self.size as usize;
let read_part_size = self.client.read_part_size().unwrap_or(8 * 1024 * 1024);
@@ -511,7 +511,6 @@ impl<Stream: ObjectPartStream, Client: ObjectClient> PrefetchGetObject<Stream, C
impl<Stream: ObjectPartStream, Client: ObjectClient> Drop for PrefetchGetObject<Stream, Client> {
fn drop(&mut self) {
self.record_contiguous_read_metric();
self.mem_limiter.log_total_usage();
}
}

@@ -580,7 +579,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
let etag = object.etag();

@@ -675,7 +674,7 @@
Stream: ObjectPartStream + Send + Sync + 'static,
{
let client = Arc::new(MockClient::new(client_config));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
let read_size = 1 * MB;
let object_size = 8 * MB;
let object = MockObject::ramp(0xaa, object_size, ETag::for_tests());
@@ -782,7 +781,7 @@
HashMap::new(),
HashMap::new(),
));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);

let prefetcher_config = PrefetcherConfig {
max_read_window_size: test_config.max_read_window_size,
@@ -907,7 +906,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let etag = object.etag();

@@ -1091,7 +1090,7 @@ mod tests {
HashMap::new(),
HashMap::new(),
));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);

let prefetcher = Prefetcher::new(default_stream(), Default::default());
let mem_limiter = Arc::new(mem_limiter);
@@ -1144,7 +1143,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
let etag = object.etag();

@@ -1186,7 +1185,7 @@
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
let etag = object.etag();

@@ -1248,7 +1247,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let file_etag = object.etag();

@@ -1314,7 +1313,7 @@ mod tests {
..Default::default()
};
let client = Arc::new(MockClient::new(config));
let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let file_etag = object.etag();

(Diffs for the remaining 5 of the 14 changed files did not load and are not shown.)
