Skip to content

use async fn in traits in SendStream #235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
members = [
"h3",
"h3-quinn",
"h3-webtransport",
# "h3-webtransport",

# Internal
"examples",
Expand Down
6 changes: 3 additions & 3 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ path = "client.rs"
name = "server"
path = "server.rs"

[[example]]
name = "webtransport_server"
path = "webtransport_server.rs"
# [[example]]
# name = "webtransport_server"
# path = "webtransport_server.rs"
2 changes: 1 addition & 1 deletion examples/webtransport_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ where
+ h3::quic::Connection<Bytes>
+ RecvDatagramExt<Buf = Bytes>
+ SendDatagramExt<Bytes>,
<C::SendStream as h3::quic::SendStream<Bytes>>::Error:
<C::SendStream as h3::quic::SendStreamLocal<Bytes>>::Error:
'static + std::error::Error + Send + Sync + Into<std::io::Error>,
<C::RecvStream as h3::quic::RecvStream>::Error:
'static + std::error::Error + Send + Sync + Into<std::io::Error>,
Expand Down
99 changes: 28 additions & 71 deletions h3-quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
convert::TryInto,
fmt::{self, Display},
future::Future,
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{self, Poll},
Expand Down Expand Up @@ -150,7 +151,7 @@

impl<B> quic::Connection<B> for Connection
where
B: Buf,
B: Buf + Send,
{
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
Expand Down Expand Up @@ -277,7 +278,7 @@

impl<B> quic::OpenStreams<B> for OpenStreams
where
B: Buf,
B: Buf + Send,
{
type RecvStream = RecvStream;
type SendStream = SendStream<B>;
Expand Down Expand Up @@ -348,7 +349,7 @@

impl<B> quic::BidiStream<B> for BidiStream<B>
where
B: Buf,
B: Buf + Send,
{
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
Expand Down Expand Up @@ -380,14 +381,10 @@

impl<B> quic::SendStream<B> for BidiStream<B>
where
B: Buf,
B: Buf + Send,
{
type Error = SendStreamError;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.send.poll_ready(cx)
}

fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.send.poll_finish(cx)
}
Expand All @@ -396,17 +393,17 @@
self.send.reset(reset_code)
}

fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
self.send.send_data(data)
}

fn send_id(&self) -> StreamId {
self.send.send_id()
}

async fn send_data<T: Into<WriteBuf<B>> + Send>(&mut self, data: T) -> Result<(), Self::Error> {
self.send.send_data(data).await
}
}
impl<B> quic::SendStreamUnframed<B> for BidiStream<B>
where
B: Buf,
B: Buf + Send,
{
fn poll_send<D: Buf>(
&mut self,
Expand Down Expand Up @@ -541,12 +538,11 @@
///
/// Implements a [`quic::SendStream`] backed by a [`quinn::SendStream`].
pub struct SendStream<B: Buf> {
stream: Option<quinn::SendStream>,
writing: Option<WriteBuf<B>>,
write_fut: WriteFuture,
stream: quinn::SendStream,
buf: PhantomData<B>,
}

type WriteFuture =

Check warning on line 545 in h3-quinn/src/lib.rs

View workflow job for this annotation

GitHub Actions / Test stable ubuntu-latest

type alias `WriteFuture` is never used
ReusableBoxFuture<'static, (quinn::SendStream, Result<usize, quinn::WriteError>)>;

impl<B> SendStream<B>
Expand All @@ -555,94 +551,55 @@
{
fn new(stream: quinn::SendStream) -> SendStream<B> {
Self {
stream: Some(stream),
writing: None,
write_fut: ReusableBoxFuture::new(async { unreachable!() }),
stream: stream,

Check warning on line 554 in h3-quinn/src/lib.rs

View workflow job for this annotation

GitHub Actions / Lint

redundant field names in struct initialization
buf: PhantomData,
}
}
}

impl<B> quic::SendStream<B> for SendStream<B>
where
B: Buf,
B: Buf + Send,
{
type Error = SendStreamError;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Some(ref mut data) = self.writing {
while data.has_remaining() {
if let Some(mut stream) = self.stream.take() {
let chunk = data.chunk().to_owned(); // FIXME - avoid copy
self.write_fut.set(async move {
let ret = stream.write(&chunk).await;
(stream, ret)
});
}

let (stream, res) = ready!(self.write_fut.poll(cx));
self.stream = Some(stream);
match res {
Ok(cnt) => data.advance(cnt),
Err(err) => {
return Poll::Ready(Err(SendStreamError::Write(err)));
}
}
}
async fn send_data<T: Into<WriteBuf<B>> + Send>(&mut self, data: T) -> Result<(), Self::Error> {
let mut data = data.into();
while data.has_remaining() {
let chunk = data.chunk();
match self.stream.write(&chunk).await {
Ok(cnt) => data.advance(cnt),
Err(err) => return Err(SendStreamError::Write(err)),
};
}
self.writing = None;
Poll::Ready(Ok(()))
Ok(())
}

fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.stream
.as_mut()
.unwrap()
.poll_finish(cx)
.map_err(Into::into)
self.stream.poll_finish(cx).map_err(Into::into)
}

fn reset(&mut self, reset_code: u64) {
let _ = self
.stream
.as_mut()
.unwrap()
.reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX));
}

fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
if self.writing.is_some() {
return Err(Self::Error::NotReady);
}
self.writing = Some(data.into());
Ok(())
}

