PR comments
Signed-off-by: Monthon Klongklaew <monthonk@amazon.com>
monthonk committed Sep 25, 2024
1 parent ab1000a commit 42108c1
Showing 10 changed files with 52 additions and 70 deletions.
7 changes: 4 additions & 3 deletions mountpoint-s3-client/src/failure_client.rs
@@ -11,13 +11,14 @@ use std::task::{Context, Poll};

use async_trait::async_trait;
use futures::Stream;
+use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use pin_project::pin_project;

use crate::object_client::{
DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
-MemoryUsageStats, ObjectAttribute, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams,
-PutObjectRequest, PutObjectResult, UploadReview,
+ObjectAttribute, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest,
+PutObjectResult, UploadReview,
};
use crate::ObjectClient;

@@ -85,7 +86,7 @@ where
self.client.initial_read_window_size()
}

-fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
self.client.mem_usage_stats()
}

9 changes: 5 additions & 4 deletions mountpoint-s3-client/src/mock_client.rs
@@ -14,6 +14,7 @@ use async_trait::async_trait;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use mountpoint_s3_crt::checksums::crc32c;
+use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
@@ -26,9 +27,9 @@ use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use crate::object_client::{
Checksum, ChecksumAlgorithm, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError,
GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError,
-HeadObjectResult, ListObjectsError, ListObjectsResult, MemoryUsageStats, ObjectAttribute, ObjectClient,
-ObjectClientError, ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest,
-PutObjectResult, PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
+HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError,
+ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult,
+PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
};

mod leaky_bucket;
@@ -572,7 +573,7 @@ impl ObjectClient for MockClient {
}
}

-fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
None
}

5 changes: 3 additions & 2 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
@@ -6,14 +6,15 @@ use std::time::Duration;
use async_io::block_on;
use async_trait::async_trait;
use futures::Stream;
+use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use pin_project::pin_project;

use crate::mock_client::leaky_bucket::LeakyBucket;
use crate::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, MockPutObjectRequest};
use crate::object_client::{
DeleteObjectError, DeleteObjectResult, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
-MemoryUsageStats, ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams,
+ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams,
};
use crate::types::ETag;

@@ -113,7 +114,7 @@ impl ObjectClient for ThroughputMockClient {
self.inner.initial_read_window_size()
}

-fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
self.inner.mem_usage_stats()
}

15 changes: 2 additions & 13 deletions mountpoint-s3-client/src/object_client.rs
@@ -2,6 +2,7 @@ use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use async_trait::async_trait;
use auto_impl::auto_impl;
use futures::Stream;
+use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use std::pin::Pin;
use std::str::FromStr;
use std::time::SystemTime;
@@ -63,18 +64,6 @@ impl FromStr for ETag {
}
}

-/// Memory usage stats for the client
-pub struct MemoryUsageStats {
-/// Total allocated memory for the client.
-pub mem_allocated: u64,
-/// Reserved memory for the client. For [S3CrtClient], this value is a sum of primary storage
-/// and secondary storage reserved memory.
-pub mem_reserved: u64,
-/// Actual used memory for the client. For [S3CrtClient], this value is a sum of primary
-/// storage and secondary storage used memory.
-pub mem_used: u64,
-}

/// A generic interface to S3-like object storage services.
///
/// This trait defines the common methods that all object services implement.
@@ -103,7 +92,7 @@ pub trait ObjectClient {

/// Query current memory usage stats for the client. This can be `None` if the client
/// does not record the stats.
-fn mem_usage_stats(&self) -> Option<MemoryUsageStats>;
+fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats>;

/// Delete a single object from the object store.
///
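With `MemoryUsageStats` deleted, `mem_usage_stats` now hands callers the CRT's raw `BufferPoolUsageStats`, and any aggregate must be derived by the caller. A minimal sketch of a consumer, assuming the field names recorded elsewhere in this diff (`primary_allocated`, `secondary_used`) and the `ObjectClient` trait declared above:

```rust
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;

// Recover the old `mem_allocated` aggregate from the raw pool stats.
// Primary storage reports allocated blocks; secondary storage allocates
// exactly what it uses, so its allocation figure is `secondary_used`.
fn total_allocated(stats: &BufferPoolUsageStats) -> u64 {
    stats.primary_allocated.saturating_add(stats.secondary_used)
}

// Hypothetical caller over any `ObjectClient` implementation.
fn log_mem_usage<C: ObjectClient>(client: &C) {
    match client.mem_usage_stats() {
        Some(stats) => println!("allocated: {} bytes", total_allocated(&stats)),
        None => println!("client does not record buffer pool stats"),
    }
}
```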
25 changes: 8 additions & 17 deletions mountpoint-s3-client/src/s3_crt_client.rs
@@ -25,8 +25,8 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
use mountpoint_s3_crt::s3::client::{
-init_signing_config, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
-MetaRequestType, RequestMetrics, RequestType,
+init_signing_config, BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions,
+MetaRequestResult, MetaRequestType, RequestMetrics, RequestType,
};

use async_trait::async_trait;
@@ -767,7 +767,9 @@ impl S3CrtClientInner {
.set(metrics.num_requests_streaming_response as f64);

// Buffer pool metrics
+let start = Instant::now();
let buffer_pool_stats = s3_client.poll_buffer_pool_usage_stats();
metrics::histogram!("s3.client.buffer_pool.get_usage_latecy_us").record(start.elapsed().as_micros() as f64);
metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
@@ -1208,22 +1210,11 @@ impl ObjectClient for S3CrtClient {
}
}

-fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
+let start = Instant::now();
let crt_buffer_pool_stats = self.inner.s3_client.poll_buffer_pool_usage_stats();
-let mem_allocated = crt_buffer_pool_stats
-.primary_allocated
-.saturating_add(crt_buffer_pool_stats.secondary_used);
-let mem_reserved = crt_buffer_pool_stats
-.primary_reserved
-.saturating_add(crt_buffer_pool_stats.secondary_reserved);
-let mem_used = crt_buffer_pool_stats
-.primary_used
-.saturating_add(crt_buffer_pool_stats.secondary_used);
-Some(MemoryUsageStats {
-mem_allocated,
-mem_reserved,
-mem_used,
-})
+metrics::histogram!("s3.client.buffer_pool.get_usage_latecy_us").record(start.elapsed().as_micros() as f64);
+Some(crt_buffer_pool_stats)
}

async fn delete_object(
17 changes: 15 additions & 2 deletions mountpoint-s3/src/mem_limiter.rs
@@ -1,4 +1,7 @@
-use std::sync::{atomic::Ordering, Arc};
+use std::{
+sync::{atomic::Ordering, Arc},
+time::Instant,
+};

use humansize::make_format;
use metrics::atomics::AtomicU64;
@@ -75,6 +78,7 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {

/// Reserve memory for future use. Returns `false` if there is not enough memory.
pub fn try_reserve(&self, size: u64) -> bool {
+let start = Instant::now();
let mut prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
loop {
let new_prefetcher_mem_reserved = prefetcher_mem_reserved.saturating_add(size);
@@ -84,6 +88,7 @@
.saturating_add(self.additional_mem_reserved);
if new_total_mem_usage > self.mem_limit {
debug!(new_total_mem_usage, "not enough memory to reserve");
metrics::histogram!("prefetch.mem_reserve_latency_us").record(start.elapsed().as_micros() as f64);
return false;
}
// Check that the value we have read is still the same before updating it
@@ -95,6 +100,7 @@
) {
Ok(_) => {
metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
metrics::histogram!("prefetch.mem_reserve_latency_us").record(start.elapsed().as_micros() as f64);
return true;
}
Err(current) => prefetcher_mem_reserved = current, // another thread updated the atomic before us, trying again
@@ -118,7 +124,14 @@
.saturating_sub(self.additional_mem_reserved)
}

+// Get the allocated memory for the client. Currently, only the CRT client is able to report its buffer pool stats.
+// The CRT allocates memory in two areas. The first is primary storage, where memory is allocated in blocks and the
+// number of allocated bytes is reported by the `primary_allocated` stat. The other is secondary storage, where the
+// amount allocated is exactly equal to the amount used. So the total allocated memory for the CRT client is
+// `primary_allocated` + `secondary_used`.
fn client_mem_allocated(&self) -> u64 {
-self.client.mem_usage_stats().map_or(0, |stats| stats.mem_allocated)
+self.client
+.mem_usage_stats()
+.map_or(0, |stats| stats.primary_allocated.saturating_add(stats.secondary_used))
}
}
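`try_reserve` above is an optimistic compare-and-swap loop: load the counter, check the limit, and commit only if no other thread raced in between. A self-contained sketch of the pattern with hypothetical names (the real method also folds client-allocated and additional reserved memory into the limit check):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Try to reserve `size` bytes against `limit`, retrying whenever another
/// thread updates `reserved` between our load and our commit.
fn try_reserve(reserved: &AtomicU64, limit: u64, size: u64) -> bool {
    let mut current = reserved.load(Ordering::SeqCst);
    loop {
        let proposed = current.saturating_add(size);
        if proposed > limit {
            return false; // over the limit; the caller scales its read window down
        }
        // Commit only if `reserved` still holds the value `proposed` was based on.
        match reserved.compare_exchange(current, proposed, Ordering::SeqCst, Ordering::SeqCst) {
            Ok(_) => return true,
            Err(observed) => current = observed, // lost the race; retry with the fresh value
        }
    }
}
```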
4 changes: 4 additions & 0 deletions mountpoint-s3/src/prefetch.rs
@@ -528,6 +528,10 @@
trace!("seek failed: not enough data in backwards seek window");
return Ok(false);
};
+// This also increases the `prefetcher_mem_reserved` value in the memory limiter.
+// At least one subsequent `RequestTask::read` is required for memory tracking to work correctly,
+// because `BackpressureController::drop` needs to know the start offset of the part queue to
+// release the right amount of memory.
task.push_front(parts).await?;
self.next_sequential_read_offset = offset;
Ok(true)
29 changes: 3 additions & 26 deletions mountpoint-s3/src/prefetch/backpressure_controller.rs
Expand Up @@ -29,7 +29,6 @@ pub struct BackpressureConfig {
pub read_window_size_multiplier: usize,
/// Request range to apply backpressure
pub request_range: Range<u64>,
-pub read_part_size: usize,
}

#[derive(Debug)]
@@ -48,7 +47,6 @@ pub struct BackpressureController<Client: ObjectClient> {
/// End offset for the request we want to apply backpressure. The request can return
/// data up to this offset *exclusively*.
request_end_offset: u64,
-read_part_size: usize,
mem_limiter: Arc<MemoryLimiter<Client>>,
}

@@ -93,7 +91,6 @@ pub fn new_backpressure_controller<Client: ObjectClient>(
read_window_end_offset,
next_read_offset: config.request_range.start,
request_end_offset: config.request_range.end,
-read_part_size: config.read_part_size,
mem_limiter,
};
let limiter = BackpressureLimiter {
@@ -120,6 +117,8 @@ impl<Client: ObjectClient> BackpressureController<Client> {
let remaining_window = self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize;

// Increment the read window only if the remaining window drops below some threshold, i.e. half of it is left.
+// When memory is low, `preferred_read_window_size` is scaled down, so we have to keep trying
+// until the read window is large enough.
while remaining_window < (self.preferred_read_window_size / 2)
&& self.read_window_end_offset < self.request_end_offset
{
@@ -146,6 +145,7 @@
// scale down the read window if it fails.
if self.mem_limiter.try_reserve(to_increase as u64) {
self.increment_read_window(to_increase).await;
+break;
} else {
self.scale_down();
}
@@ -180,10 +180,6 @@ impl<Client: ObjectClient> BackpressureController<Client> {
fn scale_up(&mut self) {
if self.preferred_read_window_size < self.max_read_window_size {
let new_read_window_size = self.preferred_read_window_size * self.read_window_size_multiplier;
-// Also align the new read window size to the client part size
-let new_read_window_size =
-align(new_read_window_size, self.read_part_size, false).min(self.max_read_window_size);
-
// Only scale up when there is enough memory. We don't have to reserve the memory here
// because only `preferred_read_window_size` is increased but the actual read window will
// be updated later on `DataRead` event (where we do reserve memory).
@@ -208,10 +204,6 @@ impl<Client: ObjectClient> BackpressureController<Client> {
if self.preferred_read_window_size > self.min_read_window_size {
assert!(self.read_window_size_multiplier > 1);
let new_read_window_size = self.preferred_read_window_size / self.read_window_size_multiplier;
-// Also align the new read window size to the client part size
-let new_read_window_size =
-align(new_read_window_size, self.read_part_size, false).max(self.min_read_window_size);
-
let formatter = make_format(humansize::BINARY);
debug!(
current_size = formatter(self.preferred_read_window_size),
@@ -275,18 +267,3 @@ impl BackpressureLimiter {
Ok(Some(self.read_window_end_offset))
}
}

-/// Try to align the given read window size to the part boundaries.
-/// The `trim_only` flag controls whether the range is only trimmed down to
-/// part boundaries or is allowed to grow wider.
-fn align(read_window_size: usize, part_size: usize, trim_only: bool) -> usize {
-let part_alignment = part_size;
-let remainder = read_window_size % part_alignment;
-if trim_only || remainder == 0 {
-// trim it to the previous part boundary
-read_window_size - remainder
-} else {
-// extend it to the next part boundary
-read_window_size + (part_alignment - remainder)
-}
-}
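With `align` gone, the preferred read window scales purely multiplicatively between the configured bounds instead of snapping to part boundaries. A minimal sketch of the resulting behavior (the hunks above do not show whether `scale_up` still clamps to `max_read_window_size`; this sketch assumes it does):

```rust
// Window scaling after dropping part-size alignment: grow and shrink by
// `multiplier`, clamped to the configured bounds.
fn scale_up(preferred: usize, multiplier: usize, max: usize) -> usize {
    preferred.saturating_mul(multiplier).min(max)
}

fn scale_down(preferred: usize, multiplier: usize, min: usize) -> usize {
    assert!(multiplier > 1);
    (preferred / multiplier).max(min)
}
```

For example, with a multiplier of 2 the window halves on each failed memory reservation and doubles when scaling up, which is why `try_increase_read_window` above keeps retrying until a smaller reservation succeeds.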
3 changes: 1 addition & 2 deletions mountpoint-s3/src/prefetch/caching_stream.rs
@@ -59,7 +59,6 @@
max_read_window_size: config.max_read_window_size,
read_window_size_multiplier: config.read_window_size_multiplier,
request_range: range.into(),
-read_part_size: config.read_part_size,
};
let (backpressure_controller, backpressure_limiter) =
new_backpressure_controller(backpressure_config, mem_limiter.clone());
@@ -257,7 +256,7 @@ impl<E, Cache, Runtime> CachingPartComposer<E, Cache, Runtime>
where
E: std::error::Error + Send + Sync,
Cache: DataCache + Send + Sync + 'static,
-Runtime: Spawn
+Runtime: Spawn,
{
async fn try_compose_parts(&mut self, request_stream: impl Stream<Item = RequestReaderOutput<E>>) {
if let Err(e) = self.compose_parts(request_stream).await {
8 changes: 7 additions & 1 deletion mountpoint-s3/src/prefetch/part_stream.rs
@@ -202,7 +202,6 @@ where
max_read_window_size: config.max_read_window_size,
read_window_size_multiplier: config.read_window_size_multiplier,
request_range: range.into(),
-read_part_size: config.read_part_size,
};
let (backpressure_controller, mut backpressure_limiter) =
new_backpressure_controller(backpressure_config, mem_limiter.clone());
@@ -386,6 +385,13 @@ fn read_from_request<'a, Client: ObjectClient + 'a>(
if next_offset == request_range.end {
break;
}

+// The CRT could return more data than we requested in the read window,
+// which means unaccounted-for memory, so we record it here.
+let excess_bytes = next_offset.saturating_sub(backpressure_limiter.read_window_end_offset());
+if excess_bytes > 0 {
+metrics::histogram!("s3.client.read_window_excess_bytes").record(excess_bytes as f64);
+}
// Block if the read window increment is not enough to read the next offset
if let Some(next_read_window_offset) = backpressure_limiter.wait_for_read_window_increment(next_offset).await? {
let diff = next_read_window_offset.saturating_sub(request.as_ref().read_window_end_offset()) as usize;
