Skip to content

Commit

Permalink
expose poll_data equivalent of recv_data
Browse files Browse the repository at this point in the history
Useful to have this exposed for example to implement an AsyncRead
on the stream. poll_data does exactly what recv_data was doing,
recv_data now just calls poll_fn(poll_data)
  • Loading branch information
Gopa Kumar committed Dec 17, 2023
1 parent 5c16195 commit 56e9616
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 34 deletions.
5 changes: 5 additions & 0 deletions h3/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,11 @@ where
self.inner.recv_data().await
}

/// Poll for data sent from the server.
pub fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<impl Buf>, Error>> {
self.inner.poll_data(cx)
}

/// Receive an optional set of trailers for the response.
pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
let res = self.inner.recv_trailers().await;
Expand Down
78 changes: 44 additions & 34 deletions h3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,47 +622,57 @@ impl<S, B> RequestStream<S, B>
where
S: quic::RecvStream,
{
/// Receive some of the request body.
pub async fn recv_data(&mut self) -> Result<Option<impl Buf>, Error> {
pub fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<impl Buf>, Error>> {
if !self.stream.has_data() {
let frame = future::poll_fn(|cx| self.stream.poll_next(cx))
.await
.map_err(|e| self.maybe_conn_err(e))?;
match frame {
Some(Frame::Data { .. }) => (),
Some(Frame::Headers(encoded)) => {
self.trailers = Some(encoded);
return Ok(None);
}
match self.stream.poll_next(cx) {
Poll::Ready(Ok(frame)) => {
match frame {
Some(Frame::Data { .. }) => (),
Some(Frame::Headers(encoded)) => {
self.trailers = Some(encoded);
return Poll::Ready(Ok(None));
}

//= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
//# Receipt of an invalid sequence of frames MUST be treated as a
//# connection error of type H3_FRAME_UNEXPECTED.
//= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
//# Receipt of an invalid sequence of frames MUST be treated as a
//# connection error of type H3_FRAME_UNEXPECTED.

//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.3
//# Receiving a
//# CANCEL_PUSH frame on a stream other than the control stream MUST be
//# treated as a connection error of type H3_FRAME_UNEXPECTED.
//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.3
//# Receiving a
//# CANCEL_PUSH frame on a stream other than the control stream MUST be
//# treated as a connection error of type H3_FRAME_UNEXPECTED.

//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4
//# If an endpoint receives a SETTINGS frame on a different
//# stream, the endpoint MUST respond with a connection error of type
//# H3_FRAME_UNEXPECTED.

//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.6
//# A client MUST treat a GOAWAY frame on a stream other than
//# the control stream as a connection error of type H3_FRAME_UNEXPECTED.

//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.7
//# The MAX_PUSH_ID frame is always sent on the control stream. Receipt
//# of a MAX_PUSH_ID frame on any other stream MUST be treated as a
//# connection error of type H3_FRAME_UNEXPECTED.
Some(_) => return Err(Code::H3_FRAME_UNEXPECTED.into()),
None => return Ok(None),
//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4
//# If an endpoint receives a SETTINGS frame on a different
//# stream, the endpoint MUST respond with a connection error of type
//# H3_FRAME_UNEXPECTED.

//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.6
//# A client MUST treat a GOAWAY frame on a stream other than
//# the control stream as a connection error of type H3_FRAME_UNEXPECTED.

//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.7
//# The MAX_PUSH_ID frame is always sent on the control stream. Receipt
//# of a MAX_PUSH_ID frame on any other stream MUST be treated as a
//# connection error of type H3_FRAME_UNEXPECTED.
Some(_) => return Poll::Ready(Err(Code::H3_FRAME_UNEXPECTED.into())),
None => return Poll::Ready(Ok(None)),
}
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(self.maybe_conn_err(e))),
Poll::Pending => return Poll::Pending,
}
}
match self.stream.poll_data(cx) {
Poll::Ready(Ok(data)) => Poll::Ready(Ok(data)),
Poll::Ready(Err(e)) => Poll::Ready(Err(self.maybe_conn_err(e))),
Poll::Pending => Poll::Pending,
}
}

let data = future::poll_fn(|cx| self.stream.poll_data(cx))
/// Receive some of the request body.
pub async fn recv_data(&mut self) -> Result<Option<impl Buf>, Error> {
let data = future::poll_fn(|cx| self.poll_data(cx))
.await
.map_err(|e| self.maybe_conn_err(e))?;
Ok(data)
Expand Down
5 changes: 5 additions & 0 deletions h3/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,11 @@ where
self.inner.recv_data().await
}

/// Poll for data sent from the client
pub fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<impl Buf>, Error>> {
self.inner.poll_data(cx)
}

/// Receive an optional set of trailers for the request
pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
self.inner.recv_trailers().await
Expand Down

0 comments on commit 56e9616

Please sign in to comment.