Skip to content

Commit

Permalink
Merge pull request #15 from fjall-rs/byteview
Browse files Browse the repository at this point in the history
Byteview
  • Loading branch information
marvin-j97 authored Feb 4, 2025
2 parents bfcdfc9 + 02abbcf commit 348a7ba
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 101 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "value-log"
description = "Value log implementation for key-value separated LSM storage"
license = "MIT OR Apache-2.0"
version = "1.4.1"
version = "1.5.0"
edition = "2021"
rust-version = "1.74.0"
readme = "README.md"
Expand All @@ -22,10 +22,11 @@ serde = ["dep:serde"]
bytes = ["dep:bytes"]

[dependencies]
bytes = { version = "1", optional = true }
byteorder = "1.5.0"
bytes = { version = "1.8.0", optional = true }
byteview = "0.4.0"
interval-heap = "0.0.5"
log = "0.4.22"
min-max-heap = "1.3.0"
path-absolutize = "3.1.1"
quick_cache = { version = "0.6.5", default-features = false }
rustc-hash = "2.0.0"
Expand Down
31 changes: 15 additions & 16 deletions src/segment/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@
// (found in the LICENSE-* files in the repository)

use crate::{id::SegmentId, value::UserKey, Compressor, SegmentReader, UserValue};
use interval_heap::IntervalHeap;
use std::cmp::Reverse;

// TODO: replace with MinHeap...
use min_max_heap::MinMaxHeap;
macro_rules! fail_iter {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return Some(Err(e.into())),
}
};
}

type IteratorIndex = usize;

Expand Down Expand Up @@ -42,16 +49,14 @@ impl Ord for IteratorValue {
#[allow(clippy::module_name_repetitions)]
pub struct MergeReader<C: Compressor + Clone> {
readers: Vec<SegmentReader<C>>,
heap: MinMaxHeap<IteratorValue>,
heap: IntervalHeap<IteratorValue>,
}

impl<C: Compressor + Clone> MergeReader<C> {
/// Initializes a new merging reader
pub fn new(readers: Vec<SegmentReader<C>>) -> Self {
Self {
readers,
heap: MinMaxHeap::new(),
}
let heap = IntervalHeap::with_capacity(readers.len());
Self { readers, heap }
}

fn advance_reader(&mut self, idx: usize) -> crate::Result<()> {
Expand Down Expand Up @@ -87,22 +92,16 @@ impl<C: Compressor + Clone> Iterator for MergeReader<C> {

fn next(&mut self) -> Option<Self::Item> {
if self.heap.is_empty() {
if let Err(e) = self.push_next() {
return Some(Err(e));
};
fail_iter!(self.push_next());
}

if let Some(head) = self.heap.pop_min() {
if let Err(e) = self.advance_reader(head.index) {
return Some(Err(e));
}
fail_iter!(self.advance_reader(head.index));

// Discard old items
while let Some(next) = self.heap.pop_min() {
if next.key == head.key {
if let Err(e) = self.advance_reader(next.index) {
return Some(Err(e));
}
fail_iter!(self.advance_reader(next.index));
} else {
// Reached next user key now
// Push back non-conflicting item and exit
Expand Down
76 changes: 27 additions & 49 deletions src/segment/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,23 @@
// (found in the LICENSE-* files in the repository)

use super::{meta::METADATA_HEADER_MAGIC, writer::BLOB_HEADER_MAGIC};
use crate::{coding::DecodeError, id::SegmentId, value::UserKey, Compressor, UserValue};
use crate::{coding::DecodeError, id::SegmentId, value::UserKey, Compressor, Slice, UserValue};
use byteorder::{BigEndian, ReadBytesExt};
use std::{
fs::File,
io::{BufReader, Read, Seek},
path::Path,
};

macro_rules! fail_iter {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return Some(Err(e.into())),
}
};
}

/// Reads through a segment in order.
pub struct Reader<C: Compressor + Clone> {
pub(crate) segment_id: SegmentId,
Expand Down Expand Up @@ -62,10 +71,7 @@ impl<C: Compressor + Clone> Iterator for Reader<C> {

{
let mut buf = [0; BLOB_HEADER_MAGIC.len()];

if let Err(e) = self.inner.read_exact(&mut buf) {
return Some(Err(e.into()));
};
fail_iter!(self.inner.read_exact(&mut buf));

if buf == METADATA_HEADER_MAGIC {
self.is_terminated = true;
Expand All @@ -79,54 +85,26 @@ impl<C: Compressor + Clone> Iterator for Reader<C> {
}
}

let checksum = match self.inner.read_u64::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return None;
}
return Some(Err(e.into()));
}
};
let checksum = fail_iter!(self.inner.read_u64::<BigEndian>());

let key_len = match self.inner.read_u16::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return None;
}
return Some(Err(e.into()));
}
};

let mut key = vec![0; key_len.into()];
if let Err(e) = self.inner.read_exact(&mut key) {
return Some(Err(e.into()));
};

let val_len = match self.inner.read_u32::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return None;
}
return Some(Err(e.into()));
}
};

