Skip to content

Better zero-copy support for keys and values that can be trivially converted into Slice #100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions src/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod item;

use crate::{Keyspace, PartitionHandle, PersistMode};
use item::Item;
use lsm_tree::{AbstractTree, ValueType};
use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand Down Expand Up @@ -55,28 +55,20 @@ impl Batch {
}

/// Inserts a key-value pair into the batch
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&mut self,
p: &PartitionHandle,
key: K,
value: V,
) {
self.data.push(Item::new(
p.name.clone(),
key.as_ref(),
value.as_ref(),
ValueType::Value,
));
self.data
.push(Item::new(p.name.clone(), key, value, ValueType::Value));
}

/// Adds a tombstone marker for a key
pub fn remove<K: AsRef<[u8]>>(&mut self, p: &PartitionHandle, key: K) {
self.data.push(Item::new(
p.name.clone(),
key.as_ref(),
vec![],
ValueType::Tombstone,
));
pub fn remove<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
self.data
.push(Item::new(p.name.clone(), key, vec![], ValueType::Tombstone));
}

/// Commits the batch to the [`Keyspace`] atomically
Expand Down
18 changes: 11 additions & 7 deletions src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,11 @@ impl PartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> crate::Result<()> {
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
value: V,
) -> crate::Result<()> {
if self.is_deleted.load(std::sync::atomic::Ordering::Relaxed) {
return Err(crate::Error::PartitionDeleted);
}
Expand All @@ -863,13 +867,13 @@ impl PartitionHandle {
return Err(crate::Error::Poisoned);
}

let key = key.as_ref();
let value = value.as_ref();
let key: UserKey = key.into();
let value: UserValue = value.into();
let seqno = self.seqno.next();

let mut journal_writer = self.journal.get_writer();

journal_writer.write_raw(&self.name, key, value, lsm_tree::ValueType::Value, seqno)?;
journal_writer.write_raw(&self.name, &key, &value, lsm_tree::ValueType::Value, seqno)?;

if !self.config.manual_journal_persist {
journal_writer.flush(crate::PersistMode::Buffer)?;
Expand Down Expand Up @@ -917,7 +921,7 @@ impl PartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<()> {
pub fn remove<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
if self.is_deleted.load(std::sync::atomic::Ordering::Relaxed) {
return Err(crate::Error::PartitionDeleted);
}
Expand All @@ -926,12 +930,12 @@ impl PartitionHandle {
return Err(crate::Error::Poisoned);
}

let key = key.as_ref();
let key: UserKey = key.into();
let seqno = self.seqno.next();

let mut journal_writer = self.journal.get_writer();

journal_writer.write_raw(&self.name, key, &[], lsm_tree::ValueType::Tombstone, seqno)?;
journal_writer.write_raw(&self.name, &key, &[], lsm_tree::ValueType::Tombstone, seqno)?;

if !self.config.manual_journal_persist {
journal_writer.flush(crate::PersistMode::Buffer)?;
Expand Down
10 changes: 5 additions & 5 deletions src/tx/conflict_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ impl ConflictManager {
}
}

pub fn mark_read(&mut self, partition: &PartitionKey, key: &Slice) {
self.push_read(partition, Read::Single(key.clone()));
pub fn mark_read(&mut self, partition: &PartitionKey, key: Slice) {
self.push_read(partition, Read::Single(key));
}

pub fn mark_conflict(&mut self, partition: &PartitionKey, key: &[u8]) {
pub fn mark_conflict(&mut self, partition: &PartitionKey, key: Slice) {
if let Some(tbl) = self.conflict_keys.get_mut(partition) {
tbl.insert(key.into());
tbl.insert(key);
} else {
self.conflict_keys
.entry(partition.clone())
.or_default()
.insert(key.into());
.insert(key);
}
}

Expand Down
28 changes: 18 additions & 10 deletions src/tx/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// (found in the LICENSE-* files in the repository)

use crate::{gc::GarbageCollection, PartitionHandle, TxKeyspace};
use lsm_tree::{gc::Report as GcReport, KvPair, UserValue};
use lsm_tree::{gc::Report as GcReport, KvPair, UserKey, UserValue};
use std::path::PathBuf;

/// Access to a partition of a transactional keyspace
Expand Down Expand Up @@ -63,7 +63,7 @@ impl TransactionalPartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn take<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<UserValue>> {
pub fn take<K: Into<UserKey>>(&self, key: K) -> crate::Result<Option<UserValue>> {
self.fetch_update(key, |_| None)
}

Expand Down Expand Up @@ -121,11 +121,13 @@ impl TransactionalPartitionHandle {
///
/// Will return `Err` if an IO error occurs.
#[allow(unused_mut)]
pub fn fetch_update<K: AsRef<[u8]>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
pub fn fetch_update<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
&self,
key: K,
mut f: F,
) -> crate::Result<Option<UserValue>> {
let key: UserKey = key.into();

#[cfg(feature = "single_writer_tx")]
{
let mut tx = self.keyspace.write_tx();
Expand All @@ -139,7 +141,7 @@ impl TransactionalPartitionHandle {
#[cfg(feature = "ssi_tx")]
loop {
let mut tx = self.keyspace.write_tx()?;
let prev = tx.fetch_update(self, key.as_ref(), &mut f)?;
let prev = tx.fetch_update(self, key.clone(), &mut f)?;
if tx.commit()?.is_ok() {
return Ok(prev);
}
Expand Down Expand Up @@ -200,11 +202,13 @@ impl TransactionalPartitionHandle {
///
/// Will return `Err` if an IO error occurs.
#[allow(unused_mut)]
pub fn update_fetch<K: AsRef<[u8]>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
pub fn update_fetch<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
&self,
key: K,
mut f: F,
) -> crate::Result<Option<UserValue>> {
let key = key.into();

#[cfg(feature = "single_writer_tx")]
{
let mut tx = self.keyspace.write_tx();
Expand All @@ -217,7 +221,7 @@ impl TransactionalPartitionHandle {
#[cfg(feature = "ssi_tx")]
loop {
let mut tx = self.keyspace.write_tx()?;
let updated = tx.update_fetch(self, key.as_ref(), &mut f)?;
let updated = tx.update_fetch(self, key.clone(), &mut f)?;
if tx.commit()?.is_ok() {
return Ok(updated);
}
Expand Down Expand Up @@ -251,7 +255,11 @@ impl TransactionalPartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> crate::Result<()> {
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
value: V,
) -> crate::Result<()> {
#[cfg(feature = "single_writer_tx")]
{
let mut tx = self.keyspace.write_tx();
Expand All @@ -263,7 +271,7 @@ impl TransactionalPartitionHandle {
#[cfg(feature = "ssi_tx")]
{
let mut tx = self.keyspace.write_tx()?;
tx.insert(self, key.as_ref(), value.as_ref());
tx.insert(self, key, value);
tx.commit()?.expect("blind insert should not conflict ever");
Ok(())
}
Expand Down Expand Up @@ -296,7 +304,7 @@ impl TransactionalPartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<()> {
pub fn remove<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
#[cfg(feature = "single_writer_tx")]
{
let mut tx = self.keyspace.write_tx();
Expand All @@ -308,7 +316,7 @@ impl TransactionalPartitionHandle {
#[cfg(feature = "ssi_tx")]
{
let mut tx = self.keyspace.write_tx()?;
tx.remove(self, key.as_ref());
tx.remove(self, key);
tx.commit()?.expect("blind remove should not conflict ever");
Ok(())
}
Expand Down
34 changes: 18 additions & 16 deletions src/tx/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl BaseTransaction {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub(super) fn take<K: AsRef<[u8]>>(
pub(super) fn take<K: Into<UserKey>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand All @@ -75,24 +75,25 @@ impl BaseTransaction {
///
/// Will return `Err` if an IO error occurs.
pub(super) fn update_fetch<
K: AsRef<[u8]>,
K: Into<UserKey>,
F: FnMut(Option<&UserValue>) -> Option<UserValue>,
>(
&mut self,
partition: &TxPartitionHandle,
key: K,
mut f: F,
) -> crate::Result<Option<UserValue>> {
let key = key.into();
let prev = self.get(partition, &key)?;
let updated = f(prev.as_ref());

if let Some(value) = &updated {
if let Some(value) = updated.clone() {
// NOTE: Skip insert if the value hasn't changed
if updated != prev {
self.insert(partition, &key, value);
if prev.as_ref() != Some(&value) {
self.insert(partition, key, value);
}
} else if prev.is_some() {
self.remove(partition, &key);
self.remove(partition, key);
}

Ok(updated)
Expand All @@ -106,24 +107,25 @@ impl BaseTransaction {
///
/// Will return `Err` if an IO error occurs.
pub(super) fn fetch_update<
K: AsRef<[u8]>,
K: Into<UserKey>,
F: FnMut(Option<&UserValue>) -> Option<UserValue>,
>(
&mut self,
partition: &TxPartitionHandle,
key: K,
mut f: F,
) -> crate::Result<Option<UserValue>> {
let key = key.into();
let prev = self.get(partition, &key)?;
let updated = f(prev.as_ref());

if let Some(value) = &updated {
if let Some(value) = updated {
// NOTE: Skip insert if the value hasn't changed
if updated != prev {
self.insert(partition, &key, value);
if prev.as_ref() != Some(&value) {
self.insert(partition, key, value);
}
} else if prev.is_some() {
self.remove(partition, &key);
self.remove(partition, key);
}

Ok(prev)
Expand Down Expand Up @@ -350,7 +352,7 @@ impl BaseTransaction {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub(super) fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(
pub(super) fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand All @@ -361,8 +363,8 @@ impl BaseTransaction {
.entry(partition.inner.name.clone())
.or_default()
.insert(lsm_tree::InternalValue::from_components(
key.as_ref(),
value.as_ref(),
key,
value,
// NOTE: Just take the max seqno, which should never be reached
// that way, the write is definitely always the newest
SeqNo::MAX,
Expand All @@ -378,13 +380,13 @@ impl BaseTransaction {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub(super) fn remove<K: AsRef<[u8]>>(&mut self, partition: &TxPartitionHandle, key: K) {
pub(super) fn remove<K: Into<UserKey>>(&mut self, partition: &TxPartitionHandle, key: K) {
// TODO: PERF: slow??
self.memtables
.entry(partition.inner.name.clone())
.or_default()
.insert(lsm_tree::InternalValue::new_tombstone(
key.as_ref(),
key,
// NOTE: Just take the max seqno, which should never be reached
// that way, the write is definitely always the newest
SeqNo::MAX,
Expand Down
10 changes: 5 additions & 5 deletions src/tx/write/single_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<'a> WriteTransaction<'a> {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn take<K: AsRef<[u8]>>(
pub fn take<K: Into<UserKey>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand Down Expand Up @@ -116,7 +116,7 @@ impl<'a> WriteTransaction<'a> {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn update_fetch<K: AsRef<[u8]>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
pub fn update_fetch<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand Down Expand Up @@ -175,7 +175,7 @@ impl<'a> WriteTransaction<'a> {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn fetch_update<K: AsRef<[u8]>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
pub fn fetch_update<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand Down Expand Up @@ -555,7 +555,7 @@ impl<'a> WriteTransaction<'a> {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand Down Expand Up @@ -598,7 +598,7 @@ impl<'a> WriteTransaction<'a> {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn remove<K: AsRef<[u8]>>(&mut self, partition: &TxPartitionHandle, key: K) {
pub fn remove<K: Into<UserKey>>(&mut self, partition: &TxPartitionHandle, key: K) {
self.inner.remove(partition, key);
}

Expand Down
Loading
Loading