Skip to content

Commit

Permalink
add wide columns (rust side)
Browse files Browse the repository at this point in the history
  • Loading branch information
Congyuwang committed Oct 31, 2024
1 parent d61a95a commit 25c77ac
Show file tree
Hide file tree
Showing 9 changed files with 462 additions and 19 deletions.
2 changes: 1 addition & 1 deletion clippy.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
msrv = "1.70.0"
msrv = "1.71.1"
17 changes: 11 additions & 6 deletions librocksdb-sys/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
[package]
name = "librocksdb-sys"
version = "0.17.0+9.0.0"
version = "0.17.0+9.7.3"
edition = "2018"
rust-version = "1.71.1"
authors = ["Karl Hobley <karlhobley10@gmail.com>", "Arkadiy Paronyan <arkadiy@ethcore.io>"]
authors = [
"Karl Hobley <karlhobley10@gmail.com>",
"Arkadiy Paronyan <arkadiy@ethcore.io>",
]
license = "MIT/Apache-2.0/BSD-3-Clause"
description = "Native bindings to librocksdb"
readme = "README.md"
repository = "https://github.com/rust-rocksdb/rust-rocksdb"
keywords = [ "bindings", "ffi", "rocksdb" ]
categories = [ "api-bindings", "database", "external-ffi-bindings" ]
keywords = ["bindings", "ffi", "rocksdb"]
categories = ["api-bindings", "database", "external-ffi-bindings"]
links = "rocksdb"

[features]
default = [ "static" ]
default = ["static"]
jemalloc = ["tikv-jemalloc-sys"]
static = ["libz-sys?/static", "bzip2-sys?/static"]
mt_static = []
Expand All @@ -27,7 +30,9 @@ rtti = []

[dependencies]
libc = "0.2"
tikv-jemalloc-sys = { version = "0.6", features = ["unprefixed_malloc_on_supported_platforms"], optional = true }
tikv-jemalloc-sys = { version = "0.6", features = [
"unprefixed_malloc_on_supported_platforms",
], optional = true }
lz4-sys = { version = "1.10", optional = true }
zstd-sys = { version = "2.0", features = ["zdict_builder"], optional = true }
libz-sys = { version = "1.1", default-features = false, optional = true }
Expand Down
99 changes: 94 additions & 5 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
//

use crate::{
column_family::AsColumnFamilyRef,
column_family::BoundColumnFamily,
column_family::UnboundColumnFamily,
column_family::{AsColumnFamilyRef, BoundColumnFamily, UnboundColumnFamily},
db_options::OptionsMustOutliveDB,
ffi,
ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
WaitForCompactOptions, WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
IngestExternalFileOptions, IteratorMode, Options, PinnableWideColumns, ReadOptions,
SnapshotWithThreadMode, WaitForCompactOptions, WriteBatch, WriteOptions,
DEFAULT_COLUMN_FAMILY_NAME,
};

use crate::ffi_util::CSlice;
Expand Down Expand Up @@ -1078,6 +1077,40 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
}
}

/// Return the value associated with a key using RocksDB's PinnableSlice
/// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
/// allows specifying ColumnFamily
pub fn get_entity_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<PinnableWideColumns>, Error> {
if readopts.inner.is_null() {
return Err(Error::new(
"Unable to create RocksDB read options. This is a fairly trivial call, and its \
failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
.to_owned(),
));
}

let key = key.as_ref();
unsafe {
let val = ffi_try!(ffi::rocksdb_get_entity_cf(
self.inner.inner(),
readopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
));
if val.is_null() {
Ok(None)
} else {
Ok(Some(PinnableWideColumns::from_c(val)))
}
}
}

/// Return the value associated with a key using RocksDB's PinnableSlice
/// so as to avoid unnecessary memory copy. Similar to get_pinned_cf_opt but
/// leverages default options.
Expand Down Expand Up @@ -1630,6 +1663,62 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
}
}

pub fn put_entity_cf_opt<'a, 'b, K, V1, V2, I1, I2>(
&self,
cf: &impl AsColumnFamilyRef,
key: K,
names: I1,
values: I2,
writeopts: &WriteOptions,
) -> Result<(), Error>
where
K: AsRef<[u8]>,
V1: 'a + AsRef<[u8]>,
V2: 'b + AsRef<[u8]>,
I1: IntoIterator<Item = &'a V1>,
I2: IntoIterator<Item = &'b V2>,
{
let key = key.as_ref();

let (ptr_names, names_sizes): (Vec<_>, Vec<_>) = names
.into_iter()
.map(|k| {
let k = k.as_ref();
(k.as_ptr(), k.len())
})
.unzip();

let (ptr_values, values_sizes): (Vec<_>, Vec<_>) = values
.into_iter()
.map(|k| {
let k = k.as_ref();
(k.as_ptr(), k.len())
})
.unzip();

if ptr_names.len() != ptr_values.len() {
return Err(Error::new(
"columns names and values length mismatch".to_string(),
));
}

unsafe {
ffi_try!(ffi::rocksdb_put_entity_cf(
self.inner.inner(),
writeopts.inner,
cf.inner(),
key.as_ptr() as *const c_char,
key.len() as size_t,
ptr_names.len(),
ptr_names.as_ptr() as *const *const c_char,
names_sizes.as_ptr(),
ptr_values.as_ptr() as *const *const c_char,
values_sizes.as_ptr(),
));
Ok(())
}
}

pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
where
K: AsRef<[u8]>,
Expand Down
20 changes: 19 additions & 1 deletion src/db_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use crate::{
db::{DBAccess, DB},
ffi, Error, ReadOptions, WriteBatch,
ffi,
wide_columns::WideColumns,
Error, ReadOptions, WriteBatch,
};
use libc::{c_char, c_uchar, size_t};
use std::{marker::PhantomData, slice};
Expand Down Expand Up @@ -332,6 +334,15 @@ impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
}
}

/// Returns pair with slice of the current key and current value.
pub fn columns(&self) -> Option<WideColumns> {
if self.valid() {
Some(self.columns_impl())
} else {
None
}
}

/// Returns a slice of the current key; assumes the iterator is valid.
fn key_impl(&self) -> &[u8] {
// Safety Note: This is safe as all methods that may invalidate the buffer returned
Expand All @@ -355,6 +366,13 @@ impl<'a, D: DBAccess> DBRawIteratorWithThreadMode<'a, D> {
slice::from_raw_parts(val_ptr as *const c_uchar, val_len)
}
}

fn columns_impl(&self) -> WideColumns {
unsafe {
let columns = ffi::rocksdb_iter_columns(self.inner.as_ptr());
WideColumns::from_c(columns)
}
}
}

impl<'a, D: DBAccess> Drop for DBRawIteratorWithThreadMode<'a, D> {
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ mod snapshot;
mod sst_file_writer;
pub mod statistics;
mod transactions;
mod wide_columns;
mod write_batch;

pub use crate::{
Expand Down Expand Up @@ -133,6 +134,7 @@ pub use crate::{
OptimisticTransactionDB, OptimisticTransactionOptions, Transaction, TransactionDB,
TransactionDBOptions, TransactionOptions,
},
wide_columns::{Iterable, PinnableWideColumns},
write_batch::{WriteBatch, WriteBatchIterator, WriteBatchWithTransaction},
};

Expand Down
Loading

0 comments on commit 25c77ac

Please sign in to comment.