Skip to content

Commit

Permalink
compression: fix decomp buffer mgt
Browse files Browse the repository at this point in the history
Due to the usage of `AlignedStorage` and its Wrapper^n `Buf`, length information
is lost when storing and serializing buffers, and the total length is simply
set to the capacity. This is wrong. While serialization libraries seem to
handle this well enough, decompression will stop working. This patch reworks
the compression part to actually store the length alongside the data to ensure
compatibility with the decompression process of some codecs (e.g. zstd).
  • Loading branch information
Johannes Wünsche committed Jan 25, 2024
1 parent df1ed2f commit db3cb1a
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 50 deletions.
22 changes: 22 additions & 0 deletions betree/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ impl BufWrite {
buf: Arc::new(UnsafeCell::new(self.buf)),
})
}

/// Returns the current value of the `size` field, widened to `usize`.
// NOTE(review): presumably `size` is the number of bytes written so far rather
// than the allocated capacity — confirm against the rest of `BufWrite`.
pub fn len(&self) -> usize {
self.size as usize
}
}

impl io::Write for BufWrite {
Expand All @@ -290,6 +294,24 @@ impl io::Write for BufWrite {
}
}

// Implements zstd's raw `WriteBuf` trait for `BufWrite`, so that a `BufWrite`
// can be handed to zstd as an output buffer.
//
// NOTE(review): this is an `unsafe impl`; its soundness depends on `buf.ptr`
// pointing to at least `capacity()` writable bytes. That invariant is not
// visible in this hunk — confirm against the `BufWrite` definition.
unsafe impl zstd::stream::raw::WriteBuf for BufWrite {
/// Returns the byte slice exposed by this buffer's `AsRef` implementation.
// NOTE(review): presumably limited to the first `size` bytes — confirm.
fn as_slice(&self) -> &[u8] {
self.as_ref()
}

/// Returns the capacity of the underlying buffer, converted to bytes.
fn capacity(&self) -> usize {
self.buf.capacity.to_bytes() as usize
}

/// Returns the raw pointer stored in the underlying buffer.
fn as_mut_ptr(&mut self) -> *mut u8 {
self.buf.ptr
}

/// Records `n` as the new value of `size`.
// NOTE(review): `n as u32` truncates silently if `n` exceeds `u32::MAX`.
// This looks safe only if the capacity always fits in a `u32` — confirm.
unsafe fn filled_until(&mut self, n: usize) {
self.size = n as u32
}
}

