Skip to content

Commit

Permalink
Merge pull request #42 from akoshchiy/11270-json-concat
Browse files Browse the repository at this point in the history
feat: add concat & improve strip_nulls
  • Loading branch information
b41sh authored Jan 3, 2024
2 parents 582c139 + 2a68027 commit 5a43132
Show file tree
Hide file tree
Showing 7 changed files with 687 additions and 165 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ harness = false
name = "get_path"
harness = false

[[bench]]
name = "strip_nulls"
harness = false
76 changes: 76 additions & 0 deletions benches/strip_nulls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fs, io::Read};

use criterion::{criterion_group, criterion_main, Criterion};
use jsonb::{from_slice, strip_nulls, Value};

/// Reads the entire contents of `file` into a byte vector.
///
/// # Panics
///
/// Panics if the file cannot be opened or read. The panic message names the
/// offending path so a failure while iterating a directory of inputs can be
/// traced back to a specific file.
fn read(file: &str) -> Vec<u8> {
    let mut f = fs::File::open(file)
        .unwrap_or_else(|e| panic!("failed to open {}: {}", file, e));
    let mut data = Vec::new();
    f.read_to_end(&mut data)
        .unwrap_or_else(|e| panic!("failed to read {}: {}", file, e));
    data
}

/// Benchmark body for the deserializing path: decodes `data` with `from_slice`,
/// strips nulls from the resulting `Value` tree, and writes it back out.
///
/// The final assertion checks that some output was produced.
fn strip_nulls_deser(data: &[u8]) {
    let mut value = from_slice(data).unwrap();
    strip_value_nulls(&mut value);

    let mut out = vec![];
    value.write_to_vec(&mut out);
    assert!(!out.is_empty());
}

/// Recursively removes object fields whose value is `Value::Null`.
///
/// Arrays are traversed so that objects nested inside them are processed, but
/// null array elements themselves are left in place. Any other value is left
/// untouched.
fn strip_value_nulls(val: &mut Value<'_>) {
    match val {
        Value::Object(fields) => {
            // Clean every child first, then drop the fields that are null.
            fields
                .iter_mut()
                .for_each(|(_, field)| strip_value_nulls(field));
            fields.retain(|_, field| !matches!(field, Value::Null));
        }
        Value::Array(items) => {
            for item in items {
                strip_value_nulls(item);
            }
        }
        _ => {}
    }
}

/// Benchmark body for the direct path: calls the crate's `strip_nulls` on the
/// encoded bytes, writing into a fresh buffer.
///
/// The final assertion checks that some output was produced.
fn strip_nulls_fast(data: &[u8]) {
    let mut out = vec![];
    strip_nulls(data, &mut out).unwrap();
    assert!(!out.is_empty());
}

/// Registers one `strip_nulls_deser` and one `strip_nulls_fast` benchmark for
/// every entry found in `./data/`.
///
/// Each file is read, decoded with `from_slice`, and converted with `to_vec`
/// before the timed section, so both benchmarks receive the same input bytes.
fn add_benchmark(c: &mut Criterion) {
    for dir_entry in fs::read_dir("./data/").unwrap() {
        let file = dir_entry.unwrap().path().display().to_string();
        let raw = read(&file);
        let json = from_slice(&raw).unwrap().to_vec();

        let deser_name = format!("strip_nulls_deser[{}]", file);
        c.bench_function(&deser_name, |b| b.iter(|| strip_nulls_deser(&json)));

        let fast_name = format!("strip_nulls_fast[{}]", file);
        c.bench_function(&fast_name, |b| b.iter(|| strip_nulls_fast(&json)));
    }
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);
149 changes: 149 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2024 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use byteorder::{BigEndian, WriteBytesExt};

use crate::{
constants::{ARRAY_CONTAINER_TAG, OBJECT_CONTAINER_TAG},
jentry::JEntry,
};

/// A single value queued inside an `ArrayBuilder` or `ObjectBuilder`.
enum Entry<'a> {
/// A nested array that is serialized when the parent is built.
ArrayBuilder(ArrayBuilder<'a>),
/// A nested object that is serialized when the parent is built.
ObjectBuilder(ObjectBuilder<'a>),
/// A JEntry together with the borrowed payload bytes it describes; the
/// bytes are copied into the output buffer unchanged.
Raw(JEntry, &'a [u8]),
}

/// Collects array elements and serializes them as an array container.
pub(crate) struct ArrayBuilder<'a> {
// Elements in insertion order; this is also the order they are written in.
entries: Vec<Entry<'a>>,
}

impl<'a> ArrayBuilder<'a> {
    /// Creates an empty builder with storage pre-allocated for `capacity` elements.
    pub(crate) fn new(capacity: usize) -> Self {
        let entries = Vec::with_capacity(capacity);
        Self { entries }
    }

    /// Appends an element given as a JEntry plus the payload bytes it describes.
    pub(crate) fn push_raw(&mut self, jentry: JEntry, data: &'a [u8]) {
        let item = Entry::Raw(jentry, data);
        self.entries.push(item);
    }

    /// Appends a nested array element.
    pub(crate) fn push_array(&mut self, builder: ArrayBuilder<'a>) {
        let item = Entry::ArrayBuilder(builder);
        self.entries.push(item);
    }

    /// Appends a nested object element.
    pub(crate) fn push_object(&mut self, builder: ObjectBuilder<'a>) {
        let item = Entry::ObjectBuilder(builder);
        self.entries.push(item);
    }

    /// Returns the number of elements pushed so far.
    pub(crate) fn len(&self) -> usize {
        self.entries.len()
    }

    /// Serializes the array onto the end of `buf`.
    ///
    /// Output layout: a big-endian `u32` header (`ARRAY_CONTAINER_TAG` OR-ed
    /// with the element count), one 4-byte JEntry slot per element, then the
    /// element payloads in insertion order.
    pub(crate) fn build_into(self, buf: &mut Vec<u8>) {
        let count = self.entries.len();
        buf.write_u32::<BigEndian>(ARRAY_CONTAINER_TAG | count as u32)
            .unwrap();

        // The slots are zero-filled first and patched as each payload is written.
        let mut slot = reserve_jentries(buf, count * 4);
        for item in self.entries {
            let jentry = write_entry(buf, item);
            replace_jentry(buf, jentry, &mut slot);
        }
    }
}

/// Collects key/value pairs and serializes them as an object container.
pub(crate) struct ObjectBuilder<'a> {
// Keyed by the borrowed field name. A `BTreeMap` iterates in sorted key
// order and holds one entry per key, so inserting an existing key replaces
// the earlier entry.
entries: BTreeMap<&'a str, Entry<'a>>,
}

