Skip to content

Commit

Permalink
Cancel unused in-flight prefetch tasks (#505)
Browse files Browse the repository at this point in the history
Previously, mountpoint-s3 would not cancel prefetch tasks that it was going to ignore.
Instead, they would continue to be polled by the executor despite the results never being checked.
This change ensures that the task handles are dropped which cancels the task/future.

In the future, we may want to retain some of these tasks where the prefetcher may still be able to make use of them.

Signed-off-by: Daniel Carl Jones <djonesoa@amazon.com>
  • Loading branch information
dannycjones authored Sep 6, 2023
1 parent b632bbe commit 4db11ad
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions mountpoint-s3/src/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fmt::Debug;
use std::time::Duration;

use bytes::Bytes;
use futures::future::RemoteHandle;
use futures::pin_mut;
use futures::stream::StreamExt;
use futures::task::{Spawn, SpawnExt};
Expand Down Expand Up @@ -182,13 +183,7 @@ where
"out-of-order read, resetting prefetch"
);
counter!("prefetch.out_of_order", 1);
// TODO cancel inflight requests
// TODO see if we can reuse any inflight requests rather than dropping them immediately
self.current_task = None;
self.future_tasks.write().unwrap().drain(..);
self.next_request_size = self.inner.config.first_request_size;
self.next_sequential_read_offset = offset;
self.next_request_offset = offset;
self.reset_prefetch_to_offset(offset);
}
debug_assert_eq!(self.next_sequential_read_offset, offset);

Expand All @@ -205,12 +200,7 @@ where

let part = match current_task.read(to_read as usize).await {
Err(e) => {
// cancel inflight tasks
self.current_task = None;
self.future_tasks.write().unwrap().drain(..);
self.next_request_size = self.inner.config.first_request_size;
self.next_sequential_read_offset = offset;
self.next_request_offset = offset;
self.reset_prefetch_to_offset(offset);
return Err(e);
}
Ok(part) => part,
Expand Down Expand Up @@ -330,21 +320,21 @@ where
None => break,
}
}
trace!("finished");
trace!("request finished");
}
}
}
.instrument(span)
};

// TODO hold onto this so we can cancel the task
self.inner.runtime.spawn(request_task).unwrap();

// [read] will reset these if the reader stops making sequential requests
self.next_request_offset += size;
self.next_request_size = self.get_next_request_size();

let task_handle = self.inner.runtime.spawn_with_handle(request_task).unwrap();

Some(RequestTask {
_task_handle: task_handle,
total_size: size as usize,
remaining: size as usize,
part_queue,
Expand Down Expand Up @@ -376,11 +366,23 @@ where
next_request_size - remainder
}
}

/// Reset this prefetch request to a new offset, clearing any existing tasks queued.
fn reset_prefetch_to_offset(&mut self, offset: u64) {
// TODO see if we can reuse any inflight requests rather than dropping them immediately
self.current_task = None;
self.future_tasks.write().unwrap().drain(..);
self.next_request_size = self.inner.config.first_request_size;
self.next_sequential_read_offset = offset;
self.next_request_offset = offset;
}
}

/// A single GetObject request submitted to the S3 client
#[derive(Debug)]
struct RequestTask<E> {
/// Handle on the task/future. Future is cancelled when handle is dropped.
_task_handle: RemoteHandle<()>,
remaining: usize,
total_size: usize,
part_queue: PartQueue<E>,
Expand Down

0 comments on commit 4db11ad

Please sign in to comment.