Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file modified core/bench/src/actors/producer/benchmark_producer.rs
100644 → 100755
Empty file.
1 change: 1 addition & 0 deletions core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ nix = { workspace = true }

[dev-dependencies]
serial_test = { workspace = true }
tempfile = { workspace = true }
165 changes: 161 additions & 4 deletions core/common/src/alloc/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use compio::buf::{IoBuf, IoBufMut, SetLen};
use std::{
mem::MaybeUninit,
ops::{Deref, DerefMut},
sync::Arc,
};

/// A buffer wrapper that participates in memory pooling.
Expand Down Expand Up @@ -265,10 +266,9 @@ impl PooledBuffer {
/// After calling this method, the PooledBuffer becomes empty and will not
/// return memory to the pool on drop (the frozen Bytes owns the allocation).
/// The returned `Bytes` is Arc-backed, allowing cheap clones.
pub fn freeze(&mut self) -> Bytes {
pub fn freeze_to_bytes(&mut self) -> Bytes {
let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT));

// Update pool accounting
if self.from_pool
&& let Some(bucket_idx) = self.original_bucket_idx
{
Expand All @@ -278,10 +278,37 @@ impl PooledBuffer {
self.original_capacity = 0;
self.original_bucket_idx = None;

// Zero copy: Bytes takes ownership of the AlignedBuffer
// and will drop it when refcount reaches zero
Bytes::from_owner(buf)
}

pub fn freeze(&mut self) -> FrozenPooledBuffer {
let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT));
let len = buf.len();

// Transfer pool metadata to frozen buffer
let pool_meta = if self.from_pool {
Some(PoolMeta {
original_capacity: self.original_capacity,
original_bucket_idx: self.original_bucket_idx,
})
} else {
None
};

// Reset self, pool accounting now lives in FrozenPooledBuffer
self.from_pool = false;
self.original_capacity = 0;
self.original_bucket_idx = None;

FrozenPooledBuffer {
inner: Arc::new(FrozenInner {
buffer: buf,
pool_meta,
}),
offset: 0,
len,
}
}
}

impl Deref for PooledBuffer {
Expand Down Expand Up @@ -346,3 +373,133 @@ impl IoBufMut for PooledBuffer {
unsafe { std::slice::from_raw_parts_mut(ptr, cap) }
}
}

#[derive(Debug, Clone)]
struct PoolMeta {
original_capacity: usize,
original_bucket_idx: Option<usize>,
}

#[derive(Debug)]
struct FrozenInner {
buffer: AlignedBuffer,
pool_meta: Option<PoolMeta>,
}

impl Drop for FrozenInner {
fn drop(&mut self) {
if let Some(ref meta) = self.pool_meta {
let buf = std::mem::replace(&mut self.buffer, AlignedBuffer::new(ALIGNMENT));
buf.return_to_pool(meta.original_capacity, true);
}
}
}

#[derive(Clone, Debug)]
pub struct FrozenPooledBuffer {
inner: Arc<FrozenInner>,
offset: usize,
len: usize,
}

impl FrozenPooledBuffer {
/// Try to reclaim the underlying `PooledBuffer` without copying.
///
/// Success when:
/// 1. This is the sole `Arc` reference (refcount == 1)
/// 2. This view covers the entire buffer (not a sub-slice)
///
///
/// On success, pool accounting is transferred back to the returned `PooledBuffer`.
/// On failure, `self` is return unchanged
pub fn thaw(self) -> Result<PooledBuffer, FrozenPooledBuffer> {
// Sub-slice views can't reclaim the whole buffer
if self.offset != 0 || self.len != self.inner.buffer.len() {
return Err(self);
}

match Arc::try_unwrap(self.inner) {
Ok(mut frozen_inner) => {
let buffer =
std::mem::replace(&mut frozen_inner.buffer, AlignedBuffer::new(ALIGNMENT));

// Extract pool metadata and prevent FrozenInner::drop from returning the buffer to
// the pool -> we are taking ownership
let pool_meta = frozen_inner.pool_meta.take();
let (from_pool, original_capacity, original_bucket_idx) = match pool_meta {
Some(meta) => (true, meta.original_capacity, meta.original_bucket_idx),
None => (false, buffer.capacity(), None),
};

Ok(PooledBuffer {
from_pool,
original_capacity,
original_bucket_idx,
inner: buffer,
})
}
Err(arc) => Err(FrozenPooledBuffer {
inner: arc,
offset: self.offset,
len: self.len,
}),
}
}

/// Create a subslice view. Try to be cheap.
/// Panics if the range is out of bounds
pub fn slice(&self, range: std::ops::Range<usize>) -> FrozenPooledBuffer {
assert!(
range.end <= self.len,
"slice out of bounds: {}..{} but len is {}",
range.start,
range.end,
self.len
);

FrozenPooledBuffer {
inner: Arc::clone(&self.inner),
offset: self.offset + range.start,
len: range.end - range.start,
}
}

pub fn len(&self) -> usize {
self.len
}

pub fn is_empty(&self) -> bool {
self.len == 0
}

pub fn is_aligned(&self) -> bool {
(self.as_ref().as_ptr() as usize).is_multiple_of(ALIGNMENT)
}
}