let mut val = vec![0; val_len as usize];
if let Err(e) = self.inner.read_exact(&mut val) {
return Some(Err(e.into()));
};
let key_len = fail_iter!(self.inner.read_u16::<BigEndian>());
let key = fail_iter!(Slice::from_reader(&mut self.inner, key_len as usize));

let val_len = fail_iter!(self.inner.read_u32::<BigEndian>());
let val = match &self.compression {
Some(compressor) => match compressor.decompress(&val) {
Ok(val) => val,
Err(e) => return Some(Err(e)),
},
None => val,
Some(compressor) => {
// TODO: https://github.com/PSeitz/lz4_flex/issues/166
let mut val = vec![0; val_len as usize];
fail_iter!(self.inner.read_exact(&mut val));
Slice::from(fail_iter!(compressor.decompress(&val)))
}
None => {
// NOTE: When not using compression, we can skip
// the intermediary heap allocation and read directly into a Slice
fail_iter!(Slice::from_reader(&mut self.inner, val_len as usize))
}
};

Some(Ok((key.into(), val.into(), checksum)))
Some(Ok((key, val, checksum)))
}
}
34 changes: 30 additions & 4 deletions src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// (found in the LICENSE-* files in the repository)

#[cfg(not(feature = "bytes"))]
mod slice_arc;
mod slice_default;

#[cfg(feature = "bytes")]
mod slice_bytes;
Expand All @@ -13,10 +13,10 @@ use std::{
sync::Arc,
};

#[cfg(not(feature = "bytes"))]
pub use slice_arc::Slice;
#[cfg(feature = "bytes")]
pub use slice_bytes::Slice;
#[cfg(not(feature = "bytes"))]
pub use slice_default::Slice;

