Skip to content

Commit

Permalink
Reset prefetcher on any error (#933)
Browse files Browse the repository at this point in the history
* Add test of PartQueue invariant failure

Signed-off-by: Alessandro Passaro <alexpax@amazon.com>

* Reset prefetcher on any error

The prefetcher did not reset its internal state (and cancel current tasks) when encountering an error on a forward seek.
As a result, successive reads could try to read from a part queue in an invalid state. This change ensures that the prefetcher
is always reset when encountering an error, whether while reading or seeking.

Signed-off-by: Alessandro Passaro <alexpax@amazon.com>

* Fix prefetcher reset on integrity error

Signed-off-by: Alessandro Passaro <alexpax@amazon.com>

---------

Signed-off-by: Alessandro Passaro <alexpax@amazon.com>
Co-authored-by: Alessandro Passaro <alexpax@amazon.com>
  • Loading branch information
passaro and Alessandro Passaro authored Jul 5, 2024
1 parent 936b805 commit 805c501
Showing 1 changed file with 118 additions and 51 deletions.
169 changes: 118 additions & 51 deletions mountpoint-s3/src/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,52 @@ where
next_seq_offset = self.next_sequential_read_offset,
"read"
);
let result = self.try_read(offset, length).await;
if result.is_err() {
self.reset_prefetch_to_offset(offset);
}
result
}
}

impl<Stream, Client> PrefetchGetObject<Stream, Client>
where
Stream: ObjectPartStream,
Client: ObjectClient + Send + Sync + 'static,
{
/// Create and spawn a new prefetching request for an object
fn new(
client: Arc<Client>,
part_stream: Arc<Stream>,
config: PrefetcherConfig,
bucket: &str,
key: &str,
size: u64,
etag: ETag,
) -> Self {
PrefetchGetObject {
client,
part_stream,
config,
current_task: None,
future_tasks: Default::default(),
backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize),
preferred_part_size: 128 * 1024,
sequential_read_start_offset: 0,
next_sequential_read_offset: 0,
next_request_size: config.first_request_size,
next_request_offset: 0,
bucket: bucket.to_owned(),
object_id: ObjectId::new(key.to_owned(), etag),
size,
}
}

async fn try_read(
&mut self,
offset: u64,
length: usize,
) -> Result<ChecksummedBytes, PrefetchReadError<Client::ClientError>> {
// Currently, we set preferred part size to the current read size.
// Our assumption is that the read size will be the same for most sequential
// reads and it can be aligned to the size of prefetched chunks.
Expand Down Expand Up @@ -295,13 +340,7 @@ where
};
debug_assert!(current_task.remaining() > 0);

let part = match current_task.read(to_read as usize).await {
Err(e) => {
self.reset_prefetch_to_offset(offset);
return Err(e);
}
Ok(part) => part,
};
let part = current_task.read(to_read as usize).await?;
self.backward_seek_window.push(part.clone());
let part_bytes = part
.into_bytes(&self.object_id, self.next_sequential_read_offset)
Expand All @@ -318,55 +357,12 @@ where
}

let part_len = part_bytes.len() as u64;
let result = response.extend(part_bytes);
match result {
Ok(()) => {}
Err(e @ IntegrityError::ChecksumMismatch(_, _)) => {
// cancel inflight tasks
self.current_task = None;
self.future_tasks.drain(..);
return Err(e.into());
}
}
response.extend(part_bytes)?;
to_read -= part_len;
}

Ok(response)
}
}

impl<Stream, Client> PrefetchGetObject<Stream, Client>
where
Stream: ObjectPartStream,
Client: ObjectClient + Send + Sync + 'static,
{
/// Create and spawn a new prefetching request for an object
fn new(
client: Arc<Client>,
part_stream: Arc<Stream>,
config: PrefetcherConfig,
bucket: &str,
key: &str,
size: u64,
etag: ETag,
) -> Self {
PrefetchGetObject {
client,
part_stream,
config,
current_task: None,
future_tasks: Default::default(),
backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize),
preferred_part_size: 128 * 1024,
sequential_read_start_offset: 0,
next_sequential_read_offset: 0,
next_request_size: config.first_request_size,
next_request_offset: 0,
bucket: bucket.to_owned(),
object_id: ObjectId::new(key.to_owned(), etag),
size,
}
}

/// Runs on every read to prepare and spawn any requests our prefetching logic requires
fn prepare_requests(&mut self) {
Expand Down Expand Up @@ -967,6 +963,77 @@ mod tests {
run_random_read_test(default_stream(), object_size, reads, config);
}

/// Regression test: after a forward seek fails, later reads at the same offset
/// must keep working (failing while failures are injected, then succeeding)
/// rather than reading from a part queue left in an invalid state.
#[test]
fn test_forward_seek_failure() {
    const PART_SIZE: usize = 8192;
    const OBJECT_SIZE: usize = 2 * PART_SIZE;

    // Mock client serving a single two-part object under the key "hello".
    let mock_client = MockClient::new(MockClientConfig {
        bucket: "test-bucket".to_string(),
        part_size: PART_SIZE,
        ..Default::default()
    });
    let ramp_object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
    let etag = ramp_object.etag();
    mock_client.add_object("hello", ramp_object);

    // Injected GET failures. Going by the messages: entry 1 fails the first
    // request at its second chunk, entry 2 fails the second request outright.
    let get_failures = HashMap::from([
        (
            1,
            Ok((
                2,
                MockClientError("error in the second chunk of the first request".into()),
            )),
        ),
        (
            2,
            Err(ObjectClientError::ClientError(MockClientError(
                "error in second request".into(),
            ))),
        ),
    ]);

    // Only the GET failure map is populated; the remaining maps stay empty.
    let failing_client = Arc::new(countdown_failure_client(
        mock_client,
        get_failures,
        HashMap::new(),
        HashMap::new(),
        HashMap::new(),
    ));

    // For simplicity, prefetch the whole object in one request.
    let prefetcher = Prefetcher::new(
        default_stream(),
        PrefetcherConfig {
            first_request_size: OBJECT_SIZE,
            ..Default::default()
        },
    );

    block_on(async {
        let mut request =
            prefetcher.prefetch(failing_client, "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone());

        // The first read should make the prefetcher try to fetch the whole object (in 2 parts).
        let _ = request.read(0, 1).await.expect("first read should succeed");

        // Seek into the second part, where the first failure was injected.
        let offset = PART_SIZE + 1;
        let _ = request.read(offset as u64, 1).await.expect_err("seek should fail");

        // Retrying should trigger a new request, which hits the second injected failure.
        let _ = request
            .read(offset as u64, 1)
            .await
            .expect_err("first retry after failure should fail");

        // No failures are left, so this retry must return the right byte.
        let byte = request
            .read(offset as u64, 1)
            .await
            .expect("second retry should succeed");
        let expected = ramp_bytes(0xaa + offset, 1);
        assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
    });
}

#[test_case(0, 25; "no first read")]
#[test_case(60, 25; "read beyond first part")]
#[test_case(20, 25; "read in first part")]
Expand Down

0 comments on commit 805c501

Please sign in to comment.