Skip to content

Commit

Permalink
Add new metrics for IO, handles, throughput (#461)
Browse files Browse the repository at this point in the history
This change adds a bunch of new metrics for investigating performance.
It lets us track per-IO read/write size, number of open read/write
handles, directory listing throughput, and meta request throughput for
uploads and downloads.

Signed-off-by: James Bornholt <bornholt@amazon.com>
  • Loading branch information
jamesbornholt authored Aug 18, 2023
1 parent 578f47f commit dd61aeb
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 14 deletions.
25 changes: 25 additions & 0 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@ impl S3CrtClientInner {
let start_time = Instant::now();
let first_body_part = Arc::new(AtomicBool::new(true));
let first_body_part_clone = Arc::clone(&first_body_part);
let total_bytes = Arc::new(AtomicU64::new(0));
let total_bytes_clone = Arc::clone(&total_bytes);

options
.on_telemetry(move |metrics| {
Expand Down Expand Up @@ -456,6 +458,7 @@ impl S3CrtClientInner {
let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
metrics::histogram!("s3.meta_requests.first_byte_latency_us", latency, "op" => op);
}
total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);

trace!(start = range_start, length = data.len(), "body part received");

Expand All @@ -474,6 +477,12 @@ impl S3CrtClientInner {
let latency = duration.as_micros() as f64;
metrics::histogram!("s3.meta_requests.first_byte_latency_us", latency, "op" => op);
}
let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
// We only log throughput of object data. PUT needs to be measured in its stream
// implementation rather than these callbacks, so we can only do GET here.
if op == "get_object" {
emit_throughput_metric(total_bytes, duration, op);
}

let log_level = status_code_to_log_level(request_result.response_status);

Expand Down Expand Up @@ -891,6 +900,22 @@ fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3Reque
}
}

/// Record a throughput metric for GET/PUT. We can't inline this into S3CrtClient callbacks because
/// PUT bytes don't transit those callbacks.
///
/// * `bytes` - total number of object bytes transferred by the request.
/// * `duration` - time the transfer took.
/// * `op` - operation name, attached to the metric as the `op` label.
///
/// The sample is recorded in MiB/s under `s3.meta_requests.throughput_mibs`, together with a
/// coarse `size` label derived from `bytes`. If the throughput is not a finite number (which
/// happens when `duration` is zero), no sample is recorded.
fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
    const MEGABYTE: u64 = 1024 * 1024;

    let throughput_mibs = bytes as f64 / 1024.0 / 1024.0 / duration.as_secs_f64();
    // A zero duration makes the division produce `inf` (or `NaN` for zero bytes). Such a sample
    // carries no information and would distort the histogram, so drop it.
    if !throughput_mibs.is_finite() {
        return;
    }

    // Semi-arbitrary choices here to avoid averaging out large and small requests
    let bucket = if bytes < MEGABYTE {
        "<1MiB"
    } else if bytes <= 16 * MEGABYTE {
        "1-16MiB"
    } else {
        ">16MiB"
    };
    metrics::histogram!("s3.meta_requests.throughput_mibs", throughput_mibs, "op" => op, "size" => bucket);
}

#[async_trait]
impl ObjectClient for S3CrtClient {
type GetObjectResult = S3GetObjectRequest;
Expand Down
14 changes: 13 additions & 1 deletion mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::{Arc, Mutex};
use std::time::Instant;

use crate::object_client::{ObjectClientResult, PutObjectError, PutObjectParams};
use crate::s3_crt_client::emit_throughput_metric;
use crate::{PutObjectRequest, PutObjectResult, S3CrtClient, S3RequestError};
use async_trait::async_trait;
use mountpoint_s3_crt::http::request_response::Header;
Expand Down Expand Up @@ -55,6 +57,8 @@ impl S3CrtClient {
body,
writer,
review_callback,
start_time: Instant::now(),
total_bytes: 0,
})
}
}
Expand Down Expand Up @@ -97,13 +101,16 @@ pub struct S3PutObjectRequest {
body: S3HttpRequest<Vec<u8>, PutObjectError>,
writer: AsyncStreamWriter,
review_callback: ReviewCallbackBox,
start_time: Instant,
total_bytes: u64,
}

#[async_trait]
impl PutObjectRequest for S3PutObjectRequest {
type ClientError = S3RequestError;

async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
self.total_bytes += slice.len() as u64;
self.writer
.write(slice)
.await
Expand All @@ -127,6 +134,11 @@ impl PutObjectRequest for S3PutObjectRequest {
self.body
};

body.await.map(|_| PutObjectResult {})
let result = body.await;

let elapsed = self.start_time.elapsed();
emit_throughput_metric(self.total_bytes, elapsed, "put_object");

result.map(|_| PutObjectResult {})
}
}
4 changes: 4 additions & 0 deletions mountpoint-s3/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Unreleased changes

* Added new metrics for object writes, IO sizes, file handles, and directory operations. The existing `fuse.bytes_read` metric has been renamed to `fuse.total_bytes` and now carries a `type` label (`read`/`write`) identifying the operation.

