Skip to content

Commit

Permalink
it’s always good if you commit the changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sgwilym committed Nov 11, 2024
1 parent 690ea4e commit 12b59b1
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions wgps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
willow-data-model = { path = "../data-model", version = "0.1.0" }
ufotofu = { version = "0.4.2", features = ["std"] }
ufotofu_queues = { version = "0.5.0", features = ["std"]}
either = "1.10.0"

[dev-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions wgps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use willow_data_model::{
ResumptionFailedError, Store, StoreEvent, SubspaceId,
};

mod spsc_channel;

mod ready_transport;
pub use ready_transport::*;

Expand Down
151 changes: 151 additions & 0 deletions wgps/src/spsc_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use either::Either;
use std::{
cell::RefCell,
convert::Infallible,
future::{ready, Future},
rc::Rc,
task::{Poll, Waker},
};
use ufotofu::local_nb::{BufferedProducer, BulkProducer, Producer};
use ufotofu_queues::Queue;

struct SpscState<Q: Queue> {
queue: Q,
waker: Option<Waker>,
}

impl<Q: Queue> SpscState<Q> {}

This comment has been minimized.

Copy link
@AljoschaMeyer

AljoschaMeyer Nov 11, 2024

Contributor

Can delete this.


struct Output<Q: Queue> {
state: Rc<RefCell<SpscState<Q>>>,
}

struct ProduceFuture<'o, Q: Queue> {
output: &'o Output<Q>,
}

impl<'o, Q: Queue> Future for ProduceFuture<'o, Q> {
type Output = Result<Either<Q::Item, Infallible>, Infallible>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut state_ref = self.output.state.borrow_mut();

match state_ref.queue.dequeue() {
Some(item) => Poll::Ready(Ok(Either::Left(item))),
None => {
state_ref.waker = Some(cx.waker().clone());

Poll::Pending
}
}
}
}

impl<Q: Queue> Producer for Output<Q> {
type Item = Q::Item;

type Final = Infallible;

type Error = Infallible;

fn produce(
&mut self,
) -> impl Future<Output = Result<Either<Self::Item, Self::Final>, Self::Error>> {
ProduceFuture { output: self }
}
}

impl<Q: Queue> BufferedProducer for Output<Q> {
fn slurp(&mut self) -> impl Future<Output = Result<(), Self::Error>> {
ready(Ok(()))
}
}

// This is safe if and only if the object pointed at by `reference` lives for at least `'longer`.
// See https://doc.rust-lang.org/nightly/std/intrinsics/fn.transmute.html for more detail.
unsafe fn extend_lifetime<'shorter, 'longer, T: ?Sized>(reference: &'shorter T) -> &'longer T {
std::mem::transmute::<&'shorter T, &'longer T>(reference)
}

struct ExposeItemsFuture<'o, Q: Queue> {
output: &'o Output<Q>,
}

impl<'o, Q: Queue> Future for ExposeItemsFuture<'o, Q> {
type Output = Result<Either<&'o [Q::Item], Infallible>, Infallible>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut state_ref = self.output.state.borrow_mut();

match state_ref.queue.expose_items() {
Some(items) => {
// You're probably wondering how I got here.
// We need to return something which lives for 'o,
// but we're borrowing it from this state_ref thing
// which only lives as long as this function does.
//
// We *know* that these items will have a long enough lifetime,
// because they sit inside a Rc which has a lifetime of 'o.
// The whole point of the Rc is to keep its contents alive as long at least as itself.
// Thus we know that the items have a lifetime of at least 'o.
let items: &'o [Q::Item] = unsafe { extend_lifetime(items) };

Poll::Ready(Ok(Either::Left(items)))
}
None => {
state_ref.waker = Some(cx.waker().clone());

Poll::Pending
}
}
}
}

impl<Q: Queue> BulkProducer for Output<Q> {
fn expose_items<'a>(
&'a mut self,
) -> impl Future<Output = Result<Either<&'a [Self::Item], Self::Final>, Self::Error>>
where
Self::Item: 'a,
{
ExposeItemsFuture { output: self }
}

fn consider_produced(
&mut self,
amount: usize,
) -> impl Future<Output = Result<(), Self::Error>> {
let mut state_ref = self.state.borrow_mut();

state_ref.queue.consider_dequeued(amount);

ready(Ok(()))
}
}

struct Input<Q: Queue> {
state: Rc<RefCell<SpscState<Q>>>,
}

impl<Q: Queue> Input<Q> {
fn consume(self, item: Q::Item) -> Option<Q::Item> {

This comment has been minimized.

Copy link
@AljoschaMeyer

AljoschaMeyer Nov 11, 2024

Contributor

Should take &mut self, not self.

let mut state_ref = self.state.borrow_mut();

match state_ref.queue.enqueue(item) {
Some(item) => Some(item),
None => {
if let Some(waker) = state_ref.waker.take() {
waker.wake_by_ref()

This comment has been minimized.

Copy link
@AljoschaMeyer

AljoschaMeyer Nov 11, 2024

Contributor

Use waker.wake() instead.

};

None
}
}
}
}

0 comments on commit 12b59b1

Please sign in to comment.