compression: fix decomp buffer mgt (#51)
* compression: fix decomp buffer mgt

Due to the use of `AlignedStorage` and its wrapper `Buf`, length information is
lost when buffers are stored and serialized; the total length is simply set to
the capacity. This is wrong. While serialization libraries seem to handle this
well enough, the decompression structures used from `zstd` did not (this could
be a bug, though). This patch reworks the de/compression part to do four
things:

- store the length of the original data alongside the compressed data to keep
  reallocation during decompression to a minimum (ideally a single allocation);
  previously, a reallocation happened for each block because of how the `zstd`
  decompression writer works internally (see the sketch after this list)

- minimize the number of copies during compression and decompression; the
  `zstd` writer holds an internal buffer which, to my understanding, always
  buffers the intermediate results and then copies them to the final buffer.
  We now avoid this structure entirely to reduce the number of copies

- estimate the size of the compressed result beforehand to reduce reallocations

- rework the compression interface into a one-and-done process; compressors
  and decompressors might be reused later on (this needs an additional reinit),
  but the append-append-compress workflow was never used and came with
  inefficient reallocations
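
To make the new block layout concrete, here is a minimal, hypothetical sketch
(not part of this commit): a 4-byte original-length prefix followed by the
compressed payload, so decompression can size its output buffer up front. The
patch itself uses zstd's raw `Encoder`/`Decoder` with magicless frames,
`BufWrite`, and `speedy` for the prefix; the sketch stands in with the
standard-framing helpers `zstd::stream::encode_all` / `zstd::stream::Decoder`
and a little-endian prefix.

// Sketch only: length-prefixed block layout, assuming standard zstd framing
// and a little-endian u32 prefix (the commit uses magicless frames and
// `speedy` instead).
use std::io::Read;

const DATA_OFF: usize = std::mem::size_of::<u32>();

fn compress_block(data: &[u8]) -> std::io::Result<Vec<u8>> {
    // Reserve the prefix, then place the compressed payload right behind it;
    // compress_bound gives an upper bound so we allocate once.
    let mut out = Vec::with_capacity(DATA_OFF + zstd_safe::compress_bound(data.len()));
    out.extend_from_slice(&(data.len() as u32).to_le_bytes());
    out.extend_from_slice(&zstd::stream::encode_all(data, 1)?);
    Ok(out)
}

fn decompress_block(block: &[u8]) -> std::io::Result<Vec<u8>> {
    // The stored length lets us allocate the output once instead of growing
    // it while zstd produces data.
    let mut prefix = [0u8; 4];
    prefix.copy_from_slice(&block[..DATA_OFF]);
    let mut out = Vec::with_capacity(u32::from_le_bytes(prefix) as usize);
    zstd::stream::Decoder::new(&block[DATA_OFF..])?.read_to_end(&mut out)?;
    Ok(out)
}

The diff below implements the same idea with the raw streaming API and
`BufWrite`, so the compressed bytes land directly in the final, block-aligned
buffer.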

* compression: fix incorrect length stored

* compression: avoid copy on none algo

This commit fixes the redundant copy that was done when calling `decompress`
with the `None` compression. We should see fewer memcpys now.
jwuensche authored Jan 26, 2024
1 parent df1ed2f commit f559bda
Showing 5 changed files with 144 additions and 57 deletions.
22 changes: 22 additions & 0 deletions betree/src/buffer.rs
@@ -266,6 +266,10 @@ impl BufWrite {
buf: Arc::new(UnsafeCell::new(self.buf)),
})
}

pub fn len(&self) -> usize {
self.size as usize
}
}

impl io::Write for BufWrite {
@@ -290,6 +294,24 @@ impl io::Write for BufWrite {
}
}

unsafe impl zstd::stream::raw::WriteBuf for BufWrite {
fn as_slice(&self) -> &[u8] {
self.as_ref()
}

fn capacity(&self) -> usize {
self.buf.capacity.to_bytes() as usize
}

fn as_mut_ptr(&mut self) -> *mut u8 {
self.buf.ptr
}

unsafe fn filled_until(&mut self, n: usize) {
self.size = n as u32
}
}

impl io::Seek for BufWrite {
fn seek(&mut self, seek: io::SeekFrom) -> io::Result<u64> {
use io::SeekFrom::*;
6 changes: 3 additions & 3 deletions betree/src/compression/mod.rs
@@ -3,7 +3,7 @@
//! `None` and `Lz4` are provided as implementation.

use crate::{
buffer::Buf,
buffer::{Buf, BufWrite},
size::{Size, StaticSize},
vdev::Block,
};
@@ -72,11 +72,11 @@ pub trait CompressionBuilder: Debug + Size + Send + Sync + 'static {
pub trait CompressionState: Write {
/// Finishes the compression stream and returns a buffer that contains the
/// compressed data.
fn finish(&mut self) -> Buf;
fn finish(&mut self, data: Buf) -> Result<Buf>;
}

pub trait DecompressionState {
fn decompress(&mut self, data: &[u8]) -> Result<Box<[u8]>>;
fn decompress(&mut self, data: Buf) -> Result<Buf>;
}

mod none;
8 changes: 4 additions & 4 deletions betree/src/compression/none.rs
@@ -56,14 +56,14 @@ impl io::Write for NoneCompression {
}

impl CompressionState for NoneCompression {
fn finish(&mut self) -> Buf {
mem::replace(&mut self.buf, BufWrite::with_capacity(DEFAULT_BUFFER_SIZE)).into_buf()
fn finish(&mut self, buf: Buf) -> Result<Buf> {
Ok(buf)
}
}

impl DecompressionState for NoneDecompression {
fn decompress(&mut self, data: &[u8]) -> Result<Box<[u8]>> {
fn decompress(&mut self, data: Buf) -> Result<Buf> {
// FIXME: pass-through Buf, reusing alloc
Ok(data.to_vec().into_boxed_slice())
Ok(data)
}
}
144 changes: 106 additions & 38 deletions betree/src/compression/zstd.rs
@@ -4,18 +4,23 @@ use super::{
};
use crate::{
buffer::{Buf, BufWrite},
database,
size::StaticSize,
vdev::Block,
};
use serde::{Deserialize, Serialize};
use std::{
io::{self, Write},
io::{self, Cursor, Write},
mem,
};
use zstd::stream::{
raw::{CParameter, DParameter, Decoder, Encoder},
zio::Writer,
use zstd::{
block::{Compressor, Decompressor},
stream::{
raw::{CParameter, DParameter, Decoder, Encoder},
zio::{Reader, Writer},
},
};
use zstd_safe::FrameFormat;
use zstd_safe::{FrameFormat, InBuffer, OutBuffer, WriteBuf};

// TODO: investigate pre-created dictionary payoff

@@ -28,10 +33,10 @@ pub struct Zstd {
}

struct ZstdCompression {
writer: Writer<BufWrite, Encoder<'static>>,
writer: Encoder<'static>,
}
struct ZstdDecompression {
writer: Writer<BufWrite, Decoder<'static>>,
writer: Decoder<'static>,
}

impl StaticSize for Zstd {
@@ -40,6 +45,8 @@ impl StaticSize for Zstd {
}
}

use zstd::stream::raw::Operation;

impl CompressionBuilder for Zstd {
fn new_compression(&self) -> Result<Box<dyn CompressionState>> {
// "The library supports regular compression levels from 1 up to ZSTD_maxCLevel(),
@@ -48,14 +55,10 @@ impl CompressionBuilder for Zstd {

// Compression format is stored externally, don't need to duplicate it
encoder.set_parameter(CParameter::Format(FrameFormat::Magicless))?;
// Integrity is handled at a different layer
// // Integrity is handled at a different layer
encoder.set_parameter(CParameter::ChecksumFlag(false))?;

let buf = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE);

Ok(Box::new(ZstdCompression {
writer: Writer::new(buf, encoder),
}))
Ok(Box::new(ZstdCompression { writer: encoder }))
}

fn decompression_tag(&self) -> DecompressionTag {
@@ -67,50 +70,115 @@ impl Zstd {
pub fn new_decompression() -> Result<Box<dyn DecompressionState>> {
let mut decoder = Decoder::new()?;
decoder.set_parameter(DParameter::Format(FrameFormat::Magicless))?;
// decoder.set_parameter(DParameter::ForceIgnoreChecksum(true))?;

let buf = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE);
Ok(Box::new(ZstdDecompression {
writer: Writer::new(buf, decoder),
}))
Ok(Box::new(ZstdDecompression { writer: decoder }))
}
}

impl io::Write for ZstdCompression {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.writer.write(buf)
unimplemented!()
}

fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.writer.write_all(buf)
unimplemented!()
}

fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
unimplemented!()
}
}

use speedy::{Readable, Writable};
const DATA_OFF: usize = mem::size_of::<u32>();

impl CompressionState for ZstdCompression {
fn finish(&mut self) -> Buf {
let _ = self.writer.finish();

mem::replace(
self.writer.writer_mut(),
BufWrite::with_capacity(DEFAULT_BUFFER_SIZE),
)
.into_buf()
fn finish(&mut self, data: Buf) -> Result<Buf> {
let size = zstd_safe::compress_bound(data.as_ref().len());
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));
buf.write_all(&[0u8; DATA_OFF])?;

let mut input = zstd::stream::raw::InBuffer::around(&data);
let mut output = zstd::stream::raw::OutBuffer::around_pos(&mut buf, DATA_OFF);
let mut finished_frame;
loop {
let remaining = self.writer.run(&mut input, &mut output)?;
finished_frame = remaining == 0;
if input.pos() > 0 || data.is_empty() {
break;
}
}

while self.writer.flush(&mut output)? > 0 {}
self.writer.finish(&mut output, finished_frame)?;

let og_len = data.len() as u32;
og_len
.write_to_buffer(&mut buf.as_mut()[..DATA_OFF])
.unwrap();
Ok(buf.into_buf())
}
}

impl DecompressionState for ZstdDecompression {
fn decompress(&mut self, data: &[u8]) -> Result<Box<[u8]>> {
self.writer.write_all(data)?;
self.writer.finish()?;

Ok(mem::replace(
self.writer.writer_mut(),
BufWrite::with_capacity(DEFAULT_BUFFER_SIZE),
)
.into_buf()
.into_boxed_slice())
fn decompress(&mut self, data: Buf) -> Result<Buf> {
let size = u32::read_from_buffer(data.as_ref()).unwrap();
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size));

let mut input = zstd::stream::raw::InBuffer::around(&data[DATA_OFF..]);
let mut output = zstd::stream::raw::OutBuffer::around(&mut buf);

let mut finished_frame;
loop {
let remaining = self.writer.run(&mut input, &mut output)?;
finished_frame = remaining == 0;
if remaining > 0 {
if output.dst.capacity() == output.dst.as_ref().len() {
// append faux byte to extend in case that original was
// wrong for some reason (this should not happen but is a
// sanity guard)
output.dst.write(&[0])?;
}
continue;
}
if input.pos() > 0 || data.is_empty() {
break;
}
}

while self.writer.flush(&mut output)? > 0 {}
self.writer.finish(&mut output, finished_frame)?;

Ok(buf.into_buf())
}
}

#[cfg(test)]
mod tests {
use rand::RngCore;

use super::*;

#[test]
fn encode_then_decode() {
let mut rng = rand::thread_rng();
let mut buf = vec![42u8; 4 * 1024 * 1024];
rng.fill_bytes(buf.as_mut());
let buf = Buf::from_zero_padded(buf);
let zstd = Zstd { level: 1 };
let mut comp = zstd.new_compression().unwrap();
let c_buf = comp.finish(buf.clone()).unwrap();
let mut decomp = zstd.decompression_tag().new_decompression().unwrap();
let d_buf = decomp.decompress(c_buf).unwrap();
assert_eq!(buf.as_ref().len(), d_buf.as_ref().len());
}

#[test]
fn sanity() {
let buf = [42u8, 42];
let c_buf = zstd::stream::encode_all(&buf[..], 1).unwrap();
let d_buf = zstd::stream::decode_all(c_buf.as_slice()).unwrap();
assert_eq!(&buf, d_buf.as_slice());
}
}
21 changes: 9 additions & 12 deletions betree/src/data_management/dmu.rs
@@ -231,8 +231,8 @@ where
.read(op.size(), op.offset(), op.checksum().clone())?;

let object: Node<ObjRef<ObjectPointer<SPL::Checksum>>> = {
let data = decompression_state.decompress(&compressed_data)?;
Object::unpack_at(op.offset(), op.info(), data)?
let data = decompression_state.decompress(compressed_data)?;
Object::unpack_at(op.offset(), op.info(), data.into_boxed_slice())?
};
let key = ObjectKey::Unmodified { offset, generation };
self.insert_object_into_cache(key, TaggedCacheValue::new(RwLock::new(object), pivot_key));
@@ -384,11 +384,12 @@ where
let compressed_data = {
// FIXME: cache this
let mut state = compression.new_compression()?;
let mut buf = crate::buffer::BufWrite::with_capacity(Block(128));
{
object.pack(&mut state)?;
object.pack(&mut buf)?;
drop(object);
}
state.finish()
state.finish(buf.into_buf())?
};

assert!(compressed_data.len() <= u32::max_value() as usize);
@@ -408,7 +409,7 @@ where

let checksum = {
let mut state = self.default_checksum_builder.build();
state.ingest(&compressed_data);
state.ingest(compressed_data.as_ref());
state.finish()
};

@@ -499,11 +500,7 @@
{
warn!(
"Storage tier {class} does not have enough space remaining. {} blocks of {}",
self.handler
.free_space_tier(class)
.unwrap()
.free
.as_u64(),
self.handler.free_space_tier(class).unwrap().free.as_u64(),
size.as_u64()
);
continue;
@@ -939,8 +936,8 @@ where
let data = ptr
.decompression_tag()
.new_decompression()?
.decompress(&compressed_data)?;
Object::unpack_at(ptr.offset(), ptr.info(), data)?
.decompress(compressed_data)?;
Object::unpack_at(ptr.offset(), ptr.info(), data.into_boxed_slice())?
};
let key = ObjectKey::Unmodified {
offset: ptr.offset(),