## v1.0.0 (August 8, 2023)

### Breaking changes
Expand Down
16 changes: 11 additions & 5 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
}
Ok(request) => FileHandleType::Write(UploadState::InProgress { request, handle }.into()),
};
metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "write");
Ok(handle)
}

Expand All @@ -106,6 +107,7 @@ impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"),
},
};
metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "read");
Ok(handle)
}
}
Expand Down Expand Up @@ -325,7 +327,8 @@ pub struct Opened {

/// Reply to a `readdir` or `readdirplus` call
pub trait DirectoryReplier {
/// Add a new dentry to the reply. Returns true if the buffer was full.
/// Add a new dentry to the reply. Returns true if the buffer was full and so the entry was not
/// added.
fn add<T: AsRef<OsStr>>(
&mut self,
ino: u64,
Expand Down Expand Up @@ -815,16 +818,19 @@ where

match file_handle.typ {
FileHandleType::Write(request) => {
// Errors won't actually be seen by the user because `release` is async,
// but it's the right thing to do.
request
let result = request
.into_inner()
.complete_if_in_progress(&file_handle.full_key)
.await
.await;
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "write");
// Errors won't actually be seen by the user because `release` is async,
// but it's the right thing to do.
result
}
FileHandleType::Read { request: _, etag: _ } => {
// TODO make sure we cancel the inflight PrefetchingGetRequest. is just dropping enough?
file_handle.inode.finish_reading()?;
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "read");
Ok(())
}
}
Expand Down
45 changes: 37 additions & 8 deletions mountpoint-s3/src/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ where
);
// return value of read is proof a reply was sent

metrics::counter!("fuse.bytes_read", bytes_sent as u64);
metrics::counter!("fuse.total_bytes", bytes_sent as u64, "type" => "read");
metrics::histogram!("fuse.io_size", bytes_sent as f64, "type" => "read");
}

#[instrument(level="warn", skip_all, fields(req=_req.unique(), ino=parent))]
Expand All @@ -163,6 +164,7 @@ where
fn readdir(&self, _req: &Request<'_>, parent: InodeNo, fh: u64, offset: i64, mut reply: fuser::ReplyDirectory) {
struct ReplyDirectory<'a> {
inner: &'a mut fuser::ReplyDirectory,
count: &'a mut usize,
}

impl<'a> DirectoryReplier for ReplyDirectory<'a> {
Expand All @@ -175,14 +177,25 @@ where
_generation: u64,
_ttl: Duration,
) -> bool {
self.inner.add(ino, offset, attr.kind, name)
let result = self.inner.add(ino, offset, attr.kind, name);
if !result {
*self.count += 1;
}
result
}
}

let replier = ReplyDirectory { inner: &mut reply };
let mut count = 0;
let replier = ReplyDirectory {
inner: &mut reply,
count: &mut count,
};

match block_on(self.fs.readdir(parent, fh, offset, replier).in_current_span()) {
Ok(_) => reply.ok(),
Ok(_) => {
reply.ok();
metrics::counter!("fuse.readdir.entries", count as u64);
}
Err(e) => fuse_error!("readdir", reply, e),
}
}
Expand All @@ -198,6 +211,7 @@ where
) {
struct ReplyDirectoryPlus<'a> {
inner: &'a mut fuser::ReplyDirectoryPlus,
count: &'a mut usize,
}

impl<'a> DirectoryReplier for ReplyDirectoryPlus<'a> {
Expand All @@ -210,14 +224,25 @@ where
generation: u64,
ttl: Duration,
) -> bool {
self.inner.add(ino, offset, name, &ttl, &attr, generation)
let result = self.inner.add(ino, offset, name, &ttl, &attr, generation);
if !result {
*self.count += 1;
}
result
}
}

let replier = ReplyDirectoryPlus { inner: &mut reply };
let mut count = 0;
let replier = ReplyDirectoryPlus {
inner: &mut reply,
count: &mut count,
};

match block_on(self.fs.readdirplus(parent, fh, offset, replier).in_current_span()) {
Ok(_) => reply.ok(),
Ok(_) => {
reply.ok();
metrics::counter!("fuse.readdirplus.entries", count as u64);
}
Err(e) => fuse_error!("readdirplus", reply, e),
}
}
Expand Down Expand Up @@ -304,7 +329,11 @@ where
.write(ino, fh, offset, data, write_flags, flags, lock_owner)
.in_current_span(),
) {
Ok(bytes_written) => reply.written(bytes_written),
Ok(bytes_written) => {
reply.written(bytes_written);
metrics::counter!("fuse.total_bytes", bytes_written as u64, "type" => "write");
metrics::histogram!("fuse.io_size", bytes_written as f64, "type" => "write");
}
Err(e) => fuse_error!("write", reply, e),
}
}
Expand Down

0 comments on commit dd61aeb

Please sign in to comment.