Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fast encode and compress #61

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ fxhash = "0.2"
nix = "0.18.0"
crossbeam = "0.7"
thiserror = "1.0"
libc = "0.2"

[dev-dependencies]
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] }
tempfile = "3.1"
toml = "0.5"
ctor = "0.1"
env_logger = "0.8"

[patch.crates-io]
protobuf = { git = "https://github.com/pingcap/rust-protobuf", rev = "ac10abf324a6f2b3e19e10f82b568a293ca5bd3d" }
212 changes: 212 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
use std::i32;
use std::ptr::{copy_nonoverlapping, read_unaligned};

use libc::c_int;
use lz4_sys::{
LZ4StreamEncode, LZ4_compressBound, LZ4_compress_default, LZ4_createStreamDecode,
LZ4_decompress_safe, LZ4_decompress_safe_continue, LZ4_freeStreamDecode,
};

// Layout of single block compression:
// header + decoded_size + content + cap(tail).
//
// Encodes `src` as one LZ4 block. The returned vector leaves `head_reserve`
// uninitialized-but-reserved bytes at the front (for the caller's header) and
// reserves `tail_alloc` extra bytes of capacity at the end.
pub fn encode_block(src: &[u8], head_reserve: usize, tail_alloc: usize) -> Vec<u8> {
    // Validate the length BEFORE casting to i32 so the cast can never wrap
    // into a bogus-but-positive value.
    assert!(src.len() <= i32::MAX as usize);
    unsafe {
        // Worst-case compressed size for `src`; 0 means the input is too large.
        let bound = LZ4_compressBound(src.len() as i32);
        assert!(bound > 0);

        // head_reserve + 4-byte LE decoded length + compressed payload + tail.
        let capacity = head_reserve + 4 + bound as usize + tail_alloc;
        let mut output: Vec<u8> = Vec::with_capacity(capacity);

        // Store the decoded length as an explicit little-endian u32 (fits,
        // per the assert above); `decode_block` reads it back the same way.
        let le_len = (src.len() as u32).to_le_bytes();
        copy_nonoverlapping(le_len.as_ptr(), output.as_mut_ptr().add(head_reserve), 4);

        let size = LZ4_compress_default(
            src.as_ptr() as _,
            output.as_mut_ptr().add(head_reserve + 4) as _,
            src.len() as i32,
            bound,
        );
        // LZ4_compress_default returns 0 on failure; it cannot fail here
        // because the destination buffer is at least `bound` bytes.
        assert!(size > 0);
        // SAFETY: exactly head_reserve + 4 + size bytes were written above,
        // all within `capacity`. (The head_reserve prefix is intentionally
        // left for the caller to fill.)
        output.set_len(head_reserve + 4 + size as usize);
        output
    }
}

pub fn decode_block(src: &[u8]) -> Vec<u8> {
assert!(src.len() > 4, "data is too short: {} <= 4", src.len());
unsafe {
let len = u32::from_le(read_unaligned(src.as_ptr() as *const u32));
let mut dst = Vec::with_capacity(len as usize);
let l = LZ4_decompress_safe(
src.as_ptr().add(4) as _,
dst.as_mut_ptr() as _,
src.len() as i32 - 4,
dst.capacity() as i32,
);
assert_eq!(l, len as i32);
dst.set_len(l as usize);
dst
}
}

