Skip to content

Commit

Permalink
Rb trait methods, overwriting mode, bump version
Browse files Browse the repository at this point in the history
  • Loading branch information
agerasev committed Jan 17, 2023
1 parent 3b68572 commit f903b95
Show file tree
Hide file tree
Showing 21 changed files with 435 additions and 100 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ringbuf"
version = "0.3.1"
version = "0.3.2"
authors = ["Alexey Gerasev <alexey.gerasev@gmail.com>"]
edition = "2021"

Expand All @@ -26,6 +26,10 @@ crossbeam-utils = { version = "0.8", default-features = false }
name = "simple"
required-features = ["alloc"]

[[example]]
name = "overwrite"
required-features = ["alloc"]

[[example]]
name = "message"
required-features = ["std"]
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Lock-free SPSC FIFO ring buffer with direct access to inner data.
+ Items can be inserted and removed one by one or many at once.
+ Thread-safe direct access to the internal ring buffer memory.
+ `Read` and `Write` implementation.
+ Overwriting mode support.
+ Can be used without `std` and even without `alloc` (using only statically-allocated memory).
+ [Experimental `async`/`.await` support](https://github.com/agerasev/async-ringbuf).

Expand Down Expand Up @@ -105,6 +106,29 @@ assert_eq!(cons.pop(), None);
# }
```

## Overwrite

Ring buffer can be used in overwriting mode when insertion overwrites the latest element if the buffer is full.

```rust
use ringbuf::{HeapRb, Rb};

# fn main() {
let mut rb = HeapRb::<i32>::new(2);

assert_eq!(rb.push_overwrite(0), None);
assert_eq!(rb.push_overwrite(1), None);
assert_eq!(rb.push_overwrite(2), Some(0));

assert_eq!(rb.pop(), Some(1));
assert_eq!(rb.pop(), Some(2));
assert_eq!(rb.pop(), None);
# }
```

Note that [`push_overwrite`](`Rb::push_overwrite`) requires exclusive access to the ring buffer
so to perform it concurrently you need to guard the ring buffer with [`Mutex`](`std::sync::Mutex`) or some other lock.

## `async`/`.await`

There is an experimental crate [`async-ringbuf`](https://github.com/agerasev/async-ringbuf)
Expand Down
13 changes: 13 additions & 0 deletions examples/overwrite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use ringbuf::{HeapRb, Rb};

fn main() {
let mut rb = HeapRb::<i32>::new(2);

assert_eq!(rb.push_overwrite(0), None);
assert_eq!(rb.push_overwrite(1), None);
assert_eq!(rb.push_overwrite(2), Some(0));

assert_eq!(rb.pop(), Some(1));
assert_eq!(rb.pop(), Some(2));
assert_eq!(rb.pop(), None);
}
2 changes: 1 addition & 1 deletion src/benchmarks/iter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::ring_buffer::HeapRb;
use crate::HeapRb;

use test::{black_box, Bencher};

Expand Down
2 changes: 1 addition & 1 deletion src/benchmarks/slice.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::ring_buffer::HeapRb;
use crate::HeapRb;

use test::{black_box, Bencher};

Expand Down
21 changes: 10 additions & 11 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ where
/// The capacity of the buffer is constant.
#[inline]
pub fn capacity(&self) -> usize {
self.target.capacity().get()
self.target.__capacity().get()
}

/// Checks if the ring buffer is empty.
Expand Down Expand Up @@ -235,34 +235,33 @@ assert_eq!(cons.skip(8), 0);
)]
pub fn skip(&mut self, count: usize) -> usize {
let count = cmp::min(count, self.len());
assert_eq!(unsafe { self.target.skip(Some(count)) }, count);
assert_eq!(unsafe { self.target.__skip(Some(count)) }, count);
count
}

/// Removes all items from the buffer and safely drops them.
///
/// Returns the number of deleted items.
pub fn clear(&mut self) -> usize {
unsafe { self.target.skip(None) }
unsafe { self.target.__skip(None) }
}
}

/// An iterator that removes items from the ring buffer.
pub struct PopIterator<'a, T, R: RbRef>
pub struct PopIterator<'a, T, R: RbRef + ?Sized>
where
R::Rb: RbRead<T>,
{
target: &'a R,
iter: Chain<slice::Iter<'a, MaybeUninit<T>>, slice::Iter<'a, MaybeUninit<T>>>,

len: usize,
}

impl<'a, T, R: RbRef> PopIterator<'a, T, R>
impl<'a, T, R: RbRef + ?Sized> PopIterator<'a, T, R>
where
R::Rb: RbRead<T>,
{
fn new(target: &'a R) -> Self {
pub(crate) fn new(target: &'a R) -> Self {
let slices = unsafe { target.occupied_slices() };
Self {
target,
Expand All @@ -272,7 +271,7 @@ where
}
}

impl<'a, T, R: RbRef> Iterator for PopIterator<'a, T, R>
impl<'a, T, R: RbRef + ?Sized> Iterator for PopIterator<'a, T, R>
where
R::Rb: RbRead<T>,
{
Expand All @@ -287,9 +286,9 @@ where
}
}

impl<'a, T, R: RbRef> ExactSizeIterator for PopIterator<'a, T, R> where R::Rb: RbRead<T> {}
impl<'a, T, R: RbRef + ?Sized> ExactSizeIterator for PopIterator<'a, T, R> where R::Rb: RbRead<T> {}

impl<'a, T, R: RbRef> Drop for PopIterator<'a, T, R>
impl<'a, T, R: RbRef + ?Sized> Drop for PopIterator<'a, T, R>
where
R::Rb: RbRead<T>,
{
Expand All @@ -305,7 +304,7 @@ where
/// Removes first items from the ring buffer and writes them into a slice.
/// Elements must be [`Copy`].
///
/// On success returns count of items been removed from the ring buffer.
/// Returns count of items been removed from the ring buffer.
pub fn pop_slice(&mut self, elems: &mut [T]) -> usize {
let (left, right) = unsafe { self.as_uninit_slices() };
let count = if elems.len() < left.len() {
Expand Down
31 changes: 30 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
//! [`Producer`] is used to insert items to the ring buffer, [`Consumer`] - to remove items from it.
//! For [`SharedRb`] and its derivatives they can be used in different threads.
//!
//! Also you can use the ring buffer without splitting at all via methods provided by [`Rb`] trait.
//!
//! # Types
//!
//! There are several types of ring buffers provided:
Expand Down Expand Up @@ -93,6 +95,33 @@ assert_eq!(cons.pop(), None);
# }
```
"##]
#![cfg_attr(
feature = "std",
doc = r##"
## Overwrite
Ring buffer can be used in overwriting mode when insertion overwrites the latest element if the buffer is full.
```rust
use ringbuf::{HeapRb, Rb};
# fn main() {
let mut rb = HeapRb::<i32>::new(2);
assert_eq!(rb.push_overwrite(0), None);
assert_eq!(rb.push_overwrite(1), None);
assert_eq!(rb.push_overwrite(2), Some(0));
assert_eq!(rb.pop(), Some(1));
assert_eq!(rb.pop(), Some(2));
assert_eq!(rb.pop(), None);
# }
```
Note that [`push_overwrite`](`Rb::push_overwrite`) requires exclusive access to the ring buffer
so to perform it concurrently you need to guard the ring buffer with [`Mutex`](`std::sync::Mutex`) or some other lock.
"##
)]
//! ## `async`/`.await`
//!
//! There is an experimental crate [`async-ringbuf`](https://github.com/agerasev/async-ringbuf)
Expand Down Expand Up @@ -122,7 +151,7 @@ pub use alias::{HeapConsumer, HeapProducer, HeapRb};
pub use alias::{StaticConsumer, StaticProducer, StaticRb};
pub use consumer::Consumer;
pub use producer::Producer;
pub use ring_buffer::{LocalRb, SharedRb};
pub use ring_buffer::{LocalRb, Rb, SharedRb};
pub use transfer::transfer;

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
/// The capacity of the buffer is constant.
#[inline]
pub fn capacity(&self) -> usize {
self.target.capacity().get()
self.target.__capacity().get()
}