fn send_id(&self) -> StreamId {
self.stream
.as_ref()
.unwrap()
.id()
.0
.try_into()
.expect("invalid stream id")
self.stream.id().0.try_into().expect("invalid stream id")
}
}

impl<B> quic::SendStreamUnframed<B> for SendStream<B>
where
B: Buf,
B: Buf + Send,
{
fn poll_send<D: Buf>(
&mut self,
cx: &mut task::Context<'_>,
buf: &mut D,
) -> Poll<Result<usize, Self::Error>> {
if self.writing.is_some() {
// This signifies a bug in implementation
panic!("poll_send called while send stream is not ready")
}

let s = Pin::new(self.stream.as_mut().unwrap());
let s = Pin::new(&mut self.stream);

let res = ready!(futures::io::AsyncWrite::poll_write(s, cx, buf.chunk()));
match res {
Expand Down
10 changes: 5 additions & 5 deletions h3-webtransport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
//! WebTransport over HTTP/3: <https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/>
#![deny(missing_docs)]

/// Server side WebTransport session support
pub mod server;
/// Webtransport stream types
pub mod stream;
// Server side WebTransport session support
//pub mod server;
// Webtransport stream types
//pub mod stream;

pub use h3::webtransport::SessionId;
//pub use h3::webtransport::SessionId;
16 changes: 8 additions & 8 deletions h3-webtransport/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::stream::{BidiStream, RecvStream, SendStream};
pub struct WebTransportSession<C, B>
where
C: quic::Connection<B>,
B: Buf,
B: Buf + Send,
{
// See: https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-2-3
session_id: SessionId,
Expand All @@ -52,7 +52,7 @@ where
impl<C, B> WebTransportSession<C, B>
where
C: quic::Connection<B>,
B: Buf,
B: Buf + Send,
{
/// Accepts a *CONNECT* request for establishing a WebTransport session.
///
Expand Down Expand Up @@ -273,7 +273,7 @@ pin_project! {
impl<'a, B, C> Future for OpenBi<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
B: Buf + Send,
C::BidiStream: SendStreamUnframed<B>,
{
type Output = Result<BidiStream<C::BidiStream, B>, Error>;
Expand Down Expand Up @@ -317,7 +317,7 @@ pin_project! {
impl<'a, C, B> Future for OpenUni<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
B: Buf + Send,
C::SendStream: SendStreamUnframed<B>,
{
type Output = Result<SendStream<C::SendStream, B>, Error>;
Expand Down Expand Up @@ -364,7 +364,7 @@ pub enum AcceptedBi<C: quic::Connection<B>, B: Buf> {
pub struct ReadDatagram<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
B: Buf + Send,
{
conn: &'a Mutex<Connection<C, B>>,
_marker: PhantomData<B>,
Expand All @@ -373,7 +373,7 @@ where
impl<'a, C, B> Future for ReadDatagram<'a, C, B>
where
C: quic::Connection<B> + RecvDatagramExt,
B: Buf,
B: Buf + Send,
{
type Output = Result<Option<(SessionId, C::Buf)>, Error>;

Expand All @@ -396,15 +396,15 @@ where
pub struct AcceptUni<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
B: Buf + Send,
{
conn: &'a Mutex<Connection<C, B>>,
}

impl<'a, C, B> Future for AcceptUni<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
B: Buf + Send,
{
type Output = Result<Option<(SessionId, RecvStream<C::RecvStream, B>)>, Error>;

Expand Down
38 changes: 18 additions & 20 deletions h3-webtransport/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ impl<S, B> SendStream<S, B> {

impl<S, B> quic::SendStreamUnframed<B> for SendStream<S, B>
where
S: quic::SendStreamUnframed<B>,
B: Buf,
S: quic::SendStreamUnframed<B> + quic::SendStream<B>,
B: Buf + Send,
{
fn poll_send<D: Buf>(
&mut self,
Expand All @@ -113,7 +113,7 @@ where
impl<S, B> quic::SendStream<B> for SendStream<S, B>
where
S: quic::SendStream<B>,
B: Buf,
B: Buf + Send,
{
type Error = S::Error;

Expand All @@ -129,12 +129,11 @@ where
self.stream.send_id()
}

fn send_data<T: Into<h3::stream::WriteBuf<B>>>(&mut self, data: T) -> Result<(), Self::Error> {
self.stream.send_data(data)
}

fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.stream.poll_ready(cx)
async fn send_data<T: Into<h3::stream::WriteBuf<B>> + Send>(
&mut self,
data: T,
) -> Result<(), Self::Error> {
self.stream.send_data(data).await
}
}

Expand Down Expand Up @@ -218,7 +217,7 @@ impl<S, B> BidiStream<S, B> {
impl<S, B> quic::SendStream<B> for BidiStream<S, B>
where
S: quic::SendStream<B>,
B: Buf,
B: Buf + Send,
{
type Error = S::Error;

Expand All @@ -234,19 +233,18 @@ where
self.stream.send_id()
}

fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.stream.poll_ready(cx)
}

fn send_data<T: Into<h3::stream::WriteBuf<B>>>(&mut self, data: T) -> Result<(), Self::Error> {
self.stream.send_data(data)
async fn send_data<T: Into<h3::stream::WriteBuf<B>> + Send>(
&mut self,
data: T,
) -> Result<(), Self::Error> {
self.stream.send_data(data).await
}
}

impl<S, B> quic::SendStreamUnframed<B> for BidiStream<S, B>
where
S: quic::SendStreamUnframed<B>,
B: Buf,
S: quic::SendStreamUnframed<B> + quic::SendStream<B>,
B: Buf + Send,
{
fn poll_send<D: Buf>(
&mut self,
Expand Down Expand Up @@ -280,8 +278,8 @@ impl<S: quic::RecvStream, B> quic::RecvStream for BidiStream<S, B> {

impl<S, B> quic::BidiStream<B> for BidiStream<S, B>
where
S: quic::BidiStream<B>,
B: Buf,
S: quic::BidiStream<B> + quic::SendStream<B>,
B: Buf + Send,
{
type SendStream = SendStream<S::SendStream, B>;

Expand Down
Loading
Loading