From 73d3791b04d6603af4b89d3e5aab7d99997b5184 Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Wed, 31 Jan 2024 10:38:44 +0100 Subject: [PATCH] feat: lz4 compression --- Cargo.lock | 26 ++++++++++++++++++++++ Cargo.toml | 3 --- msg-wire/Cargo.toml | 8 ++++--- msg-wire/src/compression/lz4.rs | 39 +++++++++++++++++++++++++++++++++ msg-wire/src/compression/mod.rs | 31 ++++++++++++++++++++++++++ 5 files changed, 101 insertions(+), 6 deletions(-) create mode 100644 msg-wire/src/compression/lz4.rs diff --git a/Cargo.lock b/Cargo.lock index 0b5a802..d949ef4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -667,6 +667,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lz4_flex" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "912b45c753ff5f7f5208307e8ace7d2a2e30d024e26d3509f3dce546c044ce15" +dependencies = [ + "twox-hash", +] + [[package]] name = "memchr" version = "2.7.1" @@ -786,6 +795,7 @@ version = "0.1.1" dependencies = [ "bytes", "flate2", + "lz4_flex", "msg-common", "snap", "thiserror", @@ -1535,6 +1545,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "str_stack" version = "0.1.0" @@ -1785,6 +1801,16 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "unicode-ident" version = "1.0.12" diff --git a/Cargo.toml b/Cargo.toml index 17724e5..54913e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,9 +48,6 @@ quinn = "0.10" # rustls needs to be the same version as the one used by quinn rustls = { version = "0.21", features = ["quic", "dangerous_configuration"] } rcgen = "0.12" -flate2 = "1" -zstd = "0.13" -snap = "1" # benchmarking & profiling criterion = { version = "0.5", features = ["async_tokio"] } diff --git a/msg-wire/Cargo.toml b/msg-wire/Cargo.toml index eb04af0..2a33a2a 100644 --- a/msg-wire/Cargo.toml +++ b/msg-wire/Cargo.toml @@ -17,6 +17,8 @@ bytes.workspace = true thiserror.workspace = true tokio-util.workspace = true tracing.workspace = true -flate2.workspace = true -zstd.workspace = true -snap.workspace = true + +flate2 = "1" +zstd = "0.13" +snap = "1" +lz4_flex = "0.11" diff --git a/msg-wire/src/compression/lz4.rs b/msg-wire/src/compression/lz4.rs new file mode 100644 index 0000000..354eab8 --- /dev/null +++ b/msg-wire/src/compression/lz4.rs @@ -0,0 +1,39 @@ +use bytes::Bytes; +use lz4_flex::{compress, decompress}; +use std::io; + +use super::{CompressionType, Compressor, Decompressor}; + +/// A compressor that uses the LZ4 algorithm. +#[derive(Default)] +pub struct Lz4Compressor; + +impl Compressor for Lz4Compressor { + fn compression_type(&self) -> CompressionType { + CompressionType::Lz4 + } + + fn compress(&self, data: &[u8]) -> Result { + let bytes = compress(data); + + Ok(Bytes::from(bytes)) + } +} + +#[derive(Debug, Default)] +pub struct Lz4Decompressor; + +impl Decompressor for Lz4Decompressor { + fn decompress(&self, data: &[u8]) -> Result { + // Usually the Lz4 compression ratio is 2.1x. So 4x should be plenty. + let min_uncompressed_size = data.len() * 4; + let bytes = decompress(data, min_uncompressed_size).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Lz4 decompression failed: {}", e), + ) + })?; + + Ok(Bytes::from(bytes)) + } +} diff --git a/msg-wire/src/compression/mod.rs b/msg-wire/src/compression/mod.rs index 9bfcb4f..0f92d3a 100644 --- a/msg-wire/src/compression/mod.rs +++ b/msg-wire/src/compression/mod.rs @@ -2,9 +2,11 @@ use bytes::Bytes; use std::io; mod gzip; +mod lz4; mod snappy; mod zstd; pub use gzip::*; +pub use lz4::*; pub use snappy::*; pub use zstd::*; @@ -16,6 +18,7 @@ pub enum CompressionType { Gzip = 1, Zstd = 2, Snappy = 3, + Lz4 = 4, } impl TryFrom for CompressionType { @@ -27,6 +30,7 @@ impl TryFrom for CompressionType { 1 => Ok(CompressionType::Gzip), 2 => Ok(CompressionType::Zstd), 3 => Ok(CompressionType::Snappy), + 4 => Ok(CompressionType::Lz4), _ => Err(value), } } @@ -63,6 +67,7 @@ pub fn try_decompress_payload(compression_type: u8, data: Bytes) -> Result GzipDecompressor.decompress(data.as_ref()), CompressionType::Zstd => ZstdDecompressor.decompress(data.as_ref()), CompressionType::Snappy => SnappyDecompressor.decompress(data.as_ref()), + CompressionType::Lz4 => Lz4Decompressor.decompress(data.as_ref()), }, Err(unsupported_compression_type) => Err(io::Error::new( io::ErrorKind::InvalidData, @@ -120,6 +125,21 @@ mod tests { assert_eq!(data, decompressed); } + #[test] + fn test_lz4_compression() { + let compressor = Lz4Compressor; + let decompressor = Lz4Decompressor; + + let data = + Bytes::from("hellooooooooooooooooo wwwwwoooooooooooooooooooooooooooooooooooooorld"); + println!("Before: {:?}", data.len()); + let compressed = compressor.compress(&data).unwrap(); + println!("After: {:?}", compressed.len()); + let decompressed = decompressor.decompress(&compressed).unwrap(); + + assert_eq!(data, decompressed); + } + fn compression_test(data: &Bytes, comp: C) -> (std::time::Duration, f64, Bytes) { let uncompressed_size = data.len() as f64; let start = std::time::Instant::now(); @@ -169,6 +189,13 @@ mod tests { snappy_perf, snappy_time ); + let lz4 = Lz4Compressor; + let (lz4_time, lz4_perf, lz4_comp) = compression_test(&data, lz4); + println!( + "lz4 compression shrank the data by {:.2}% in {:?}", + lz4_perf, lz4_time + ); + println!("------"); let gzip = GzipDecompressor; @@ -182,5 +209,9 @@ mod tests { let snappy = SnappyDecompressor; let snappy_time = decompression_test(&snappy_comp, snappy); println!("snappy decompression took {:?}", snappy_time); + + let lz4 = Lz4Decompressor; + let lz4_time = decompression_test(&lz4_comp, lz4); + println!("lz4 decompression took {:?}", lz4_time); } }