/// Checks if the ring buffer is empty.
Expand Down Expand Up @@ -176,7 +176,7 @@ where
R::Rb: RbWrite<T>,
{
/// Appends items from slice to the ring buffer.
/// Elements should be `Copy`.
/// Elements must be [`Copy`].
///
/// Returns count of items been appended to the ring buffer.
pub fn push_slice(&mut self, elems: &[T]) -> usize {
Expand Down
60 changes: 12 additions & 48 deletions src/ring_buffer/base.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
use super::RbWrap;
use core::{
mem::MaybeUninit,
num::NonZeroUsize,
ops::{Deref, Range},
ptr, slice,
};

#[cfg(feature = "alloc")]
use alloc::{rc::Rc, sync::Arc};
use core::{mem::MaybeUninit, num::NonZeroUsize, ops::Range, ptr, slice};

/// Basic ring buffer functionality.
///
Expand Down Expand Up @@ -40,7 +31,7 @@ pub trait RbBase<T> {
/// Capacity of the ring buffer.
///
/// It is constant during the whole ring buffer lifetime.
fn capacity(&self) -> NonZeroUsize;
fn __capacity(&self) -> NonZeroUsize;

/// Head position.
fn head(&self) -> usize;
Expand All @@ -52,20 +43,20 @@ pub trait RbBase<T> {
///
/// Equals to `2 * len`.
#[inline]
fn modulus(&self) -> NonZeroUsize {
unsafe { NonZeroUsize::new_unchecked(2 * self.capacity().get()) }
fn __modulus(&self) -> NonZeroUsize {
unsafe { NonZeroUsize::new_unchecked(2 * self.__capacity().get()) }
}

/// The number of items stored in the buffer at the moment.
fn occupied_len(&self) -> usize {
let modulus = self.modulus();
let modulus = self.__modulus();
(modulus.get() + self.tail() - self.head()) % modulus
}

/// The number of vacant places in the buffer at the moment.
fn vacant_len(&self) -> usize {
let modulus = self.modulus();
(self.capacity().get() + self.head() - self.tail()) % modulus
let modulus = self.__modulus();
(self.__capacity().get() + self.head() - self.tail()) % modulus
}

/// Checks if the occupied range is empty.
Expand Down Expand Up @@ -103,14 +94,14 @@ pub trait RbRead<T>: RbBase<T> {
/// *In debug mode panics if `count` is greater than number of items in the ring buffer.*
unsafe fn advance_head(&self, count: usize) {
debug_assert!(count <= self.occupied_len());
self.set_head((self.head() + count) % self.modulus());
self.set_head((self.head() + count) % self.__modulus());
}

/// Returns a pair of ranges of [`Self::occupied_slices`] location in underlying container.
fn occupied_ranges(&self) -> (Range<usize>, Range<usize>) {
let head = self.head();
let tail = self.tail();
let len = self.capacity();
let len = self.__capacity();

let (head_div, head_mod) = (head / len, head % len);
let (tail_div, tail_mod) = (tail / len, tail % len);
Expand Down Expand Up @@ -155,7 +146,7 @@ pub trait RbRead<T>: RbBase<T> {
/// # Safety
///
/// Must not be called concurrently.
unsafe fn skip(&self, count_or_all: Option<usize>) -> usize {
unsafe fn __skip(&self, count_or_all: Option<usize>) -> usize {
let (left, right) = self.occupied_slices();
let count = match count_or_all {
Some(count) => {
Expand Down Expand Up @@ -196,14 +187,14 @@ pub trait RbWrite<T>: RbBase<T> {
/// *In debug mode panics if `count` is greater than number of vacant places in the ring buffer.*
unsafe fn advance_tail(&self, count: usize) {
debug_assert!(count <= self.vacant_len());
self.set_tail((self.tail() + count) % self.modulus());
self.set_tail((self.tail() + count) % self.__modulus());
}

/// Returns a pair of ranges of [`Self::vacant_slices`] location in underlying container.
fn vacant_ranges(&self) -> (Range<usize>, Range<usize>) {
let head = self.head();
let tail = self.tail();
let len = self.capacity();
let len = self.__capacity();

let (head_div, head_mod) = (head / len, head % len);
let (tail_div, tail_mod) = (tail / len, tail % len);
Expand Down Expand Up @@ -234,30 +225,3 @@ pub trait RbWrite<T>: RbBase<T> {
)
}
}

/// An abstract ring buffer.
///
/// This trait is just a combination of [`RbBase`], [`RbRead`] and [`RbWrite`].
///
/// See [`RbBase`] for details of internal implementation of the ring buffer.
pub trait Rb<T>: RbRead<T> + RbWrite<T> {}

/// An abstract reference to the ring buffer.
pub trait RbRef: Deref<Target = Self::Rb> {
type Rb;
}

impl<B> RbRef for RbWrap<B> {
type Rb = B;
}
impl<'a, B> RbRef for &'a B {
type Rb = B;
}
#[cfg(feature = "alloc")]
impl<B> RbRef for Rc<B> {
type Rb = B;
}
#[cfg(feature = "alloc")]
impl<B> RbRef for Arc<B> {
type Rb = B;
}
8 changes: 4 additions & 4 deletions src/ring_buffer/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ where
}

#[inline]
fn capacity(&self) -> NonZeroUsize {
self.target.capacity()
fn __capacity(&self) -> NonZeroUsize {
self.target.__capacity()
}

#[inline]
Expand All @@ -68,8 +68,8 @@ where
}

#[inline]
fn capacity(&self) -> NonZeroUsize {
self.target.capacity()
fn __capacity(&self) -> NonZeroUsize {
self.target.__capacity()
}

#[inline]
Expand Down
4 changes: 2 additions & 2 deletions src/ring_buffer/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<T, C: Container<T>> RbBase<T> for LocalRb<T, C> {
}

#[inline]
fn capacity(&self) -> NonZeroUsize {
fn __capacity(&self) -> NonZeroUsize {
self.storage.len()
}

Expand Down Expand Up @@ -81,7 +81,7 @@ impl<T, C: Container<T>> Rb<T> for LocalRb<T, C> {}

impl<T, C: Container<T>> Drop for LocalRb<T, C> {
fn drop(&mut self) {
unsafe { self.skip(None) };
self.clear();
}
}

Expand Down
Loading

0 comments on commit f903b95

Please sign in to comment.