Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move generic Buffer parameter to fn #94

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{net::SocketAddr, path::PathBuf, sync::Arc};

use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use futures::StreamExt;
use http::{Request, StatusCode};
use rustls::{Certificate, PrivateKey};
Expand Down Expand Up @@ -114,11 +114,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

async fn handle_request<T>(
req: Request<()>,
mut stream: RequestStream<T, Bytes>,
mut stream: RequestStream<T>,
serve_root: Arc<Option<PathBuf>>,
) -> Result<(), Box<dyn std::error::Error>>
where
T: BidiStream<Bytes>,
T: BidiStream,
{
let (status, to_serve) = match serve_root.as_deref() {
None => (StatusCode::OK, None),
Expand Down
129 changes: 46 additions & 83 deletions h3-quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use quinn::{
OpenUni, VarInt, WriteError,
};

use h3::quic::{self, Error, StreamId, WriteBuf};
use h3::quic::{self, Error, StreamId};

pub struct Connection {
conn: quinn::Connection,
Expand Down Expand Up @@ -82,13 +82,10 @@ impl From<quinn::ConnectionError> for ConnectionError {
}
}

impl<B> quic::Connection<B> for Connection
where
B: Buf,
{
type SendStream = SendStream<B>;
impl quic::Connection for Connection {
type SendStream = SendStream;
type RecvStream = RecvStream;
type BidiStream = BidiStream<B>;
type BidiStream = BidiStream;
type OpenStreams = OpenStreams;
type Error = ConnectionError;

Expand Down Expand Up @@ -166,13 +163,10 @@ pub struct OpenStreams {
opening_uni: Option<OpenUni>,
}

impl<B> quic::OpenStreams<B> for OpenStreams
where
B: Buf,
{
impl quic::OpenStreams for OpenStreams {
type RecvStream = RecvStream;
type SendStream = SendStream<B>;
type BidiStream = BidiStream<B>;
type SendStream = SendStream;
type BidiStream = BidiStream;
type Error = ConnectionError;

fn poll_open_bidi(
Expand Down Expand Up @@ -220,30 +214,21 @@ impl Clone for OpenStreams {
}
}

pub struct BidiStream<B>
where
B: Buf,
{
send: SendStream<B>,
pub struct BidiStream {
send: SendStream,
recv: RecvStream,
}

impl<B> quic::BidiStream<B> for BidiStream<B>
where
B: Buf,
{
type SendStream = SendStream<B>;
impl quic::BidiStream for BidiStream {
type SendStream = SendStream;
type RecvStream = RecvStream;

fn split(self) -> (Self::SendStream, Self::RecvStream) {
(self.send, self.recv)
}
}

impl<B> quic::RecvStream for BidiStream<B>
where
B: Buf,
{
impl quic::RecvStream for BidiStream {
type Buf = Bytes;
type Error = ReadError;

Expand All @@ -259,14 +244,15 @@ where
}
}

impl<B> quic::SendStream<B> for BidiStream<B>
where
B: Buf,
{
impl quic::SendStream for BidiStream {
type Error = SendStreamError;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.send.poll_ready(cx)
fn poll_write<B: Buf>(
&mut self,
buf: &mut B,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.send.poll_write(buf, cx)
}

fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand All @@ -277,10 +263,6 @@ where
self.send.reset(reset_code)
}

fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
self.send.send_data(data)
}

fn id(&self) -> StreamId {
self.send.id()
}
Expand Down Expand Up @@ -360,55 +342,44 @@ impl Error for ReadError {
}
}

pub struct SendStream<B: Buf> {
pub struct SendStream {
stream: quinn::SendStream,
writing: Option<WriteBuf<B>>,
}

impl<B> SendStream<B>
where
B: Buf,
{
fn new(stream: quinn::SendStream) -> SendStream<B> {
Self {
stream,
writing: None,
}
impl SendStream {
fn new(stream: quinn::SendStream) -> SendStream {
Self { stream }
}
}

impl<B> quic::SendStream<B> for SendStream<B>
where
B: Buf,
{
impl quic::SendStream for SendStream {
type Error = SendStreamError;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Some(ref mut data) = self.writing {
while data.has_remaining() {
match ready!(Pin::new(&mut self.stream).poll_write(cx, data.chunk())) {
Ok(cnt) => data.advance(cnt),
Err(err) => {
// We are forced to use AsyncWrite for now because we cannot store
// the result of a call to:
// quinn::send_stream::write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, S>.
//
// This is why we have to unpack the error from io::Error below. This should not
// panic as long as quinn's AsyncWrite impl doesn't change.
return Poll::Ready(Err(SendStreamError::Write(
err.into_inner()
.expect("write stream returned an empty error")
.downcast_ref::<WriteError>()
.expect(
"write stream returned an error which type is not WriteError",
)
.clone(),
)));
}
fn poll_write<B: Buf>(
&mut self,
buf: &mut B,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
while buf.has_remaining() {
match ready!(Pin::new(&mut self.stream).poll_write(cx, buf.chunk())) {
Ok(cnt) => buf.advance(cnt),
Err(err) => {
// We are forced to use AsyncWrite for now because we cannot store
// the result of a call to:
// quinn::send_stream::write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, S>.
//
// This is why we have to unpack the error from io::Error below. This should not
// panic as long as quinn's AsyncWrite impl doesn't change.
return Poll::Ready(Err(SendStreamError::Write(
err.into_inner()
.expect("write stream returned an empty error")
.downcast_ref::<WriteError>()
.expect("write stream returned an error which type is not WriteError")
.clone(),
)));
}
}
}
self.writing = None;
Poll::Ready(Ok(()))
}

Expand All @@ -422,14 +393,6 @@ where
.reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX));
}

fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
if self.writing.is_some() {
return Err(Self::Error::NotReady);
}
self.writing = Some(data.into());
Ok(())
}

fn id(&self) -> StreamId {
self.stream.id().0.try_into().expect("invalid stream id")
}
Expand Down
Loading