impl io::Seek for BufWrite {
fn seek(&mut self, seek: io::SeekFrom) -> io::Result<u64> {
use io::SeekFrom::*;
Expand Down
4 changes: 2 additions & 2 deletions betree/src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! `None` and `Lz4` are provided as implementation.

use crate::{
buffer::Buf,
buffer::{Buf, BufWrite},
size::{Size, StaticSize},
vdev::Block,
};
Expand Down Expand Up @@ -72,7 +72,7 @@ pub trait CompressionBuilder: Debug + Size + Send + Sync + 'static {
pub trait CompressionState: Write {
/// Finishes the compression stream and returns a buffer that contains the
/// compressed data.
fn finish(&mut self) -> Buf;
fn finish(&mut self, data: Buf) -> Result<BufWrite>;
}

pub trait DecompressionState {
Expand Down
4 changes: 2 additions & 2 deletions betree/src/compression/none.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ impl io::Write for NoneCompression {
}

impl CompressionState for NoneCompression {
fn finish(&mut self) -> Buf {
mem::replace(&mut self.buf, BufWrite::with_capacity(DEFAULT_BUFFER_SIZE)).into_buf()
fn finish(&mut self, buf: Buf) -> Result<BufWrite> {
Ok(buf.into_buf_write())
}
}

Expand Down
107 changes: 70 additions & 37 deletions betree/src/compression/zstd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ use super::{
};
use crate::{
buffer::{Buf, BufWrite},
database,
size::StaticSize,
};
use serde::{Deserialize, Serialize};
use std::{
io::{self, Write},
io::{self, Cursor, Write},
mem,
};
use zstd::stream::{
raw::{CParameter, DParameter, Decoder, Encoder},
zio::Writer,
use zstd::{
block::{Compressor, Decompressor},
stream::{
raw::{CParameter, DParameter, Decoder, Encoder},
zio::{Reader, Writer},
},
};
use zstd_safe::FrameFormat;
use zstd_safe::{FrameFormat, InBuffer, OutBuffer};

// TODO: investigate pre-created dictionary payoff

Expand All @@ -28,10 +32,11 @@ pub struct Zstd {
}

struct ZstdCompression {
writer: Writer<BufWrite, Encoder<'static>>,
writer: Compressor,
lvl: i32,
}
struct ZstdDecompression {
writer: Writer<BufWrite, Decoder<'static>>,
writer: Decompressor,
}

impl StaticSize for Zstd {
Expand All @@ -40,21 +45,22 @@ impl StaticSize for Zstd {
}
}

use zstd::stream::raw::Operation;

impl CompressionBuilder for Zstd {
fn new_compression(&self) -> Result<Box<dyn CompressionState>> {
// "The library supports regular compression levels from 1 up to ZSTD_maxCLevel(),
// which is currently 22."
let mut encoder = Encoder::new(self.level as i32)?;
// let mut encoder = Encoder::new(self.level as i32)?;

// Compression format is stored externally, don't need to duplicate it
encoder.set_parameter(CParameter::Format(FrameFormat::Magicless))?;
// Integrity is handled at a different layer
encoder.set_parameter(CParameter::ChecksumFlag(false))?;

let buf = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE);
// encoder.set_parameter(CParameter::Format(FrameFormat::Magicless))?;
// // Integrity is handled at a different layer
// encoder.set_parameter(CParameter::ChecksumFlag(false))?;

Ok(Box::new(ZstdCompression {
writer: Writer::new(buf, encoder),
writer: Compressor::new(),
lvl: self.level as i32,
}))
}

Expand All @@ -66,51 +72,78 @@ impl CompressionBuilder for Zstd {
impl Zstd {
pub fn new_decompression() -> Result<Box<dyn DecompressionState>> {
let mut decoder = Decoder::new()?;
decoder.set_parameter(DParameter::Format(FrameFormat::Magicless))?;
decoder.set_parameter(DParameter::Format(FrameFormat::One))?;
// decoder.set_parameter(DParameter::ForceIgnoreChecksum(true))?;

let buf = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE);
Ok(Box::new(ZstdDecompression {
writer: Writer::new(buf, decoder),
writer: Decompressor::new(),
}))
}
}

impl io::Write for ZstdCompression {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.writer.write(buf)
unimplemented!()
}

fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.writer.write_all(buf)
unimplemented!()
}

fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
unimplemented!()
}
}

impl CompressionState for ZstdCompression {
fn finish(&mut self) -> Buf {
let _ = self.writer.finish();

mem::replace(
self.writer.writer_mut(),
BufWrite::with_capacity(DEFAULT_BUFFER_SIZE),
)
.into_buf()
fn finish(&mut self, data: Buf) -> Result<BufWrite> {
let mut buf = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE);
zstd::stream::copy_encode(data.as_ref(), &mut buf, self.lvl)?;
// self.writer.compress_to_buffer(&data, &mut buf, self.lvl)?;
Ok(buf)
}
}

impl DecompressionState for ZstdDecompression {
fn decompress(&mut self, data: &[u8]) -> Result<Box<[u8]>> {
self.writer.write_all(data)?;
self.writer.finish()?;

Ok(mem::replace(
self.writer.writer_mut(),
BufWrite::with_capacity(DEFAULT_BUFFER_SIZE),
)
.into_buf()
.into_boxed_slice())
let mut buf = Vec::with_capacity(DEFAULT_BUFFER_SIZE.to_bytes() as usize);
// let buf = self.writer.decompress(data, 8 * 1024 * 1024)?;
zstd::stream::copy_decode(data.as_ref(), &mut buf)?;
Ok(buf.into())
}
}

// Round-trip tests for the zstd compression and decompression states.
#[cfg(test)]
mod tests {
use super::*;

// Compresses a buffer via `CompressionState::finish`, checks the output is
// byte-identical to what `zstd::stream::encode_all` produces at the same
// level, and then checks that both outputs decode back to the input.
#[test]
fn encode_then_decode() {
// 4097 bytes of the value 42.
// NOTE(review): presumably `from_zero_padded` pads this up to a block
// boundary — confirm against `Buf`.
let buf = Buf::from_zero_padded(vec![42u8; 4097]);
// This local shares its name with the `zstd` crate. The `zstd::stream::...`
// paths below still resolve to the crate, because paths and local
// variables live in different namespaces.
let zstd = Zstd { level: 1 };
let mut comp = zstd.new_compression().unwrap();
let c_buf = comp.finish(buf.clone()).unwrap();
// Reference encoding produced directly by the zstd crate at level 1.
let reference = zstd::stream::encode_all(&buf[..], 1).unwrap();
assert_eq!(reference, c_buf.as_ref());
// Both the reference and our output must decode to the original bytes.
assert_eq!(
zstd::stream::decode_all(reference.as_slice()).unwrap(),
buf.as_ref()
);
assert_eq!(
zstd::stream::decode_all(c_buf.as_ref()).unwrap(),
buf.as_ref()
);
// Finally, decode through the project's own `DecompressionState`.
let mut decomp = zstd.decompression_tag().new_decompression().unwrap();
// let d_buf = decomp.decompress(&reference).unwrap();
let d_buf = decomp.decompress(c_buf.as_ref()).unwrap();
assert_eq!(buf.as_ref(), &*d_buf);
}

// Checks that the zstd crate itself round-trips a two-byte input,
// independent of any project buffer types.
#[test]
fn sanity() {
let buf = [42u8, 42];
let c_buf = zstd::stream::encode_all(&buf[..], 1).unwrap();
let d_buf = zstd::stream::decode_all(c_buf.as_slice()).unwrap();
assert_eq!(&buf, d_buf.as_slice());
}
}
16 changes: 7 additions & 9 deletions betree/src/data_management/dmu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,12 @@ where
let compressed_data = {
// FIXME: cache this
let mut state = compression.new_compression()?;
let mut buf = crate::buffer::BufWrite::with_capacity(Block(128));
{
object.pack(&mut state)?;
object.pack(&mut buf)?;
drop(object);
}
state.finish()
state.finish(buf.into_buf())?
};

assert!(compressed_data.len() <= u32::max_value() as usize);
Expand All @@ -408,11 +409,12 @@ where

let checksum = {
let mut state = self.default_checksum_builder.build();
state.ingest(&compressed_data);
state.ingest(compressed_data.as_ref());
state.finish()
};

self.pool.begin_write(compressed_data, offset)?;
// FIXME: COMPRESSION
self.pool.begin_write(compressed_data.into_buf(), offset)?;

let obj_ptr = ObjectPointer {
offset,
Expand Down Expand Up @@ -499,11 +501,7 @@ where
{
warn!(
"Storage tier {class} does not have enough space remaining. {} blocks of {}",
self.handler
.free_space_tier(class)
.unwrap()
.free
.as_u64(),
self.handler.free_space_tier(class).unwrap().free.as_u64(),
size.as_u64()
);
continue;
Expand Down

0 comments on commit db3cb1a

Please sign in to comment.