// Layout of multi blocks compression:
// header + decoded_size + vec[encoded_len_and_content] + cap(tail).
//
// Compresses a sequence of buffers as one chained LZ4 stream. `inputs` is a
// factory because the sequence is iterated twice: once to size the output,
// once to compress. NOTE(review): both calls must yield the same buffers in
// the same order, and each buffer must stay alive/unmoved across the second
// loop — LZ4's streaming encoder references previously-compressed input as
// its dictionary. Reserves `head_reserve` bytes at the front and `tail_alloc`
// bytes of spare capacity at the end for the caller.
pub fn encode_blocks<'a, F, I>(inputs: F, head_reserve: usize, tail_alloc: usize) -> Vec<u8>
where
    F: Fn() -> I,
    I: Iterator<Item = &'a [u8]>,
{
    // First pass: compute total decoded size and a worst-case encoded size
    // (per-block 4-byte length prefix + LZ4_compressBound of the content).
    let (mut encoded_len, mut decoded_len) = (0, 0u64);
    for buffer in inputs() {
        let len = buffer.len();
        decoded_len += len as u64;
        let size = unsafe { lz4_sys::LZ4_compressBound(len as i32) };
        assert!(size > 0);
        encoded_len += (4 + size) as usize; // Length and content.
    }

    let capacity = head_reserve + 8 + encoded_len + tail_alloc;
    let mut output: Vec<u8> = Vec::with_capacity(capacity);
    unsafe {
        // 8-byte little-endian total decoded size, right after the reserved head.
        copy_nonoverlapping(
            decoded_len.to_le_bytes().as_ptr(),
            output.as_mut_ptr().add(head_reserve),
            8,
        );

        // Second pass: chained compression. `offset` always points at the
        // next block's 4-byte length slot; content goes at `offset + 4`.
        let (stream, mut offset) = (lz4_sys::LZ4_createStream(), head_reserve + 8);
        for buffer in inputs() {
            let bytes = LZ4_compress_fast_continue(
                stream,
                buffer.as_ptr() as _,
                output.as_mut_ptr().add(offset + 4),
                buffer.len() as i32,
                (capacity - offset) as i32,
                4, /* acceleration */
            );
            // 0 or negative means compression failed (cannot happen while the
            // remaining capacity covers the block's compressBound).
            assert!(bytes > 0);
            // Backfill the block's length prefix now that the size is known.
            copy_nonoverlapping(
                (bytes as u32).to_le_bytes().as_ptr(),
                output.as_mut_ptr().add(offset),
                4,
            );
            offset += (bytes + 4) as usize;
        }

        lz4_sys::LZ4_freeStream(stream);
        // SAFETY: all bytes up to `offset` were initialized above
        // (except the caller-owned head_reserve prefix, by design).
        output.set_len(offset);
    }
    output
}

// Decodes a stream produced by `encode_blocks` (with head_reserve = 0):
// an 8-byte LE total decoded size, then repeated [4-byte LE block length,
// block content] records.
pub fn decode_blocks(mut src: &[u8]) -> Vec<u8> {
    assert!(src.len() > 8, "data is too short: {} <= 8", src.len());
    unsafe {
        let decoded_len = u64::from_le(read_unaligned(src.as_ptr() as *const u64));
        let mut dst: Vec<u8> = Vec::with_capacity(decoded_len as usize);
        src = &src[8..];

        let (decoder, mut offset) = (LZ4_createStreamDecode(), 0);
        // LZ4_createStreamDecode returns NULL on allocation failure.
        assert!(!decoder.is_null());
        while !src.is_empty() {
            // Bug fix: validate the framing against the remaining input
            // BEFORE dereferencing — a truncated/corrupt stream must not
            // cause an out-of-bounds read here or inside the C library.
            assert!(src.len() >= 4, "block header is truncated: {} < 4", src.len());
            let len = u32::from_le(read_unaligned(src.as_ptr() as *const u32));
            assert!(
                src.len() - 4 >= len as usize,
                "block content is truncated: {} < {}",
                src.len() - 4,
                len
            );
            let bytes = LZ4_decompress_safe_continue(
                decoder,
                src.as_ptr().add(4) as _,
                dst.as_mut_ptr().add(offset) as _,
                len as i32,
                (dst.capacity() - offset) as i32,
            );
            // Negative return means malformed compressed data.
            assert!(bytes >= 0);
            offset += bytes as usize;
            src = &src[(4 + len as usize)..];
        }
        LZ4_freeStreamDecode(decoder);
        // The stream must decompress to exactly the advertised total size.
        assert_eq!(offset, decoded_len as usize);
        // SAFETY: `offset` bytes of `dst` were initialized by the decoder,
        // and offset <= capacity.
        dst.set_len(offset);
        dst
    }
}

