diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index 3de6f3be2..dbb0f23fb 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -412,6 +412,8 @@ impl S3CrtClientInner { let start_time = Instant::now(); let first_body_part = Arc::new(AtomicBool::new(true)); let first_body_part_clone = Arc::clone(&first_body_part); + let total_bytes = Arc::new(AtomicU64::new(0)); + let total_bytes_clone = Arc::clone(&total_bytes); options .on_telemetry(move |metrics| { @@ -456,6 +458,7 @@ impl S3CrtClientInner { let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown"); metrics::histogram!("s3.meta_requests.first_byte_latency_us", latency, "op" => op); } + total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst); trace!(start = range_start, length = data.len(), "body part received"); @@ -474,6 +477,12 @@ impl S3CrtClientInner { let latency = duration.as_micros() as f64; metrics::histogram!("s3.meta_requests.first_byte_latency_us", latency, "op" => op); } + let total_bytes = total_bytes_clone.load(Ordering::SeqCst); + // We only log throughput of object data. PUT needs to be measured in its stream + // implementation rather than these callbacks, so we can only do GET here. + if op == "get_object" { + emit_throughput_metric(total_bytes, duration, op); + } let log_level = status_code_to_log_level(request_result.response_status); @@ -891,6 +900,22 @@ fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option16MiB" + }; + metrics::histogram!("s3.meta_requests.throughput_mibs", throughput_mbps, "op" => op, "size" => bucket); +} + #[async_trait] impl ObjectClient for S3CrtClient { type GetObjectResult = S3GetObjectRequest; diff --git a/mountpoint-s3-client/src/s3_crt_client/put_object.rs b/mountpoint-s3-client/src/s3_crt_client/put_object.rs index 31ac5e6b3..13af3981d 100644 --- a/mountpoint-s3-client/src/s3_crt_client/put_object.rs +++ b/mountpoint-s3-client/src/s3_crt_client/put_object.rs @@ -1,6 +1,8 @@ use std::sync::{Arc, Mutex}; +use std::time::Instant; use crate::object_client::{ObjectClientResult, PutObjectError, PutObjectParams}; +use crate::s3_crt_client::emit_throughput_metric; use crate::{PutObjectRequest, PutObjectResult, S3CrtClient, S3RequestError}; use async_trait::async_trait; use mountpoint_s3_crt::http::request_response::Header; @@ -55,6 +57,8 @@ impl S3CrtClient { body, writer, review_callback, + start_time: Instant::now(), + total_bytes: 0, }) } } @@ -97,6 +101,8 @@ pub struct S3PutObjectRequest { body: S3HttpRequest, PutObjectError>, writer: AsyncStreamWriter, review_callback: ReviewCallbackBox, + start_time: Instant, + total_bytes: u64, } #[async_trait] @@ -104,6 +110,7 @@ impl PutObjectRequest for S3PutObjectRequest { type ClientError = S3RequestError; async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> { + self.total_bytes += slice.len() as u64; self.writer .write(slice) .await @@ -127,6 +134,11 @@ impl PutObjectRequest for S3PutObjectRequest { self.body }; - body.await.map(|_| PutObjectResult {}) + let result = body.await; + + let elapsed = self.start_time.elapsed(); + emit_throughput_metric(self.total_bytes, elapsed, "put_object"); + + result.map(|_| PutObjectResult {}) } } diff --git a/mountpoint-s3/CHANGELOG.md b/mountpoint-s3/CHANGELOG.md index 5db7d9882..61d895029 100644 --- a/mountpoint-s3/CHANGELOG.md +++ b/mountpoint-s3/CHANGELOG.md @@ -1,3 +1,7 @@ +## Unreleased changes + +* Added new metrics for object writes, IO sizes, file handles, and directory operations. The existing `fuse.bytes_read` metric has been renamed to `fuse.total_bytes` and is now keyed by operation (`read`/`write`). + ## v1.0.0 (August 8, 2023) ### Breaking changes diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index 913b7d64a..a21ad8ebe 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -88,6 +88,7 @@ impl FileHandleType { } Ok(request) => FileHandleType::Write(UploadState::InProgress { request, handle }.into()), }; + metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "write"); Ok(handle) } @@ -106,6 +107,7 @@ impl FileHandleType { Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"), }, }; + metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "read"); Ok(handle) } } @@ -325,7 +327,8 @@ pub struct Opened { /// Reply to a `readdir` or `readdirplus` call pub trait DirectoryReplier { - /// Add a new dentry to the reply. Returns true if the buffer was full. + /// Add a new dentry to the reply. Returns true if the buffer was full and so the entry was not + /// added. fn add>( &mut self, ino: u64, @@ -815,16 +818,19 @@ where match file_handle.typ { FileHandleType::Write(request) => { - // Errors won't actually be seen by the user because `release` is async, - // but it's the right thing to do. - request + let result = request .into_inner() .complete_if_in_progress(&file_handle.full_key) - .await + .await; + metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "write"); + // Errors won't actually be seen by the user because `release` is async, + // but it's the right thing to do. + result } FileHandleType::Read { request: _, etag: _ } => { // TODO make sure we cancel the inflight PrefetchingGetRequest. is just dropping enough? file_handle.inode.finish_reading()?; + metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "read"); Ok(()) } } diff --git a/mountpoint-s3/src/fuse.rs b/mountpoint-s3/src/fuse.rs index 39e10a91e..545bff467 100644 --- a/mountpoint-s3/src/fuse.rs +++ b/mountpoint-s3/src/fuse.rs @@ -148,7 +148,8 @@ where ); // return value of read is proof a reply was sent - metrics::counter!("fuse.bytes_read", bytes_sent as u64); + metrics::counter!("fuse.total_bytes", bytes_sent as u64, "type" => "read"); + metrics::histogram!("fuse.io_size", bytes_sent as f64, "type" => "read"); } #[instrument(level="warn", skip_all, fields(req=_req.unique(), ino=parent))] @@ -163,6 +164,7 @@ where fn readdir(&self, _req: &Request<'_>, parent: InodeNo, fh: u64, offset: i64, mut reply: fuser::ReplyDirectory) { struct ReplyDirectory<'a> { inner: &'a mut fuser::ReplyDirectory, + count: &'a mut usize, } impl<'a> DirectoryReplier for ReplyDirectory<'a> { @@ -175,14 +177,25 @@ where _generation: u64, _ttl: Duration, ) -> bool { - self.inner.add(ino, offset, attr.kind, name) + let result = self.inner.add(ino, offset, attr.kind, name); + if !result { + *self.count += 1; + } + result } } - let replier = ReplyDirectory { inner: &mut reply }; + let mut count = 0; + let replier = ReplyDirectory { + inner: &mut reply, + count: &mut count, + }; match block_on(self.fs.readdir(parent, fh, offset, replier).in_current_span()) { - Ok(_) => reply.ok(), + Ok(_) => { + reply.ok(); + metrics::counter!("fuse.readdir.entries", count as u64); + } Err(e) => fuse_error!("readdir", reply, e), } } @@ -198,6 +211,7 @@ where ) { struct ReplyDirectoryPlus<'a> { inner: &'a mut fuser::ReplyDirectoryPlus, + count: &'a mut usize, } impl<'a> DirectoryReplier for ReplyDirectoryPlus<'a> { @@ -210,14 +224,25 @@ where generation: u64, ttl: Duration, ) -> bool { - self.inner.add(ino, offset, name, &ttl, &attr, generation) + let result = self.inner.add(ino, offset, name, &ttl, &attr, generation); + if !result { + *self.count += 1; + } + result } } - let replier = ReplyDirectoryPlus { inner: &mut reply }; + let mut count = 0; + let replier = ReplyDirectoryPlus { + inner: &mut reply, + count: &mut count, + }; match block_on(self.fs.readdirplus(parent, fh, offset, replier).in_current_span()) { - Ok(_) => reply.ok(), + Ok(_) => { + reply.ok(); + metrics::counter!("fuse.readdirplus.entries", count as u64); + } Err(e) => fuse_error!("readdirplus", reply, e), } } @@ -304,7 +329,11 @@ where .write(ino, fh, offset, data, write_flags, flags, lock_owner) .in_current_span(), ) { - Ok(bytes_written) => reply.written(bytes_written), + Ok(bytes_written) => { + reply.written(bytes_written); + metrics::counter!("fuse.total_bytes", bytes_written as u64, "type" => "write"); + metrics::histogram!("fuse.io_size", bytes_written as f64, "type" => "write"); + } Err(e) => fuse_error!("write", reply, e), } }