From 1c8f0469d89a694129047cb36f6c750a2bd3eb67 Mon Sep 17 00:00:00 2001 From: Yureka Date: Sat, 18 Jun 2022 00:21:23 +0200 Subject: [PATCH] move generic Buffer parameter to fn --- examples/server.rs | 6 +- h3-quinn/src/lib.rs | 129 ++++++++++------------------- h3/src/client.rs | 76 +++++++---------- h3/src/connection.rs | 37 ++++----- h3/src/frame.rs | 20 ++--- h3/src/quic.rs | 29 +++---- h3/src/server.rs | 64 +++++++------- h3/src/stream.rs | 7 +- tests/h3-tests/src/lib.rs | 2 +- tests/h3-tests/tests/connection.rs | 16 ++-- tests/h3-tests/tests/request.rs | 22 ++--- 11 files changed, 169 insertions(+), 239 deletions(-) diff --git a/examples/server.rs b/examples/server.rs index 6a0d90f0..e8f547a8 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,6 +1,6 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc}; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use futures::StreamExt; use http::{Request, StatusCode}; use rustls::{Certificate, PrivateKey}; @@ -114,11 +114,11 @@ async fn main() -> Result<(), Box> { async fn handle_request( req: Request<()>, - mut stream: RequestStream, + mut stream: RequestStream, serve_root: Arc>, ) -> Result<(), Box> where - T: BidiStream, + T: BidiStream, { let (status, to_serve) = match serve_root.as_deref() { None => (StatusCode::OK, None), diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index c97e1b52..e75a7165 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -20,7 +20,7 @@ pub use quinn::{ OpenUni, VarInt, WriteError, }; -use h3::quic::{self, Error, StreamId, WriteBuf}; +use h3::quic::{self, Error, StreamId}; pub struct Connection { conn: quinn::Connection, @@ -82,13 +82,10 @@ impl From for ConnectionError { } } -impl quic::Connection for Connection -where - B: Buf, -{ - type SendStream = SendStream; +impl quic::Connection for Connection { + type SendStream = SendStream; type RecvStream = RecvStream; - type BidiStream = BidiStream; + type BidiStream = BidiStream; type OpenStreams = OpenStreams; type Error = ConnectionError; @@ -166,13 +163,10 @@ pub struct OpenStreams { opening_uni: Option, } -impl quic::OpenStreams for OpenStreams -where - B: Buf, -{ +impl quic::OpenStreams for OpenStreams { type RecvStream = RecvStream; - type SendStream = SendStream; - type BidiStream = BidiStream; + type SendStream = SendStream; + type BidiStream = BidiStream; type Error = ConnectionError; fn poll_open_bidi( @@ -220,19 +214,13 @@ impl Clone for OpenStreams { } } -pub struct BidiStream -where - B: Buf, -{ - send: SendStream, +pub struct BidiStream { + send: SendStream, recv: RecvStream, } -impl quic::BidiStream for BidiStream -where - B: Buf, -{ - type SendStream = SendStream; +impl quic::BidiStream for BidiStream { + type SendStream = SendStream; type RecvStream = RecvStream; fn split(self) -> (Self::SendStream, Self::RecvStream) { @@ -240,10 +228,7 @@ where } } -impl quic::RecvStream for BidiStream -where - B: Buf, -{ +impl quic::RecvStream for BidiStream { type Buf = Bytes; type Error = ReadError; @@ -259,14 +244,15 @@ where } } -impl quic::SendStream for BidiStream -where - B: Buf, -{ +impl quic::SendStream for BidiStream { type Error = SendStreamError; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.send.poll_ready(cx) + fn poll_write( + &mut self, + buf: &mut B, + cx: &mut task::Context<'_>, + ) -> Poll> { + self.send.poll_write(buf, cx) } fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll> { @@ -277,10 +263,6 @@ where self.send.reset(reset_code) } - fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { - self.send.send_data(data) - } - fn id(&self) -> StreamId { self.send.id() } @@ -360,55 +342,44 @@ impl Error for ReadError { } } -pub struct SendStream { +pub struct SendStream { stream: quinn::SendStream, - writing: Option>, } -impl SendStream -where - B: Buf, -{ - fn new(stream: quinn::SendStream) -> SendStream { - Self { - stream, - writing: None, - } +impl SendStream { + fn new(stream: quinn::SendStream) -> SendStream { + Self { stream } } } -impl quic::SendStream for SendStream -where - B: Buf, -{ +impl quic::SendStream for SendStream { type Error = SendStreamError; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - if let Some(ref mut data) = self.writing { - while data.has_remaining() { - match ready!(Pin::new(&mut self.stream).poll_write(cx, data.chunk())) { - Ok(cnt) => data.advance(cnt), - Err(err) => { - // We are forced to use AsyncWrite for now because we cannot store - // the result of a call to: - // quinn::send_stream::write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, S>. - // - // This is why we have to unpack the error from io::Error below. This should not - // panic as long as quinn's AsyncWrite impl doesn't change. - return Poll::Ready(Err(SendStreamError::Write( - err.into_inner() - .expect("write stream returned an empty error") - .downcast_ref::() - .expect( - "write stream returned an error which type is not WriteError", - ) - .clone(), - ))); - } + fn poll_write( + &mut self, + buf: &mut B, + cx: &mut task::Context<'_>, + ) -> Poll> { + while buf.has_remaining() { + match ready!(Pin::new(&mut self.stream).poll_write(cx, buf.chunk())) { + Ok(cnt) => buf.advance(cnt), + Err(err) => { + // We are forced to use AsyncWrite for now because we cannot store + // the result of a call to: + // quinn::send_stream::write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, S>. + // + // This is why we have to unpack the error from io::Error below. This should not + // panic as long as quinn's AsyncWrite impl doesn't change. + return Poll::Ready(Err(SendStreamError::Write( + err.into_inner() + .expect("write stream returned an empty error") + .downcast_ref::() + .expect("write stream returned an error which type is not WriteError") + .clone(), + ))); } } } - self.writing = None; Poll::Ready(Ok(())) } @@ -422,14 +393,6 @@ where .reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX)); } - fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { - if self.writing.is_some() { - return Err(Self::Error::NotReady); - } - self.writing = Some(data.into()); - Ok(()) - } - fn id(&self) -> StreamId { self.stream.id().0.try_into().expect("invalid stream id") } diff --git a/h3/src/client.rs b/h3/src/client.rs index 990bb493..1b848937 100644 --- a/h3/src/client.rs +++ b/h3/src/client.rs @@ -1,6 +1,5 @@ use std::{ convert::TryFrom, - marker::PhantomData, sync::{atomic::AtomicUsize, Arc}, task::{Context, Poll, Waker}, }; @@ -22,18 +21,17 @@ pub fn builder() -> Builder { Builder::new() } -pub async fn new(conn: C) -> Result<(Connection, SendRequest), Error> +pub async fn new(conn: C) -> Result<(Connection, SendRequest), Error> where - C: quic::Connection, - O: quic::OpenStreams, + C: quic::Connection, + O: quic::OpenStreams, { Ok(Builder::new().build(conn).await?) } -pub struct SendRequest +pub struct SendRequest where - T: quic::OpenStreams, - B: Buf, + T: quic::OpenStreams, { open: T, conn_state: SharedStateRef, @@ -41,18 +39,16 @@ where // counts instances of SendRequest to close the connection when the last is dropped. sender_count: Arc, conn_waker: Option, - _buf: PhantomData, } -impl SendRequest +impl SendRequest where - T: quic::OpenStreams, - B: Buf, + T: quic::OpenStreams, { pub async fn send_request( &mut self, req: http::Request<()>, - ) -> Result, Error> { + ) -> Result, Error> { let (peer_max_field_section_size, closing) = { let state = self.conn_state.read("send request lock state"); (state.peer_max_field_section_size, state.closing) @@ -81,7 +77,7 @@ where return Err(Error::header_too_big(mem_size, peer_max_field_section_size)); } - stream::write(&mut stream, Frame::Headers(block.freeze())) + stream::write(&mut stream, Frame::::Headers(block.freeze())) .await .map_err(|e| self.maybe_conn_err(e))?; @@ -95,20 +91,18 @@ where } } -impl ConnectionState for SendRequest +impl ConnectionState for SendRequest where - T: quic::OpenStreams, - B: Buf, + T: quic::OpenStreams, { fn shared_state(&self) -> &SharedStateRef { &self.conn_state } } -impl Clone for SendRequest +impl Clone for SendRequest where - T: quic::OpenStreams + Clone, - B: Buf, + T: quic::OpenStreams + Clone, { fn clone(&self) -> Self { self.sender_count @@ -120,15 +114,13 @@ where max_field_section_size: self.max_field_section_size, sender_count: self.sender_count.clone(), conn_waker: self.conn_waker.clone(), - _buf: PhantomData, } } } -impl Drop for SendRequest +impl Drop for SendRequest where - T: quic::OpenStreams, - B: Buf, + T: quic::OpenStreams, { fn drop(&mut self) { if self @@ -145,18 +137,16 @@ where } } -pub struct Connection +pub struct Connection where - C: quic::Connection, - B: Buf, + C: quic::Connection, { - inner: ConnectionInner, + inner: ConnectionInner, } -impl Connection +impl Connection where - C: quic::Connection, - B: Buf, + C: quic::Connection, { pub async fn shutdown(&mut self, max_requests: usize) -> Result<(), Error> { self.inner.shutdown(max_requests).await @@ -230,14 +220,10 @@ impl Builder { self } - pub async fn build( - &mut self, - quic: C, - ) -> Result<(Connection, SendRequest), Error> + pub async fn build(&mut self, quic: C) -> Result<(Connection, SendRequest), Error> where - C: quic::Connection, - O: quic::OpenStreams, - B: Buf, + C: quic::Connection, + O: quic::OpenStreams, { let open = quic.opener(); let conn_state = SharedStateRef::default(); @@ -255,20 +241,19 @@ impl Builder { conn_waker, max_field_section_size: self.max_field_section_size, sender_count: Arc::new(AtomicUsize::new(1)), - _buf: PhantomData, }, )) } } -pub struct RequestStream +pub struct RequestStream where S: quic::RecvStream, { - inner: connection::RequestStream, B>, + inner: connection::RequestStream>, } -impl ConnectionState for RequestStream +impl ConnectionState for RequestStream where S: quic::RecvStream, { @@ -277,7 +262,7 @@ where } } -impl RequestStream +impl RequestStream where S: quic::RecvStream, { @@ -337,12 +322,11 @@ where } } -impl RequestStream +impl RequestStream where - S: quic::RecvStream + quic::SendStream, - B: Buf, + S: quic::RecvStream + quic::SendStream, { - pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { + pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { self.inner.send_data(buf).await } diff --git a/h3/src/connection.rs b/h3/src/connection.rs index 9ef99ab9..761bbb64 100644 --- a/h3/src/connection.rs +++ b/h3/src/connection.rs @@ -1,6 +1,5 @@ use std::{ convert::TryFrom, - marker::PhantomData, sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, task::{Context, Poll}, }; @@ -71,10 +70,9 @@ pub trait ConnectionState { } } -pub struct ConnectionInner +pub struct ConnectionInner where - C: quic::Connection, - B: Buf, + C: quic::Connection, { pub(super) shared: SharedStateRef, conn: C, @@ -89,10 +87,9 @@ where got_peer_settings: bool, } -impl ConnectionInner +impl ConnectionInner where - C: quic::Connection, - B: Buf, + C: quic::Connection, { pub async fn new( mut conn: C, @@ -110,7 +107,7 @@ where stream::write( &mut control_send, - (StreamType::CONTROL, Frame::Settings(settings)), + (StreamType::CONTROL, Frame::::Settings(settings)), ) .await?; @@ -136,7 +133,7 @@ where self.shared.write("graceful shutdown").closing = Some(max_id); - stream::write(&mut self.control_send, Frame::Goaway(max_id)).await + stream::write(&mut self.control_send, Frame::::Goaway(max_id)).await } pub fn poll_accept_request( @@ -306,33 +303,31 @@ where } } -pub struct RequestStream { +pub struct RequestStream { pub(super) stream: S, pub(super) trailers: Option, pub(super) conn_state: SharedStateRef, pub(super) max_field_section_size: u64, - _phantom_buffer: PhantomData, } -impl RequestStream { +impl RequestStream { pub fn new(stream: S, max_field_section_size: u64, conn_state: SharedStateRef) -> Self { Self { stream, conn_state, max_field_section_size, trailers: None, - _phantom_buffer: PhantomData, } } } -impl ConnectionState for RequestStream { +impl ConnectionState for RequestStream { fn shared_state(&self) -> &SharedStateRef { &self.conn_state } } -impl RequestStream, B> +impl RequestStream> where S: quic::RecvStream, { @@ -406,13 +401,12 @@ where } } -impl RequestStream, B> +impl RequestStream> where - S: quic::SendStream + quic::RecvStream, - B: Buf, + S: quic::SendStream + quic::RecvStream, { /// Send some data on the response body. - pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { + pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { let frame = Frame::Data(buf); stream::write(&mut self.stream, frame) .await @@ -433,7 +427,7 @@ where return Err(Error::header_too_big(mem_size, max_mem_size)); } - stream::write(&mut self.stream, Frame::Headers(block.freeze())) + stream::write(&mut self.stream, Frame::::Headers(block.freeze())) .await .map_err(|e| self.maybe_conn_err(e))?; @@ -441,9 +435,6 @@ where } pub async fn finish(&mut self) -> Result<(), Error> { - future::poll_fn(|cx| self.stream.poll_ready(cx)) - .await - .map_err(|e| self.maybe_conn_err(e))?; future::poll_fn(|cx| self.stream.poll_finish(cx)) .await .map_err(|e| self.maybe_conn_err(e)) diff --git a/h3/src/frame.rs b/h3/src/frame.rs index 4a19f93d..6c553530 100644 --- a/h3/src/frame.rs +++ b/h3/src/frame.rs @@ -13,7 +13,6 @@ use crate::{ stream::StreamId, }, quic::{RecvStream, SendStream}, - stream::WriteBuf, }; pub struct FrameStream @@ -136,19 +135,18 @@ where } } -impl SendStream for FrameStream +impl SendStream for FrameStream where - T: SendStream + RecvStream, - B: Buf, + T: SendStream + RecvStream, { - type Error = >::Error; + type Error = ::Error; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.stream.poll_ready(cx) - } - - fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { - self.stream.send_data(data) + fn poll_write( + &mut self, + buf: &mut B, + cx: &mut Context<'_>, + ) -> Poll> { + self.stream.poll_write(buf, cx) } fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll> { diff --git a/h3/src/quic.rs b/h3/src/quic.rs index 35cdcf29..9e71540c 100644 --- a/h3/src/quic.rs +++ b/h3/src/quic.rs @@ -30,15 +30,15 @@ impl<'a, E: Error + 'a> From for Box { } /// Trait representing a QUIC connection. -pub trait Connection { +pub trait Connection { /// The type produced by `poll_accept_bidi()` - type BidiStream: SendStream + RecvStream; + type BidiStream: SendStream + RecvStream; /// The type of the sending part of `BidiStream` - type SendStream: SendStream; + type SendStream: SendStream; /// The type produced by `poll_accept_recv()` type RecvStream: RecvStream; /// A producer of outgoing Unidirectional and Bidirectional streams. - type OpenStreams: OpenStreams; + type OpenStreams: OpenStreams; /// Error type yielded by this trait methods type Error: Into>; @@ -78,11 +78,11 @@ pub trait Connection { } /// Trait for opening outgoing streams -pub trait OpenStreams { +pub trait OpenStreams { /// The type produced by `poll_open_bidi()` - type BidiStream: SendStream + RecvStream; + type BidiStream: SendStream + RecvStream; /// The type produced by `poll_open_send()` - type SendStream: SendStream; + type SendStream: SendStream; /// The type of the receiving part of `BidiStream` type RecvStream: RecvStream; /// Error type yielded by these trait methods @@ -105,15 +105,16 @@ pub trait OpenStreams { } /// A trait describing the "send" actions of a QUIC stream. -pub trait SendStream { +pub trait SendStream { /// The error type returned by fallible send methods. type Error: Into>; - /// Polls if the stream can send more data. - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; - /// Send more data on the stream. - fn send_data>>(&mut self, data: T) -> Result<(), Self::Error>; + fn poll_write( + &mut self, + buf: &mut B, + cx: &mut task::Context<'_>, + ) -> Poll>; /// Poll to finish the sending side of the stream. fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll>; @@ -146,9 +147,9 @@ pub trait RecvStream { } /// Optional trait to allow "splitting" a bidirectional stream into two sides. -pub trait BidiStream: SendStream + RecvStream { +pub trait BidiStream: SendStream + RecvStream { /// The type for the send half. - type SendStream: SendStream; + type SendStream: SendStream; /// The type for the receive half. type RecvStream: RecvStream; diff --git a/h3/src/server.rs b/h3/src/server.rs index b92f8b8b..b68bd83a 100644 --- a/h3/src/server.rs +++ b/h3/src/server.rs @@ -25,12 +25,11 @@ pub fn builder() -> Builder { Builder::new() } -pub struct Connection +pub struct Connection where - C: quic::Connection, - B: Buf, + C: quic::Connection, { - inner: ConnectionInner, + inner: ConnectionInner, max_field_section_size: u64, // List of all incoming streams that are currently running. ongoing_streams: HashSet, @@ -39,33 +38,31 @@ where request_end_send: mpsc::UnboundedSender, } -impl ConnectionState for Connection +impl ConnectionState for Connection where - C: quic::Connection, - B: Buf, + C: quic::Connection, { fn shared_state(&self) -> &SharedStateRef { &self.inner.shared } } -impl Connection +impl Connection where - C: quic::Connection, + C: quic::Connection, { pub async fn new(conn: C) -> Result { Ok(builder().build(conn).await?) } } -impl Connection +impl Connection where - C: quic::Connection, - B: Buf, + C: quic::Connection, { pub async fn accept( &mut self, - ) -> Result, RequestStream)>, Error> { + ) -> Result, RequestStream)>, Error> { let mut stream = match future::poll_fn(|cx| self.poll_accept_request(cx)).await { Ok(Some(s)) => FrameStream::new(s), Ok(None) => { @@ -246,10 +243,9 @@ where } } -impl Drop for Connection +impl Drop for Connection where - C: quic::Connection, - B: Buf, + C: quic::Connection, { fn drop(&mut self) { self.inner.close(Code::H3_NO_ERROR, ""); @@ -274,10 +270,9 @@ impl Builder { } impl Builder { - pub async fn build(&self, conn: C) -> Result, Error> + pub async fn build(&self, conn: C) -> Result, Error> where - C: quic::Connection, - B: Buf, + C: quic::Connection, { let (sender, receiver) = mpsc::unbounded_channel(); Ok(Connection { @@ -295,16 +290,16 @@ impl Builder { } } -pub struct RequestStream +pub struct RequestStream where S: quic::RecvStream, { - inner: connection::RequestStream, B>, + inner: connection::RequestStream>, stream_id: StreamId, request_end: mpsc::UnboundedSender, } -impl ConnectionState for RequestStream +impl ConnectionState for RequestStream where S: quic::RecvStream, { @@ -313,7 +308,7 @@ where } } -impl RequestStream +impl RequestStream where S: quic::RecvStream, { @@ -326,10 +321,9 @@ where } } -impl RequestStream +impl RequestStream where - S: quic::RecvStream + quic::SendStream, - B: Buf, + S: quic::RecvStream + quic::SendStream, { pub async fn send_response(&mut self, resp: Response<()>) -> Result<(), Error> { let (parts, _) = resp.into_parts(); @@ -350,14 +344,17 @@ where return Err(Error::header_too_big(mem_size, max_mem_size)); } - stream::write(&mut self.inner.stream, Frame::Headers(block.freeze())) - .await - .map_err(|e| self.maybe_conn_err(e))?; + stream::write( + &mut self.inner.stream, + Frame::::Headers(block.freeze()), + ) + .await + .map_err(|e| self.maybe_conn_err(e))?; Ok(()) } - pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { + pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { self.inner.send_data(buf).await } @@ -370,10 +367,9 @@ where } } -impl RequestStream +impl RequestStream where - S: quic::RecvStream + quic::SendStream, - B: Buf, + S: quic::RecvStream + quic::SendStream, { pub async fn recv_trailers(&mut self) -> Result, Error> { let res = self.inner.recv_trailers().await; @@ -392,7 +388,7 @@ where } } -impl Drop for RequestStream +impl Drop for RequestStream where S: quic::RecvStream, { diff --git a/h3/src/stream.rs b/h3/src/stream.rs index ae43bdae..fe475cc0 100644 --- a/h3/src/stream.rs +++ b/h3/src/stream.rs @@ -21,13 +21,12 @@ use crate::{ #[inline] pub(crate) async fn write(stream: &mut S, data: D) -> Result<(), Error> where - S: SendStream, + S: SendStream, D: Into>, B: Buf, { - stream.send_data(data)?; - future::poll_fn(|cx| stream.poll_ready(cx)).await?; - + let mut write_buf: WriteBuf = data.into(); + future::poll_fn(|cx| stream.poll_write(&mut write_buf, cx)).await?; Ok(()) } diff --git a/tests/h3-tests/src/lib.rs b/tests/h3-tests/src/lib.rs index 21ad2c54..2f9d550b 100644 --- a/tests/h3-tests/src/lib.rs +++ b/tests/h3-tests/src/lib.rs @@ -123,7 +123,7 @@ pub struct Server { } impl Server { - pub async fn next(&mut self) -> impl quic::Connection { + pub async fn next(&mut self) -> impl quic::Connection { Connection::new(self.incoming.next().await.unwrap().await.unwrap()) } } diff --git a/tests/h3-tests/tests/connection.rs b/tests/h3-tests/tests/connection.rs index ea6b681d..6e12f82f 100644 --- a/tests/h3-tests/tests/connection.rs +++ b/tests/h3-tests/tests/connection.rs @@ -1,7 +1,7 @@ use std::{borrow::BorrowMut, time::Duration}; use assert_matches::assert_matches; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Bytes, BytesMut}; use futures::{future, StreamExt}; use http::{Request, Response, StatusCode}; @@ -179,7 +179,7 @@ async fn settings_exchange_server() { let client_fut = async { let (mut conn, _client) = client::builder() .max_field_section_size(12) - .build::<_, _, Bytes>(pair.client().await) + .build::<_, _>(pair.client().await) .await .expect("client init"); let drive = async move { @@ -654,11 +654,10 @@ async fn graceful_shutdown_closes_when_idle() { }; } -async fn request(mut send_request: T) -> Result, h3::Error> +async fn request(mut send_request: T) -> Result, h3::Error> where - T: BorrowMut>, - O: quic::OpenStreams, - B: Buf, + T: BorrowMut>, + O: quic::OpenStreams, { let mut request_stream = send_request .borrow_mut() @@ -667,10 +666,9 @@ where request_stream.recv_response().await } -async fn response(mut stream: server::RequestStream) +async fn response(mut stream: server::RequestStream) where - S: quic::RecvStream + SendStream, - B: Buf, + S: quic::RecvStream + SendStream, { stream .send_response( diff --git a/tests/h3-tests/tests/request.rs b/tests/h3-tests/tests/request.rs index 9d83df13..5e39d528 100644 --- a/tests/h3-tests/tests/request.rs +++ b/tests/h3-tests/tests/request.rs @@ -67,7 +67,7 @@ async fn get() { .await .expect("send_response"); request_stream - .send_data("wonderful hypertext".into()) + .send_data(Bytes::from("wonderful hypertext")) .await .expect("send_data"); request_stream.finish().await.expect("finish"); @@ -123,7 +123,7 @@ async fn get_with_trailers_unknown_content_type() { .await .expect("send_response"); request_stream - .send_data("wonderful hypertext".into()) + .send_data(Bytes::from("wonderful hypertext")) .await .expect("send_data"); let mut trailers = HeaderMap::new(); @@ -184,7 +184,7 @@ async fn get_with_trailers_known_content_type() { .await .expect("send_response"); request_stream - .send_data("wonderful hypertext".into()) + .send_data(Bytes::from("wonderful hypertext")) .await .expect("send_data"); @@ -216,7 +216,7 @@ async fn post() { .expect("request"); request_stream - .send_data("wonderful json".into()) + .send_data(Bytes::from("wonderful json")) .await .expect("send_data"); request_stream.finish().await.expect("client finish"); @@ -317,7 +317,7 @@ async fn header_too_big_response_from_server_trailers() { .await .expect("request"); request_stream - .send_data("wonderful json".into()) + .send_data(Bytes::from("wonderful json")) .await .expect("send_data"); @@ -430,7 +430,7 @@ async fn header_too_big_client_error_trailer() { .await .expect("request"); request_stream - .send_data("wonderful json".into()) + .send_data(Bytes::from("wonderful json")) .await .expect("send_data"); @@ -486,7 +486,7 @@ async fn header_too_big_discard_from_client() { // Do not poll driver so client doesn't know about server's max_field section size setting let (_conn, mut client) = client::builder() .max_field_section_size(12) - .build::<_, _, Bytes>(pair.client().await) + .build(pair.client().await) .await .expect("client init"); let mut request_stream = client @@ -535,7 +535,7 @@ async fn header_too_big_discard_from_client() { // Keep sending: wait for the stream to be cancelled by the client let mut err = None; for _ in 0..100 { - if let Err(e) = request_stream.send_data("some data".into()).await { + if let Err(e) = request_stream.send_data(Bytes::from("some data")).await { err = Some(e); break; } @@ -564,7 +564,7 @@ async fn header_too_big_discard_from_client_trailers() { // Do not poll driver so client doesn't know about server's max_field section size setting let (mut driver, mut client) = client::builder() .max_field_section_size(200) - .build::<_, _, Bytes>(pair.client().await) + .build::<_, _>(pair.client().await) .await .expect("client init"); let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await }; @@ -613,7 +613,7 @@ async fn header_too_big_discard_from_client_trailers() { .expect("send_response"); request_stream - .send_data("wonderful hypertext".into()) + .send_data(Bytes::from("wonderful hypertext")) .await .expect("send_data"); @@ -729,7 +729,7 @@ async fn header_too_big_server_error_trailers() { .await .unwrap(); request_stream - .send_data("wonderful hypertext".into()) + .send_data(Bytes::from("wonderful hypertext")) .await .expect("send_data");