Skip to content

Commit

Permalink
ByteRange can do last n bytes now (#285)
Browse files Browse the repository at this point in the history
This provides an approach to deal with #277
  • Loading branch information
paraseba authored Oct 16, 2024
1 parent 215a9ac commit 1db4b1d
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 140 deletions.
73 changes: 29 additions & 44 deletions icechunk/src/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
fmt::{Debug, Display},
hash::Hash,
marker::PhantomData,
ops::Bound,
ops::Range,
};

use bytes::Bytes;
Expand Down Expand Up @@ -138,73 +138,58 @@ pub type ChunkOffset = u64;
pub type ChunkLength = u64;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ByteRange(pub Bound<ChunkOffset>, pub Bound<ChunkOffset>);
pub enum ByteRange {
/// The fixed length range represented by the given `Range`
Bounded(Range<ChunkOffset>),
/// All bytes from the given offset (included) to the end of the object
From(ChunkOffset),
/// Last n bytes in the object
Last(ChunkLength),
}

impl From<Range<ChunkOffset>> for ByteRange {
fn from(value: Range<ChunkOffset>) -> Self {
ByteRange::Bounded(value)
}
}

impl ByteRange {
pub fn from_offset(offset: ChunkOffset) -> Self {
Self(Bound::Included(offset), Bound::Unbounded)
Self::From(offset)
}

pub fn from_offset_with_length(offset: ChunkOffset, length: ChunkOffset) -> Self {
Self(Bound::Included(offset), Bound::Excluded(offset + length))
Self::Bounded(offset..offset + length)
}

pub fn to_offset(offset: ChunkOffset) -> Self {
Self(Bound::Unbounded, Bound::Excluded(offset))
Self::Bounded(0..offset)
}

pub fn bounded(start: ChunkOffset, end: ChunkOffset) -> Self {
Self(Bound::Included(start), Bound::Excluded(end))
}

pub fn length(&self) -> Option<u64> {
match (self.0, self.1) {
(_, Bound::Unbounded) => None,
(Bound::Unbounded, Bound::Excluded(end)) => Some(end),
(Bound::Unbounded, Bound::Included(end)) => Some(end + 1),
(Bound::Included(start), Bound::Excluded(end)) => Some(end - start),
(Bound::Excluded(start), Bound::Included(end)) => Some(end - start),
(Bound::Included(start), Bound::Included(end)) => Some(end - start + 1),
(Bound::Excluded(start), Bound::Excluded(end)) => Some(end - start - 1),
}
(start..end).into()
}

pub const ALL: Self = Self(Bound::Unbounded, Bound::Unbounded);
pub const ALL: Self = Self::From(0);

pub fn slice(&self, bytes: Bytes) -> Bytes {
match (self.0, self.1) {
(Bound::Included(start), Bound::Excluded(end)) => {
bytes.slice(start as usize..end as usize)
}
(Bound::Included(start), Bound::Unbounded) => bytes.slice(start as usize..),
(Bound::Unbounded, Bound::Excluded(end)) => bytes.slice(..end as usize),
(Bound::Excluded(start), Bound::Excluded(end)) => {
bytes.slice(start as usize + 1..end as usize)
}
(Bound::Excluded(start), Bound::Unbounded) => {
bytes.slice(start as usize + 1..)
match self {
ByteRange::Bounded(range) => {
bytes.slice(range.start as usize..range.end as usize)
}
(Bound::Unbounded, Bound::Included(end)) => bytes.slice(..=end as usize),
(Bound::Included(start), Bound::Included(end)) => {
bytes.slice(start as usize..=end as usize)
}
(Bound::Excluded(start), Bound::Included(end)) => {
bytes.slice(start as usize + 1..=end as usize)
}
(Bound::Unbounded, Bound::Unbounded) => bytes,
ByteRange::From(from) => bytes.slice(*from as usize..),
ByteRange::Last(n) => bytes.slice(bytes.len() - *n as usize..bytes.len()),
}
}
}

impl From<(Option<ChunkOffset>, Option<ChunkOffset>)> for ByteRange {
fn from((start, end): (Option<ChunkOffset>, Option<ChunkOffset>)) -> Self {
match (start, end) {
(Some(start), Some(end)) => {
Self(Bound::Included(start), Bound::Excluded(end))
}
(Some(start), None) => Self(Bound::Included(start), Bound::Unbounded),
(None, Some(end)) => Self(Bound::Unbounded, Bound::Excluded(end)),
(None, None) => Self(Bound::Unbounded, Bound::Unbounded),
(Some(start), Some(end)) => Self::Bounded(start..end),
(Some(start), None) => Self::From(start),
(None, Some(end)) => Self::Bounded(0..end),
(None, None) => Self::ALL,
}
}
}
Expand Down
33 changes: 7 additions & 26 deletions icechunk/src/storage/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,21 @@ use object_store::{
PutPayload,
};
use std::{
fs::create_dir_all, future::ready, ops::Bound, path::Path as StdPath, sync::Arc,
fs::create_dir_all, future::ready, ops::Range, path::Path as StdPath, sync::Arc,
};

use super::{Storage, StorageError, StorageResult};

// Get Range is object_store specific, keep it with this module
impl From<&ByteRange> for Option<GetRange> {
fn from(value: &ByteRange) -> Self {
match (value.0, value.1) {
(Bound::Included(start), Bound::Excluded(end)) => {
Some(GetRange::Bounded(start as usize..end as usize))
match value {
ByteRange::Bounded(Range { start, end }) => {
Some(GetRange::Bounded(*start as usize..*end as usize))
}
(Bound::Included(start), Bound::Unbounded) => {
Some(GetRange::Offset(start as usize))
}
(Bound::Included(start), Bound::Included(end)) => {
Some(GetRange::Bounded(start as usize..end as usize + 1))
}
(Bound::Excluded(start), Bound::Excluded(end)) => {
Some(GetRange::Bounded(start as usize + 1..end as usize))
}
(Bound::Excluded(start), Bound::Unbounded) => {
Some(GetRange::Offset(start as usize + 1))
}
(Bound::Excluded(start), Bound::Included(end)) => {
Some(GetRange::Bounded(start as usize + 1..end as usize + 1))
}
(Bound::Unbounded, Bound::Excluded(end)) => {
Some(GetRange::Bounded(0..end as usize))
}
(Bound::Unbounded, Bound::Included(end)) => {
Some(GetRange::Bounded(0..end as usize + 1))
}
(Bound::Unbounded, Bound::Unbounded) => None,
ByteRange::From(start) if *start == 0u64 => None,
ByteRange::From(start) => Some(GetRange::Offset(*start as usize)),
ByteRange::Last(n) => Some(GetRange::Suffix(*n as usize)),
}
}
}
Expand Down
29 changes: 5 additions & 24 deletions icechunk/src/storage/s3.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{ops::Bound, path::PathBuf, sync::Arc};
use std::{ops::Range, path::PathBuf, sync::Arc};

use async_stream::try_stream;
use async_trait::async_trait;
Expand Down Expand Up @@ -207,31 +207,12 @@ impl S3Storage {

pub fn range_to_header(range: &ByteRange) -> Option<String> {
match range {
ByteRange(Bound::Unbounded, Bound::Unbounded) => None,
ByteRange(Bound::Included(start), Bound::Excluded(end)) => {
ByteRange::Bounded(Range { start, end }) => {
Some(format!("bytes={}-{}", start, end - 1))
}
ByteRange(Bound::Included(start), Bound::Unbounded) => {
Some(format!("bytes={}-", start))
}
ByteRange(Bound::Included(start), Bound::Included(end)) => {
Some(format!("bytes={}-{}", start, end))
}
ByteRange(Bound::Excluded(start), Bound::Excluded(end)) => {
Some(format!("bytes={}-{}", start + 1, end - 1))
}
ByteRange(Bound::Excluded(start), Bound::Unbounded) => {
Some(format!("bytes={}-", start + 1))
}
ByteRange(Bound::Excluded(start), Bound::Included(end)) => {
Some(format!("bytes={}-{}", start + 1, end))
}
ByteRange(Bound::Unbounded, Bound::Excluded(end)) => {
Some(format!("bytes=0-{}", end - 1))
}
ByteRange(Bound::Unbounded, Bound::Included(end)) => {
Some(format!("bytes=0-{}", end))
}
ByteRange::From(offset) if *offset == 0 => None,
ByteRange::From(offset) => Some(format!("bytes={}-", offset)),
ByteRange::Last(n) => Some(format!("bytes={}-", n)),
}
}

Expand Down
74 changes: 28 additions & 46 deletions icechunk/src/storage/virtual_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use bytes::Bytes;
use object_store::local::LocalFileSystem;
use object_store::{path::Path as ObjectPath, GetOptions, GetRange, ObjectStore};
use serde::{Deserialize, Serialize};
use std::cmp::{max, min};
use std::cmp::min;
use std::fmt::Debug;
use std::ops::Bound;
use tokio::sync::OnceCell;
use url::{self, Url};

Expand Down Expand Up @@ -115,25 +114,23 @@ pub fn construct_valid_byte_range(
) -> ByteRange {
// TODO: error for offset<0
// TODO: error if request.start > offset + length
// FIXME: we allow creating a ByteRange(start, end) where end < start
let new_offset = match request.0 {
Bound::Unbounded => chunk_offset,
Bound::Included(start) => max(start, 0) + chunk_offset,
Bound::Excluded(start) => max(start, 0) + chunk_offset + 1,
};
request.length().map_or(
ByteRange(
Bound::Included(new_offset),
Bound::Excluded(chunk_offset + chunk_length),
),
|reqlen| {
ByteRange(
Bound::Included(new_offset),
// no request can go past offset + length, so clamp it
Bound::Excluded(min(new_offset + reqlen, chunk_offset + chunk_length)),
)
},
)
match request {
ByteRange::Bounded(std::ops::Range { start: req_start, end: req_end }) => {
let new_start =
min(chunk_offset + req_start, chunk_offset + chunk_length - 1);
let new_end = min(chunk_offset + req_end, chunk_offset + chunk_length);
ByteRange::Bounded(new_start..new_end)
}
ByteRange::From(n) => {
let new_start = min(chunk_offset + n, chunk_offset + chunk_length - 1);
ByteRange::Bounded(new_start..chunk_offset + chunk_length)
}
ByteRange::Last(n) => {
let new_end = chunk_offset + chunk_length;
let new_start = new_end - n;
ByteRange::Bounded(new_start..new_end)
}
}
}

impl private::Sealed for ObjectStoreVirtualChunkResolver {}
Expand Down Expand Up @@ -196,43 +193,28 @@ mod tests {
// output.length() == requested.length()
// output.0 >= chunk_ref.offset
prop_assert_eq!(
construct_valid_byte_range(
&ByteRange(Bound::Included(0), Bound::Excluded(length)),
offset,
length,
),
ByteRange(Bound::Included(offset), Bound::Excluded(max_end))
construct_valid_byte_range(&ByteRange::Bounded(0..length), offset, length,),
ByteRange::Bounded(offset..max_end)
);
prop_assert_eq!(
construct_valid_byte_range(
&ByteRange(Bound::Unbounded, Bound::Excluded(length)),
&ByteRange::Bounded(request_offset..max_end),
offset,
length
),
ByteRange(Bound::Included(offset), Bound::Excluded(max_end))
ByteRange::Bounded(request_offset + offset..max_end)
);
prop_assert_eq!(
construct_valid_byte_range(
&ByteRange(Bound::Included(request_offset), Bound::Excluded(max_end)),
offset,
length
),
ByteRange(Bound::Included(request_offset + offset), Bound::Excluded(max_end))
construct_valid_byte_range(&ByteRange::ALL, offset, length),
ByteRange::Bounded(offset..offset + length)
);
prop_assert_eq!(
construct_valid_byte_range(&ByteRange::ALL, offset, length),
ByteRange(Bound::Included(offset), Bound::Excluded(max_end))
construct_valid_byte_range(&ByteRange::From(request_offset), offset, length),
ByteRange::Bounded(offset + request_offset..offset + length)
);
prop_assert_eq!(
construct_valid_byte_range(
&ByteRange(Bound::Excluded(request_offset), Bound::Unbounded),
offset,
length
),
ByteRange(
Bound::Included(offset + request_offset + 1),
Bound::Excluded(max_end)
)
construct_valid_byte_range(&ByteRange::Last(request_offset), offset, length),
ByteRange::Bounded(offset + length - request_offset..offset + length)
);
}
}

0 comments on commit 1db4b1d

Please sign in to comment.