Skip to content

Commit

Permalink
feat(util): Add BodyDataStream
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Mar 30, 2024
1 parent b5c769d commit 9b1bd12
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
10 changes: 9 additions & 1 deletion http-body-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use self::either::Either;
pub use self::empty::Empty;
pub use self::full::Full;
pub use self::limited::{LengthLimitError, Limited};
pub use self::stream::{BodyStream, StreamBody};
pub use self::stream::{BodyDataStream, BodyStream, StreamBody};

/// An extension trait for [`http_body::Body`] adding various combinators and adapters
pub trait BodyExt: http_body::Body {
Expand Down Expand Up @@ -128,6 +128,14 @@ pub trait BodyExt: http_body::Body {
{
combinators::WithTrailers::new(self, trailers)
}

/// Turn this body into [`BodyDataStream`].
fn into_data_stream(self) -> BodyDataStream<Self>
where
Self: Sized,
{
BodyDataStream::new(self)
}
}

impl<T: ?Sized> BodyExt for T where T: http_body::Body {}
38 changes: 37 additions & 1 deletion http-body-util/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use http_body::{Body, Frame};
use pin_project_lite::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
};

pin_project! {
Expand Down Expand Up @@ -101,6 +101,42 @@ where
}
}

pin_project! {
/// A data stream created from a [`Body`].
#[derive(Clone, Copy, Debug)]
pub struct BodyDataStream<B> {
#[pin]
body: B,
}
}

impl<B> BodyDataStream<B> {
/// Create a new `BodyDataStream`
pub fn new(body: B) -> Self {
Self { body }
}
}

impl<B> Stream for BodyDataStream<B>
where
B: Body,
{
type Item = Result<B::Data, B::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().body.poll_frame(cx)) {
Some(Ok(frame)) => match frame.into_data() {
Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
Err(_) => continue,
},
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
};
}
}
}

#[cfg(test)]
mod tests {
use crate::{BodyExt, BodyStream, StreamBody};
Expand Down

0 comments on commit 9b1bd12

Please sign in to comment.