From 12b59b1921ad562b86a5fb023a0f88979c26d96c Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Mon, 11 Nov 2024 11:54:51 +0100 Subject: [PATCH] =?UTF-8?q?it=E2=80=99s=20always=20good=20if=20you=20commi?= =?UTF-8?q?t=20the=20changes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 1 + wgps/Cargo.toml | 1 + wgps/src/lib.rs | 2 + wgps/src/spsc_channel.rs | 151 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 155 insertions(+) create mode 100644 wgps/src/spsc_channel.rs diff --git a/Cargo.lock b/Cargo.lock index 004cac3..3f04c0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -534,6 +534,7 @@ dependencies = [ "either", "smol", "ufotofu", + "ufotofu_queues", "willow-data-model", ] diff --git a/wgps/Cargo.toml b/wgps/Cargo.toml index 068365c..d086686 100644 --- a/wgps/Cargo.toml +++ b/wgps/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] willow-data-model = { path = "../data-model", version = "0.1.0" } ufotofu = { version = "0.4.2", features = ["std"] } +ufotofu_queues = { version = "0.5.0", features = ["std"]} either = "1.10.0" [dev-dependencies] diff --git a/wgps/src/lib.rs b/wgps/src/lib.rs index c66bbff..510bf75 100644 --- a/wgps/src/lib.rs +++ b/wgps/src/lib.rs @@ -7,6 +7,8 @@ use willow_data_model::{ ResumptionFailedError, Store, StoreEvent, SubspaceId, }; +mod spsc_channel; + mod ready_transport; pub use ready_transport::*; diff --git a/wgps/src/spsc_channel.rs b/wgps/src/spsc_channel.rs new file mode 100644 index 0000000..79db35d --- /dev/null +++ b/wgps/src/spsc_channel.rs @@ -0,0 +1,151 @@ +use either::Either; +use std::{ + cell::RefCell, + convert::Infallible, + future::{ready, Future}, + rc::Rc, + task::{Poll, Waker}, +}; +use ufotofu::local_nb::{BufferedProducer, BulkProducer, Producer}; +use ufotofu_queues::Queue; + +struct SpscState { + queue: Q, + waker: Option, +} + +impl SpscState {} + +struct Output { + state: Rc>>, +} + +struct ProduceFuture<'o, Q: Queue> { + output: &'o Output, +} + +impl<'o, Q: Queue> Future for ProduceFuture<'o, Q> { + type Output = Result, Infallible>; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let mut state_ref = self.output.state.borrow_mut(); + + match state_ref.queue.dequeue() { + Some(item) => Poll::Ready(Ok(Either::Left(item))), + None => { + state_ref.waker = Some(cx.waker().clone()); + + Poll::Pending + } + } + } +} + +impl Producer for Output { + type Item = Q::Item; + + type Final = Infallible; + + type Error = Infallible; + + fn produce( + &mut self, + ) -> impl Future, Self::Error>> { + ProduceFuture { output: self } + } +} + +impl BufferedProducer for Output { + fn slurp(&mut self) -> impl Future> { + ready(Ok(())) + } +} + +// This is safe if and only if the object pointed at by `reference` lives for at least `'longer`. +// See https://doc.rust-lang.org/nightly/std/intrinsics/fn.transmute.html for more detail. +unsafe fn extend_lifetime<'shorter, 'longer, T: ?Sized>(reference: &'shorter T) -> &'longer T { + std::mem::transmute::<&'shorter T, &'longer T>(reference) +} + +struct ExposeItemsFuture<'o, Q: Queue> { + output: &'o Output, +} + +impl<'o, Q: Queue> Future for ExposeItemsFuture<'o, Q> { + type Output = Result, Infallible>; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let mut state_ref = self.output.state.borrow_mut(); + + match state_ref.queue.expose_items() { + Some(items) => { + // You're probably wondering how I got here. + // We need to return something which lives for 'o, + // but we're borrowing it from this state_ref thing + // which only lives as long as this function does. + // + // We *know* that these items will have a long enough lifetime, + // because they sit inside a Rc which has a lifetime of 'o. + // The whole point of the Rc is to keep its contents alive as long at least as itself. + // Thus we know that the items have a lifetime of at least 'o. + let items: &'o [Q::Item] = unsafe { extend_lifetime(items) }; + + Poll::Ready(Ok(Either::Left(items))) + } + None => { + state_ref.waker = Some(cx.waker().clone()); + + Poll::Pending + } + } + } +} + +impl BulkProducer for Output { + fn expose_items<'a>( + &'a mut self, + ) -> impl Future, Self::Error>> + where + Self::Item: 'a, + { + ExposeItemsFuture { output: self } + } + + fn consider_produced( + &mut self, + amount: usize, + ) -> impl Future> { + let mut state_ref = self.state.borrow_mut(); + + state_ref.queue.consider_dequeued(amount); + + ready(Ok(())) + } +} + +struct Input { + state: Rc>>, +} + +impl Input { + fn consume(self, item: Q::Item) -> Option { + let mut state_ref = self.state.borrow_mut(); + + match state_ref.queue.enqueue(item) { + Some(item) => Some(item), + None => { + if let Some(waker) = state_ref.waker.take() { + waker.wake_by_ref() + }; + + None + } + } + } +}