impl AsRef<[u8]> for Slice {
fn as_ref(&self) -> &[u8] {
Expand All @@ -26,7 +26,21 @@ impl AsRef<[u8]> for Slice {

impl From<&[u8]> for Slice {
fn from(value: &[u8]) -> Self {
Self::new(value)
#[cfg(not(feature = "bytes"))]
{
Self(byteview::ByteView::new(value))
}

#[cfg(feature = "bytes")]
{
Self(bytes::Bytes::from(value.to_vec()))
}
}
}

impl From<Arc<[u8]>> for Slice {
fn from(value: Arc<[u8]>) -> Self {
Self::from(&*value)
}
}

Expand Down Expand Up @@ -180,6 +194,7 @@ mod serde {
mod tests {
use super::Slice;
use std::{fmt::Debug, sync::Arc};
use test_log::test;

fn assert_slice_handles<T>(v: T)
where
Expand All @@ -192,6 +207,17 @@ mod tests {
assert!(slice >= v, "slice_arc: {slice:?}, v: {v:?}");
}

#[test]
fn slice_empty() {
assert_eq!(Slice::empty(), []);
}

#[test]
fn slice_with_size() {
assert_eq!(Slice::with_size(5), [0, 0, 0, 0, 0]);
assert_eq!(Slice::with_size(50), [0; 50]);
}

/// This test verifies that we can create a `Slice` from various types and compare a `Slice` with them.
#[test]
fn test_slice_instantiation() {
Expand Down
31 changes: 22 additions & 9 deletions src/slice/slice_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use std::sync::Arc;

use bytes::{Bytes, BytesMut};

/// An immutable byte slice that can be cloned without additional heap allocation
///
/// There is no guarantee of any sort of alignment for zero-copy (de)serialization.
#[derive(Debug, Clone, Eq, Hash, Ord)]
pub struct Slice(pub(super) Bytes);

Expand All @@ -17,6 +17,26 @@ impl Slice {
Self(Bytes::copy_from_slice(bytes))
}

#[doc(hidden)]
#[must_use]
pub fn empty() -> Self {
Self(Bytes::from_static(&[]))
}

#[doc(hidden)]
#[must_use]
pub fn slice(&self, range: impl std::ops::RangeBounds<usize>) -> Self {
Self(self.0.slice(range))
}

#[must_use]
#[doc(hidden)]
pub fn with_size(len: usize) -> Self {
let bytes = vec![0; len];
Self(Bytes::from(bytes))
}

/// Constructs a [`Slice`] from an I/O reader by pulling in `len` bytes.
#[doc(hidden)]
pub fn from_reader<R: std::io::Read>(reader: &mut R, len: usize) -> std::io::Result<Self> {
let mut builder = BytesMut::zeroed(len);
Expand Down Expand Up @@ -50,10 +70,3 @@ impl From<String> for Slice {
Self(Bytes::from(value))
}
}

// Needed because slice_arc specializes this impl
impl From<Arc<[u8]>> for Slice {
fn from(value: Arc<[u8]>) -> Self {
Self::new(value.as_ref())
}
}
44 changes: 24 additions & 20 deletions src/slice/slice_arc.rs → src/slice/slice_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use std::sync::Arc;
use byteview::ByteView;

/// An immutable byte slice that can be cloned without additional heap allocation
///
/// There is no guarantee of any sort of alignment for zero-copy (de)serialization.
#[derive(Debug, Clone, Eq, Hash, Ord)]
pub struct Slice(pub(super) Arc<[u8]>);
pub struct Slice(pub(super) ByteView);

impl Slice {
/// Construct a [`Slice`] from a byte slice.
Expand All @@ -15,40 +17,42 @@ impl Slice {
Self(bytes.into())
}

#[doc(hidden)]
#[must_use]
pub fn empty() -> Self {
Self(ByteView::new(&[]))
}

#[doc(hidden)]
#[must_use]
pub fn slice(&self, range: impl std::ops::RangeBounds<usize>) -> Self {
Self(self.0.slice(range))
}

#[must_use]
#[doc(hidden)]
pub fn with_size(len: usize) -> Self {
// TODO: optimize this with byteview to remove the reallocation
let v = vec![0; len];
Self(v.into())
Self(ByteView::with_size(len))
}

/// Constructs a [`Slice`] from an I/O reader by pulling in `len` bytes.
#[doc(hidden)]
pub fn from_reader<R: std::io::Read>(reader: &mut R, len: usize) -> std::io::Result<Self> {
let mut view = Self::with_size(len);
let builder = Arc::get_mut(&mut view.0).expect("we are the owner");
reader.read_exact(builder)?;
Ok(view)
let view = ByteView::from_reader(reader, len)?;
Ok(Self(view))
}
}

// Arc::from<Vec<T>> is specialized
// Arc::from<Vec<u8>> is specialized
impl From<Vec<u8>> for Slice {
fn from(value: Vec<u8>) -> Self {
Self(Arc::from(value))
Self(ByteView::from(value))
}
}

// Arc::from<Vec<T>> is specialized
// Arc::from<Vec<String>> is specialized
impl From<String> for Slice {
fn from(value: String) -> Self {
Self(Arc::from(value.into_bytes()))
}
}

// direct conversion
impl From<Arc<[u8]>> for Slice {
fn from(value: Arc<[u8]>) -> Self {
Self(value)
Self(ByteView::from(value.into_bytes()))
}
}

0 comments on commit 348a7ba

Please sign in to comment.