Skip to content

Commit

Permalink
feat: lz4 compression
Browse files Browse the repository at this point in the history
  • Loading branch information
merklefruit committed Jan 31, 2024
1 parent 65605e4 commit 73d3791
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 6 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ quinn = "0.10"
# rustls needs to be the same version as the one used by quinn
rustls = { version = "0.21", features = ["quic", "dangerous_configuration"] }
rcgen = "0.12"
flate2 = "1"
zstd = "0.13"
snap = "1"

# benchmarking & profiling
criterion = { version = "0.5", features = ["async_tokio"] }
Expand Down
8 changes: 5 additions & 3 deletions msg-wire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ bytes.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
tracing.workspace = true
flate2.workspace = true
zstd.workspace = true
snap.workspace = true

flate2 = "1"
zstd = "0.13"
snap = "1"
lz4_flex = "0.11"
39 changes: 39 additions & 0 deletions msg-wire/src/compression/lz4.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use bytes::Bytes;
use lz4_flex::{compress, decompress};
use std::io;

use super::{CompressionType, Compressor, Decompressor};

/// A compressor that uses the LZ4 algorithm.
///
/// This is a stateless unit type; construct it directly (`Lz4Compressor`) or
/// through [`Default`]. `Debug` is derived so the type can be logged and
/// embedded in `Debug`-deriving structs, matching `Lz4Decompressor`.
#[derive(Debug, Default)]
pub struct Lz4Compressor;

impl Compressor for Lz4Compressor {
    /// Reports the compression type tag for this compressor, which is always
    /// [`CompressionType::Lz4`].
    fn compression_type(&self) -> CompressionType {
        CompressionType::Lz4
    }

    /// Compresses `data` with the `lz4_flex` block compressor and returns the
    /// result as [`Bytes`].
    ///
    /// This implementation never produces an `Err`; the `Result` return type
    /// is dictated by the `Compressor` trait.
    fn compress(&self, data: &[u8]) -> Result<Bytes, io::Error> {
        Ok(compress(data).into())
    }
}

/// A decompressor that uses the LZ4 algorithm.
///
/// This is a stateless unit type; construct it directly (`Lz4Decompressor`)
/// or through [`Default`].
#[derive(Default, Debug)]
pub struct Lz4Decompressor;

impl Decompressor for Lz4Decompressor {
fn decompress(&self, data: &[u8]) -> Result<Bytes, io::Error> {
// Usually the Lz4 compression ratio is 2.1x. So 4x should be plenty.
let min_uncompressed_size = data.len() * 4;
let bytes = decompress(data, min_uncompressed_size).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Lz4 decompression failed: {}", e),
)
})?;

Ok(Bytes::from(bytes))
}
}
31 changes: 31 additions & 0 deletions msg-wire/src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use bytes::Bytes;
use std::io;

mod gzip;
mod lz4;
mod snappy;
mod zstd;
pub use gzip::*;
pub use lz4::*;
pub use snappy::*;
pub use zstd::*;

Expand All @@ -16,6 +18,7 @@ pub enum CompressionType {
Gzip = 1,
Zstd = 2,
Snappy = 3,
Lz4 = 4,
}

impl TryFrom<u8> for CompressionType {
Expand All @@ -27,6 +30,7 @@ impl TryFrom<u8> for CompressionType {
1 => Ok(CompressionType::Gzip),
2 => Ok(CompressionType::Zstd),
3 => Ok(CompressionType::Snappy),
4 => Ok(CompressionType::Lz4),
_ => Err(value),
}
}
Expand Down Expand Up @@ -63,6 +67,7 @@ pub fn try_decompress_payload(compression_type: u8, data: Bytes) -> Result<Bytes
CompressionType::Gzip => GzipDecompressor.decompress(data.as_ref()),
CompressionType::Zstd => ZstdDecompressor.decompress(data.as_ref()),
CompressionType::Snappy => SnappyDecompressor.decompress(data.as_ref()),
CompressionType::Lz4 => Lz4Decompressor.decompress(data.as_ref()),
},
Err(unsupported_compression_type) => Err(io::Error::new(
io::ErrorKind::InvalidData,
Expand Down Expand Up @@ -120,6 +125,21 @@ mod tests {
assert_eq!(data, decompressed);
}

/// Round-trips a small, repetitive payload through the LZ4 compressor and
/// decompressor and checks that the original bytes come back unchanged.
#[test]
fn test_lz4_compression() {
    let payload =
        Bytes::from("hellooooooooooooooooo wwwwwoooooooooooooooooooooooooooooooooooooorld");
    println!("Before: {:?}", payload.len());

    let packed = Lz4Compressor.compress(&payload).unwrap();
    println!("After: {:?}", packed.len());

    let restored = Lz4Decompressor.decompress(&packed).unwrap();
    assert_eq!(payload, restored);
}

fn compression_test<C: Compressor>(data: &Bytes, comp: C) -> (std::time::Duration, f64, Bytes) {
let uncompressed_size = data.len() as f64;
let start = std::time::Instant::now();
Expand Down Expand Up @@ -169,6 +189,13 @@ mod tests {
snappy_perf, snappy_time
);

let lz4 = Lz4Compressor;
let (lz4_time, lz4_perf, lz4_comp) = compression_test(&data, lz4);
println!(
"lz4 compression shrank the data by {:.2}% in {:?}",
lz4_perf, lz4_time
);

println!("------");

let gzip = GzipDecompressor;
Expand All @@ -182,5 +209,9 @@ mod tests {
let snappy = SnappyDecompressor;
let snappy_time = decompression_test(&snappy_comp, snappy);
println!("snappy decompression took {:?}", snappy_time);

let lz4 = Lz4Decompressor;
let lz4_time = decompression_test(&lz4_comp, lz4);
println!("lz4 decompression took {:?}", lz4_time);
}
}

0 comments on commit 73d3791

Please sign in to comment.