impl<'a> ObjectBuilder<'a> {
    /// Creates an empty builder.
    pub(crate) fn new() -> Self {
        let entries = BTreeMap::new();
        Self { entries }
    }

    /// Sets `key` to a value given as a JEntry plus the payload bytes it
    /// describes, replacing any entry already stored under `key`.
    pub(crate) fn push_raw(&mut self, key: &'a str, jentry: JEntry, data: &'a [u8]) {
        let item = Entry::Raw(jentry, data);
        self.entries.insert(key, item);
    }

    /// Sets `key` to a nested array, replacing any entry already stored under `key`.
    pub(crate) fn push_array(&mut self, key: &'a str, builder: ArrayBuilder<'a>) {
        let item = Entry::ArrayBuilder(builder);
        self.entries.insert(key, item);
    }

    /// Sets `key` to a nested object, replacing any entry already stored under `key`.
    pub(crate) fn push_object(&mut self, key: &'a str, builder: ObjectBuilder<'a>) {
        let item = Entry::ObjectBuilder(builder);
        self.entries.insert(key, item);
    }

    /// Returns the number of distinct keys stored so far.
    pub(crate) fn len(&self) -> usize {
        self.entries.len()
    }

    /// Serializes the object onto the end of `buf`.
    ///
    /// Output layout: a big-endian `u32` header (`OBJECT_CONTAINER_TAG` OR-ed
    /// with the pair count), one 4-byte JEntry slot per key followed by one per
    /// value, then all key bytes in sorted key order, then all value payloads
    /// in the same order.
    pub(crate) fn build_into(self, buf: &mut Vec<u8>) {
        let count = self.entries.len();
        buf.write_u32::<BigEndian>(OBJECT_CONTAINER_TAG | count as u32)
            .unwrap();

        // Two slots per pair: the key JEntries come first, then the value JEntries.
        let mut slot = reserve_jentries(buf, count * 8);

        for key in self.entries.keys() {
            buf.extend_from_slice(key.as_bytes());
            replace_jentry(buf, JEntry::make_string_jentry(key.len()), &mut slot);
        }

        for item in self.entries.into_values() {
            let jentry = write_entry(buf, item);
            replace_jentry(buf, jentry, &mut slot);
        }
    }
}

/// Appends the payload of `entry` to `buf` and returns the JEntry describing it.
///
/// * `Entry::Raw` copies the borrowed bytes unchanged and returns the JEntry
///   that was stored with them.
/// * Nested builders are serialized in place, and the returned container JEntry
///   records how many bytes the nested container occupies in `buf`.
///
/// The container length is measured from the bytes actually written rather
/// than taken from the builder's element count. Every other JEntry produced in
/// this file carries a payload byte length (key JEntries use `key.len()`), so
/// the element count would describe the wrong span for any reader that uses
/// the JEntry length to locate the next payload.
fn write_entry(buf: &mut Vec<u8>, entry: Entry<'_>) -> JEntry {
    match entry {
        Entry::ArrayBuilder(builder) => {
            let start = buf.len();
            builder.build_into(buf);
            JEntry::make_container_jentry(buf.len() - start)
        }
        Entry::ObjectBuilder(builder) => {
            let start = buf.len();
            builder.build_into(buf);
            JEntry::make_container_jentry(buf.len() - start)
        }
        Entry::Raw(jentry, data) => {
            buf.extend_from_slice(data);
            jentry
        }
    }
}

/// Grows `buf` by `len` zero bytes and returns the offset at which the newly
/// reserved region begins (the length of `buf` before the call).
fn reserve_jentries(buf: &mut Vec<u8>, len: usize) -> usize {
    let start = buf.len();
    buf.resize(start + len, 0);
    start
}

/// Overwrites the JEntry slot at `*jentry_index` in `buf` with the big-endian
/// encoding of `jentry`, then advances `*jentry_index` by 4 to the next slot.
///
/// # Panics
///
/// Panics if the slot does not lie entirely within `buf`. The range is checked
/// once up front, so `buf` is never left with a partially written slot.
fn replace_jentry(buf: &mut [u8], jentry: JEntry, jentry_index: &mut usize) {
    let jentry_bytes = jentry.encoded().to_be_bytes();
    let start = *jentry_index;
    buf[start..start + jentry_bytes.len()].copy_from_slice(&jentry_bytes);
    *jentry_index = start + 4;
}
Loading

0 comments on commit 5a43132

Please sign in to comment.