Write cache block in background
Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>
passaro authored and Alessandro Passaro committed Sep 20, 2024
1 parent cdc3a4f commit 4ff1f8a
Showing 3 changed files with 50 additions and 53 deletions.
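
This change moves cache writes off the prefetcher's read path: instead of awaiting put_block inline, CachingPartComposer now spawns the write onto the prefetcher's runtime and keeps streaming parts, which is why a Clone bound is added to Runtime: Spawn throughout. Below is a minimal sketch of the before/after pattern, assuming the Spawn bound in the diff is the futures crate's futures::task::Spawn; SimpleCache and its put_block signature are hypothetical stand-ins for illustration, not mountpoint-s3's actual DataCache API.

// Sketch of the pattern this commit adopts (simplified, not the real API):
// write cache blocks on a background task instead of on the read path.
// Requires the futures crate with its "thread-pool" feature enabled.
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::executor::ThreadPool;
use futures::task::{Spawn, SpawnExt};

// Hypothetical stand-in for mountpoint-s3's DataCache trait.
struct SimpleCache;

impl SimpleCache {
    // The real put_block also takes an ObjectId, a block offset, and
    // checksummed bytes; a plain Vec<u8> keeps the sketch self-contained.
    async fn put_block(&self, block_index: u64, block: Vec<u8>) -> Result<(), String> {
        println!("cached block {block_index} ({} bytes)", block.len());
        Ok(())
    }
}

// Before: the part composer awaited the write inline, so a slow cache
// stalled the prefetch stream.
async fn write_inline(cache: &SimpleCache, block_index: u64, block: Vec<u8>) {
    if let Err(error) = cache.put_block(block_index, block).await {
        eprintln!("failed to update cache: {error}");
    }
}

// After: clone the shared cache handle, move the block into the task, and
// fire-and-forget it on the runtime; this is what forces the new
// Runtime: Spawn + Clone bounds in the diff.
fn write_in_background<R: Spawn>(runtime: &R, cache: Arc<SimpleCache>, block_index: u64, block: Vec<u8>) {
    runtime
        .spawn(async move {
            let start = Instant::now();
            if let Err(error) = cache.put_block(block_index, block).await {
                eprintln!("failed to update cache: {error}");
            }
            println!("cache update took {:?}", start.elapsed());
        })
        .unwrap();
}

fn main() {
    let runtime = ThreadPool::new().unwrap();
    let cache = Arc::new(SimpleCache);
    futures::executor::block_on(write_inline(&cache, 0, vec![0u8; 1024]));
    write_in_background(&runtime, cache, 1, vec![1u8; 1024]);
    // Give the detached task a moment to complete before the process exits.
    std::thread::sleep(Duration::from_millis(100));
}

The sketch mirrors the shape of the change: the cache handle is shared behind an Arc so a clone can be moved into the detached task, and the runtime handle itself must be cloneable so each request can spawn background work.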
4 changes: 2 additions & 2 deletions mountpoint-s3/src/cli.rs
@@ -451,7 +451,7 @@ pub fn main<ClientBuilder, Client, Runtime>(client_builder: ClientBuilder) -> an
 where
     ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
     Client: ObjectClient + Send + Sync + 'static,
-    Runtime: Spawn + Send + Sync + 'static,
+    Runtime: Spawn + Clone + Send + Sync + 'static,
 {
     let args = CliArgs::parse();
     let successful_mount_msg = format!(
@@ -700,7 +700,7 @@ fn mount<ClientBuilder, Client, Runtime>(args: CliArgs, client_builder: ClientBu
 where
     ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
     Client: ObjectClient + Send + Sync + 'static,
-    Runtime: Spawn + Send + Sync + 'static,
+    Runtime: Spawn + Clone + Send + Sync + 'static,
 {
     tracing::info!("mount-s3 {}", build_info::FULL_VERSION);
     tracing::debug!("{:?}", args);
2 changes: 1 addition & 1 deletion mountpoint-s3/src/prefetch.rs
@@ -137,7 +137,7 @@ pub fn caching_prefetch<Cache, Runtime>(
 ) -> CachingPrefetcher<Cache, Runtime>
 where
     Cache: DataCache + Send + Sync + 'static,
-    Runtime: Spawn + Send + Sync + 'static,
+    Runtime: Spawn + Clone + Send + Sync + 'static,
 {
     let part_stream = CachingPartStream::new(runtime, cache);
     Prefetcher::new(part_stream, prefetcher_config)
97 changes: 47 additions & 50 deletions mountpoint-s3/src/prefetch/caching_stream.rs
@@ -39,7 +39,7 @@ impl<Cache, Runtime> CachingPartStream<Cache, Runtime> {
 impl<Cache, Runtime> ObjectPartStream for CachingPartStream<Cache, Runtime>
 where
     Cache: DataCache + Send + Sync + 'static,
-    Runtime: Spawn,
+    Runtime: Spawn + Clone + Send + Sync + 'static,
 {
     fn spawn_get_object_request<Client>(
         &self,
@@ -62,7 +62,13 @@ where
         trace!(?range, "spawning request");

         let request_task = {
-            let request = CachingRequest::new(client.clone(), self.cache.clone(), backpressure_limiter, config);
+            let request = CachingRequest::new(
+                client.clone(),
+                self.cache.clone(),
+                self.runtime.clone(),
+                backpressure_limiter,
+                config,
+            );
             let span = debug_span!("prefetch", ?range);
             request.get_from_cache(range, part_queue_producer).instrument(span)
         };
@@ -74,27 +80,31 @@ where
 }

 #[derive(Debug)]
-struct CachingRequest<Client: ObjectClient, Cache> {
+struct CachingRequest<Client: ObjectClient, Cache, Runtime: Spawn> {
     client: Client,
     cache: Arc<Cache>,
+    runtime: Runtime,
     backpressure_limiter: BackpressureLimiter,
     config: RequestTaskConfig,
 }

-impl<Client, Cache> CachingRequest<Client, Cache>
+impl<Client, Cache, Runtime> CachingRequest<Client, Cache, Runtime>
 where
     Client: ObjectClient + Clone + Send + Sync + 'static,
-    Cache: DataCache + Send + Sync,
+    Cache: DataCache + Send + Sync + 'static,
+    Runtime: Spawn + Clone,
 {
     fn new(
         client: Client,
         cache: Arc<Cache>,
+        runtime: Runtime,
         backpressure_limiter: BackpressureLimiter,
         config: RequestTaskConfig,
     ) -> Self {
         Self {
             client,
             cache,
+            runtime,
             backpressure_limiter,
             config,
         }
@@ -210,8 +220,8 @@ where
             original_range: range,
             block_index: block_range.start,
             block_offset: block_range.start * block_size,
-            buffer: ChecksummedBytes::default(),
             cache: self.cache.clone(),
+            runtime: self.runtime.clone(),
         };
         part_composer.try_compose_parts(request_stream).await;
     }
@@ -228,20 +238,21 @@ where
     }
 }

-struct CachingPartComposer<E: std::error::Error, Cache> {
+struct CachingPartComposer<E: std::error::Error, Cache, Runtime: Spawn> {
     part_queue_producer: PartQueueProducer<E>,
     cache_key: ObjectId,
     original_range: RequestRange,
     block_index: u64,
     block_offset: u64,
-    buffer: ChecksummedBytes,
     cache: Arc<Cache>,
+    runtime: Runtime,
 }

-impl<E, Cache> CachingPartComposer<E, Cache>
+impl<E, Cache, Runtime> CachingPartComposer<E, Cache, Runtime>
 where
     E: std::error::Error + Send + Sync,
-    Cache: DataCache + Send + Sync,
+    Cache: DataCache + Send + Sync + 'static,
+    Runtime: Spawn,
 {
     async fn try_compose_parts(&mut self, request_stream: impl Stream<Item = RequestReaderOutput<E>>) {
         if let Err(e) = self.compose_parts(request_stream).await {
@@ -257,15 +268,16 @@ where
     ) -> Result<(), PrefetchReadError<E>> {
         let key = self.cache_key.key();
         let block_size = self.cache.block_size();
+        let mut buffer = ChecksummedBytes::default();

         pin_mut!(request_stream);
         while let Some(next) = request_stream.next().await {
             assert!(
-                self.buffer.len() < block_size as usize,
+                buffer.len() < block_size as usize,
                 "buffer should be flushed when we get a full block"
             );
             let (offset, body) = next?;
-            let expected_offset = self.block_offset + self.buffer.len() as u64;
+            let expected_offset = self.block_offset + buffer.len() as u64;
             if offset != expected_offset {
                 warn!(key, offset, expected_offset, "wrong offset for GetObject body part");
                 return Err(PrefetchReadError::GetRequestReturnedWrongOffset {
@@ -278,7 +290,7 @@ where
             let mut body: Bytes = body.into();
             let mut offset = offset;
             while !body.is_empty() {
-                let remaining = (block_size as usize).saturating_sub(self.buffer.len()).min(body.len());
+                let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len());
                 let chunk: ChecksummedBytes = body.split_to(remaining).into();

                 // We need to return some bytes to the part queue even before we can fill an entire caching block because
@@ -297,66 +309,51 @@ where
                     self.part_queue_producer.push(Ok(part));
                 }
                 offset += chunk.len() as u64;
-                self.buffer
+                buffer
                     .extend(chunk)
                     .inspect_err(|e| warn!(key, error=?e, "integrity check for body part failed"))?;
-                if self.buffer.len() < block_size as usize {
+                if buffer.len() < block_size as usize {
                     break;
                 }

                 // We have a full block: write it to the cache, send it to the queue, and flush the buffer.
-                update_cache(
-                    self.cache.as_ref(),
-                    &self.buffer,
-                    self.block_index,
-                    self.block_offset,
-                    &self.cache_key,
-                )
-                .await;
+                self.update_cache(buffer, self.block_index, self.block_offset, &self.cache_key);
                 self.block_index += 1;
                 self.block_offset += block_size;
-                self.buffer = ChecksummedBytes::default();
+                buffer = ChecksummedBytes::default();
             }
         }

-        if !self.buffer.is_empty() {
+        if !buffer.is_empty() {
             // If we still have data in the buffer, this must be the last block for this object,
             // which can be smaller than block_size (and ends at the end of the object).
             assert_eq!(
-                self.block_offset as usize + self.buffer.len(),
+                self.block_offset as usize + buffer.len(),
                 self.original_range.object_size(),
                 "a partial block is only allowed at the end of the object"
             );
             // Write the last block to the cache.
-            update_cache(
-                self.cache.as_ref(),
-                &self.buffer,
-                self.block_index,
-                self.block_offset,
-                &self.cache_key,
-            )
-            .await;
+            self.update_cache(buffer, self.block_index, self.block_offset, &self.cache_key);
         }
         Ok(())
     }
-}
-
-async fn update_cache<Cache: DataCache + Send + Sync>(
-    cache: &Cache,
-    block: &ChecksummedBytes,
-    block_index: u64,
-    block_offset: u64,
-    object_id: &ObjectId,
-) {
-    // TODO: consider updating the cache asynchronously
-    let start = Instant::now();
-    if let Err(error) = cache
-        .put_block(object_id.clone(), block_index, block_offset, block.clone())
-        .await
-    {
-        warn!(key=?object_id, block_index, ?error, "failed to update cache");
+    fn update_cache(&self, block: ChecksummedBytes, block_index: u64, block_offset: u64, object_id: &ObjectId) {
+        let object_id = object_id.clone();
+        let cache = self.cache.clone();
+        self.runtime
+            .spawn(async move {
+                let start = Instant::now();
+                if let Err(error) = cache
+                    .put_block(object_id.clone(), block_index, block_offset, block)
+                    .await
+                {
+                    warn!(key=?object_id, block_index, ?error, "failed to update cache");
+                }
+                metrics::histogram!("prefetch.cache_update_duration_us").record(start.elapsed().as_micros() as f64);
+            })
+            .unwrap();
     }
-    metrics::histogram!("prefetch.cache_update_duration_us").record(start.elapsed().as_micros() as f64);
 }

 /// Creates a Part that can be streamed to the prefetcher if the given bytes
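Note the semantics of the new update_cache method: the write is detached (spawn(...).unwrap() only panics if the runtime rejects the task), failures are still just logged with warn! rather than surfaced to the reader, and the prefetch.cache_update_duration_us histogram now times the write inside the background task. Moving buffer from a struct field to a local in compose_parts lets each completed block be passed to the task by value, which also removes the block.clone() the old synchronous update_cache needed.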
