From 23212f131ae43ba250d42b73c85943f1737e5f7d Mon Sep 17 00:00:00 2001 From: tottoto Date: Tue, 11 Jun 2024 04:09:34 +0900 Subject: [PATCH] feat(util): add BodyDataStream (#117) --- http-body-util/Cargo.toml | 3 +- http-body-util/src/combinators/collect.rs | 2 +- .../src/combinators/with_trailers.rs | 2 +- http-body-util/src/lib.rs | 10 ++++- http-body-util/src/stream.rs | 38 ++++++++++++++++++- 5 files changed, 49 insertions(+), 6 deletions(-) diff --git a/http-body-util/Cargo.toml b/http-body-util/Cargo.toml index 6554401..07daa78 100644 --- a/http-body-util/Cargo.toml +++ b/http-body-util/Cargo.toml @@ -28,11 +28,10 @@ rust-version = "1.49" [dependencies] bytes = "1" -futures-core = "0.3" +futures-util = { version = "0.3", default-features = false } http = "1" http-body = { version = "1", path = "../http-body" } pin-project-lite = "0.2" [dev-dependencies] tokio = { version = "1", features = ["macros", "rt", "sync", "rt-multi-thread"] } -futures-util = { version = "0.3.14", default-features = false } diff --git a/http-body-util/src/combinators/collect.rs b/http-body-util/src/combinators/collect.rs index 4d35413..d89e721 100644 --- a/http-body-util/src/combinators/collect.rs +++ b/http-body-util/src/combinators/collect.rs @@ -29,7 +29,7 @@ impl Future for Collect { let mut me = self.project(); loop { - let frame = futures_core::ready!(me.body.as_mut().poll_frame(cx)); + let frame = futures_util::ready!(me.body.as_mut().poll_frame(cx)); let frame = if let Some(frame) = frame { frame? diff --git a/http-body-util/src/combinators/with_trailers.rs b/http-body-util/src/combinators/with_trailers.rs index 38e0061..92f466a 100644 --- a/http-body-util/src/combinators/with_trailers.rs +++ b/http-body-util/src/combinators/with_trailers.rs @@ -4,7 +4,7 @@ use std::{ task::{Context, Poll}, }; -use futures_core::ready; +use futures_util::ready; use http::HeaderMap; use http_body::{Body, Frame}; use pin_project_lite::pin_project; diff --git a/http-body-util/src/lib.rs b/http-body-util/src/lib.rs index b655e2a..dee852c 100644 --- a/http-body-util/src/lib.rs +++ b/http-body-util/src/lib.rs @@ -24,7 +24,7 @@ pub use self::either::Either; pub use self::empty::Empty; pub use self::full::Full; pub use self::limited::{LengthLimitError, Limited}; -pub use self::stream::{BodyStream, StreamBody}; +pub use self::stream::{BodyDataStream, BodyStream, StreamBody}; /// An extension trait for [`http_body::Body`] adding various combinators and adapters pub trait BodyExt: http_body::Body { @@ -128,6 +128,14 @@ pub trait BodyExt: http_body::Body { { combinators::WithTrailers::new(self, trailers) } + + /// Turn this body into [`BodyDataStream`]. + fn into_data_stream(self) -> BodyDataStream + where + Self: Sized, + { + BodyDataStream::new(self) + } } impl BodyExt for T where T: http_body::Body {} diff --git a/http-body-util/src/stream.rs b/http-body-util/src/stream.rs index d872404..7eeafad 100644 --- a/http-body-util/src/stream.rs +++ b/http-body-util/src/stream.rs @@ -1,5 +1,5 @@ use bytes::Buf; -use futures_core::Stream; +use futures_util::{ready, stream::Stream}; use http_body::{Body, Frame}; use pin_project_lite::pin_project; use std::{ @@ -101,6 +101,42 @@ where } } +pin_project! { + /// A data stream created from a [`Body`]. + #[derive(Clone, Copy, Debug)] + pub struct BodyDataStream { + #[pin] + body: B, + } +} + +impl BodyDataStream { + /// Create a new `BodyDataStream` + pub fn new(body: B) -> Self { + Self { body } + } +} + +impl Stream for BodyDataStream +where + B: Body, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + return match ready!(self.as_mut().project().body.poll_frame(cx)) { + Some(Ok(frame)) => match frame.into_data() { + Ok(bytes) => Poll::Ready(Some(Ok(bytes))), + Err(_) => continue, + }, + Some(Err(err)) => Poll::Ready(Some(Err(err))), + None => Poll::Ready(None), + }; + } + } +} + #[cfg(test)] mod tests { use crate::{BodyExt, BodyStream, StreamBody};