impl AsRef<[u8]> for FrozenPooledBuffer {
fn as_ref(&self) -> &[u8] {
&self.inner.buffer[self.offset..self.offset + self.len]
}
}

impl Deref for FrozenPooledBuffer {
type Target = [u8];

fn deref(&self) -> &Self::Target {
self.as_ref()
}
}

impl PartialEq for FrozenPooledBuffer {
fn eq(&self, other: &Self) -> bool {
self.as_ref() == other.as_ref()
}
}

/// Allow passing FrozenPooledBuffer directly to DirectFile's write methods without any copy
impl IoBuf for FrozenPooledBuffer {
fn as_init(&self) -> &[u8] {
self.as_ref()
}
}
1 change: 1 addition & 0 deletions core/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod types;
mod utils;
pub mod wire_conversions;

pub use alloc::memory_pool::ALIGNMENT;
pub use error::client_error::ClientError;
pub use error::iggy_error::{IggyError, IggyErrorDiscriminants};
// Locking is feature gated, thus only mod level re-export.
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/types/message/indexes_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl IggyIndexesMut {
/// The returned `IggyIndexes` uses Arc-backed `Bytes`, allowing cheap clones.
pub fn freeze(&mut self) -> IggyIndexes {
let base_position = self.base_position;
let buffer = self.buffer.freeze();
let buffer = self.buffer.freeze_to_bytes();
self.saved_count = 0;
self.base_position = 0;
IggyIndexes::new(buffer, base_position)
Expand Down
19 changes: 18 additions & 1 deletion core/common/src/types/message/message_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,25 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> {

#[cfg(test)]
mod tests {
use std::{str::FromStr, sync::Once};

use super::*;
use crate::IggyMessage;
use crate::{IggyMessage, MemoryPool, MemoryPoolConfigOther};
use bytes::Bytes;

static TEST_INIT: Once = Once::new();

fn initialize_pool_for_tests() {
TEST_INIT.call_once(|| {
let config = MemoryPoolConfigOther {
enabled: true,
size: IggyByteSize::from_str("4GiB").unwrap(),
bucket_capacity: 8192,
};
MemoryPool::init_pool(&config);
});
}

fn build_batch() -> crate::IggyMessagesBatch {
let messages = vec![
IggyMessage::builder()
Expand All @@ -229,6 +244,7 @@ mod tests {

#[test]
fn should_return_tail_for_indexed_last_after_next() {
initialize_pool_for_tests();
let batch = build_batch();
let mut iter = IggyMessageViewIterator::new_with_boundaries(
batch.buffer(),
Expand All @@ -246,6 +262,7 @@ mod tests {

#[test]
fn should_return_last_message_for_raw_last() {
initialize_pool_for_tests();
let batch = build_batch();
let last = IggyMessageViewIterator::new(batch.buffer()).last().unwrap();
assert_eq!(last.payload(), b"three");
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/types/message/messages_batch_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl IggyMessagesBatchMut {
pub fn freeze(&mut self) -> IggyMessagesBatch {
let count = self.count();
let indexes = self.indexes.freeze();
let messages = self.messages.freeze();
let messages = self.messages.freeze_to_bytes();
IggyMessagesBatch::new(indexes, messages, count)
}

Expand Down
Loading
Loading