extern "C" {
    // It's not in lz4_sys.
    //
    // Hand-written binding to lz4's streaming compressor. The signature must
    // stay in sync with the C declaration in lz4.h:
    //   int LZ4_compress_fast_continue(LZ4_stream_t*, const char*, char*,
    //                                  int, int, int);
    // Callers must pass a stream from LZ4_createStream, keep source buffers
    // alive between calls (they serve as the dictionary), and ensure `dest`
    // has at least `dest_capacity` writable bytes.
    fn LZ4_compress_fast_continue(
        LZ4_stream: *mut LZ4StreamEncode,
        source: *const u8,
        dest: *mut u8,
        input_size: c_int,
        dest_capacity: c_int,
        acceleration: c_int,
    ) -> c_int;
}

#[cfg(test)]
mod tests {
    use super::*;
    use test::{black_box, Bencher};

    // Round-trip single-block encoding for empty, tiny, and short inputs.
    #[test]
    fn test_basic() {
        let samples: Vec<&'static [u8]> = vec![b"", b"123", b"12345678910"];
        for sample in samples {
            let encoded = encode_block(sample, 0, 0);
            assert!(encoded.len() > 4);
            assert_eq!(decode_block(&encoded), sample);
        }
    }

    // Round-trip multi-block encoding; decoding must yield the concatenation
    // of all input buffers.
    #[test]
    fn test_blocks() {
        let batches = vec![
            b"".to_vec(),
            b"123".to_vec(),
            b"12345678910".to_vec(),
            vec![b'x'; 99999],
            vec![0; 33333],
        ];

        let expected: Vec<u8> = batches.iter().flat_map(|x| x.iter().copied()).collect();

        let encoded = encode_blocks(|| batches.iter().map(|x| x.as_slice()), 0, 0);
        assert_eq!(decode_blocks(&encoded), expected);
    }

    // Builds a 64 KiB buffer by repeating a fixed byte pattern.
    fn gen_64k_block() -> Vec<u8> {
        let data = b"abcdefghigklmnokqrstuvwxyz";
        let block_size = 64 * 1024;
        data.iter().copied().cycle().take(block_size).collect()
    }

    #[bench]
    fn bench_blocks_64k_8(b: &mut Bencher) {
        let blocks = vec![gen_64k_block(); 8];
        b.iter(|| {
            black_box(encode_blocks(|| blocks.iter().map(|x| x.as_slice()), 0, 0));
        })
    }

    #[bench]
    fn bench_single_block_64k_8(b: &mut Bencher) {
        let block = gen_64k_block();
        let mut joined = Vec::with_capacity(64 * 1024 * 8);
        for _ in 0..8 {
            joined.extend_from_slice(&block);
        }
        b.iter(|| {
            black_box(encode_block(&joined, 0, 0));
        })
    }
}
17 changes: 11 additions & 6 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use protobuf::Message;
use crate::cache_evict::{
CacheSubmitor, CacheTask, Runner as CacheEvictRunner, DEFAULT_CACHE_CHUNK_SIZE,
};
use crate::compression::{decode_block, decode_blocks};
use crate::config::{Config, RecoveryMode};
use crate::log_batch::{
self, Command, CompressionType, EntryExt, LogBatch, LogItemContent, OpType, CHECKSUM_LEN,
Expand Down Expand Up @@ -520,15 +521,19 @@ where
let offset = base_offset + offset;
pipe_log.fread(queue, file_num, offset, len)?
}
CompressionType::Lz4 => {
let read_len = batch_len + HEADER_LEN as u64;
c_type @ CompressionType::Lz4 | c_type @ CompressionType::Lz4Blocks => {
let read_len = batch_len + HEADER_LEN as u64 + CHECKSUM_LEN as u64;
let compressed = pipe_log.fread(queue, file_num, base_offset, read_len)?;
let mut reader = compressed.as_ref();
log_batch::test_batch_checksum(compressed.as_slice())?;

let mut reader = compressed.as_slice();
let header = codec::decode_u64(&mut reader)?;
assert_eq!(header >> 8, batch_len);

log_batch::test_batch_checksum(reader)?;
let buf = log_batch::decompress(&reader[..batch_len as usize - CHECKSUM_LEN]);
let buf = match c_type {
CompressionType::Lz4 => decode_block(&reader[..batch_len as usize]),
CompressionType::Lz4Blocks => decode_blocks(&reader[..batch_len as usize]),
_ => unreachable!(),
};
let start = offset as usize - HEADER_LEN;
let end = (offset + len) as usize - HEADER_LEN;
buf[start..end].to_vec()
Expand Down
Loading