Skip to content

Commit fd7eb8f

Browse files
committed
Add BodyExt::with_trailers
1 parent c58b641 commit fd7eb8f

File tree

4 files changed

+203
-1
lines changed

4 files changed

+203
-1
lines changed

http-body-util/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,4 @@ http-body = { version = "1", path = "../http-body" }
3333
pin-project-lite = "0.2"
3434

3535
[dev-dependencies]
36-
tokio = { version = "1", features = ["macros", "rt"] }
36+
tokio = { version = "1", features = ["macros", "rt", "sync"] }

http-body-util/src/combinators/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ mod collect;
55
mod frame;
66
mod map_err;
77
mod map_frame;
8+
mod with_trailers;
89

910
pub use self::{
1011
box_body::{BoxBody, UnsyncBoxBody},
1112
collect::Collect,
1213
frame::Frame,
1314
map_err::MapErr,
1415
map_frame::MapFrame,
16+
with_trailers::WithTrailers,
1517
};
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
use std::{
2+
future::Future,
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
7+
use futures_util::ready;
8+
use http::HeaderMap;
9+
use http_body::{Body, Frame};
10+
use pin_project_lite::pin_project;
11+
12+
pin_project! {
13+
/// TODO
14+
pub struct WithTrailers<T, F> {
15+
#[pin]
16+
state: State<T, F>,
17+
}
18+
}
19+
20+
impl<T, F> WithTrailers<T, F> {
21+
pub(crate) fn new(body: T, trailers: F) -> Self {
22+
Self {
23+
state: State::PollBody {
24+
body,
25+
trailers: Some(trailers),
26+
},
27+
}
28+
}
29+
}
30+
31+
pin_project! {
32+
#[project = StateProj]
33+
enum State<T, F> {
34+
PollBody {
35+
#[pin]
36+
body: T,
37+
trailers: Option<F>,
38+
},
39+
PollTrailers {
40+
#[pin]
41+
trailers: F,
42+
},
43+
Trailers {
44+
trailers: Option<HeaderMap>,
45+
}
46+
}
47+
}
48+
49+
impl<T, F> Body for WithTrailers<T, F>
50+
where
51+
T: Body,
52+
F: Future<Output = Option<Result<HeaderMap, T::Error>>>,
53+
{
54+
type Data = T::Data;
55+
type Error = T::Error;
56+
57+
fn poll_frame(
58+
mut self: Pin<&mut Self>,
59+
cx: &mut Context<'_>,
60+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
61+
loop {
62+
let mut this = self.as_mut().project();
63+
64+
let new_state: State<_, _> = match this.state.as_mut().project() {
65+
StateProj::PollBody { body, trailers } => match ready!(body.poll_frame(cx)?) {
66+
Some(frame) => {
67+
return Poll::Ready(Some(Ok(frame)));
68+
}
69+
None => {
70+
let trailers = trailers.take().unwrap();
71+
State::PollTrailers { trailers }
72+
}
73+
},
74+
StateProj::PollTrailers { trailers } => {
75+
let trailers = ready!(trailers.poll(cx)?);
76+
State::Trailers { trailers }
77+
}
78+
StateProj::Trailers { trailers } => {
79+
return Poll::Ready(trailers.take().map(Frame::trailers).map(Ok));
80+
}
81+
};
82+
83+
this.state.set(new_state);
84+
}
85+
}
86+
87+
#[inline]
88+
fn is_end_stream(&self) -> bool {
89+
match &self.state {
90+
State::PollBody { body, .. } => body.is_end_stream(),
91+
State::PollTrailers { .. } | State::Trailers { .. } => true,
92+
}
93+
}
94+
95+
#[inline]
96+
fn size_hint(&self) -> http_body::SizeHint {
97+
match &self.state {
98+
State::PollBody { body, .. } => body.size_hint(),
99+
State::PollTrailers { .. } | State::Trailers { .. } => Default::default(),
100+
}
101+
}
102+
}
103+
104+
#[cfg(test)]
105+
mod tests {
106+
use std::convert::Infallible;
107+
108+
use bytes::Bytes;
109+
use http::{HeaderMap, HeaderName, HeaderValue};
110+
111+
use crate::{BodyExt, Full};
112+
113+
#[allow(unused_imports)]
114+
use super::*;
115+
116+
#[tokio::test]
117+
async fn works() {
118+
let mut trailers = HeaderMap::new();
119+
trailers.insert(
120+
HeaderName::from_static("foo"),
121+
HeaderValue::from_static("bar"),
122+
);
123+
124+
let body =
125+
Full::<Bytes>::from("hello").with_trailers(std::future::ready(Some(
126+
Ok::<_, Infallible>(trailers.clone()),
127+
)));
128+
129+
futures_util::pin_mut!(body);
130+
let waker = futures_util::task::noop_waker();
131+
let mut cx = Context::from_waker(&waker);
132+
133+
let data = unwrap_ready(body.as_mut().poll_frame(&mut cx))
134+
.unwrap()
135+
.unwrap()
136+
.into_data()
137+
.unwrap();
138+
assert_eq!(data, "hello");
139+
140+
let body_trailers = unwrap_ready(body.as_mut().poll_frame(&mut cx))
141+
.unwrap()
142+
.unwrap()
143+
.into_trailers()
144+
.unwrap();
145+
assert_eq!(body_trailers, trailers);
146+
147+
assert!(unwrap_ready(body.as_mut().poll_frame(&mut cx)).is_none());
148+
}
149+
150+
fn unwrap_ready<T>(poll: Poll<T>) -> T {
151+
match poll {
152+
Poll::Ready(t) => t,
153+
Poll::Pending => panic!("pending"),
154+
}
155+
}
156+
}

http-body-util/src/lib.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,50 @@ pub trait BodyExt: http_body::Body {
8989
collected: Some(crate::Collected::default()),
9090
}
9191
}
92+
93+
/// Add trailers to the body.
94+
///
95+
/// The trailers will be sent when all previous frames have been sent and the `trailers` future
96+
/// resolves.
97+
///
98+
/// # Example
99+
///
100+
/// ```
101+
/// use http::HeaderMap;
102+
/// use http_body_util::{Full, BodyExt};
103+
/// use bytes::Bytes;
104+
///
105+
/// # #[tokio::main]
106+
/// async fn main() {
107+
/// let (tx, rx) = tokio::sync::oneshot::channel::<HeaderMap>();
108+
///
109+
/// let body = Full::<Bytes>::from("Hello, World!")
110+
/// // add trailers via a future
111+
/// .with_trailers(async move {
112+
/// match rx.await {
113+
/// Ok(trailers) => Some(Ok(trailers)),
114+
/// Err(_err) => None,
115+
/// }
116+
/// });
117+
///
118+
/// // compute the trailers in the background
119+
/// tokio::spawn(async move {
120+
/// let _ = tx.send(compute_trailers().await);
121+
/// });
122+
///
123+
/// async fn compute_trailers() -> HeaderMap {
124+
/// // ...
125+
/// # unimplemented!()
126+
/// }
127+
/// # }
128+
/// ```
129+
fn with_trailers<F>(self, trailers: F) -> combinators::WithTrailers<Self, F>
130+
where
131+
Self: Sized,
132+
F: std::future::Future<Output = Option<Result<http::HeaderMap, Self::Error>>>,
133+
{
134+
combinators::WithTrailers::new(self, trailers)
135+
}
92136
}
93137

94138
impl<T: ?Sized> BodyExt for T where T: http_body::Body {}

0 commit comments

Comments
 (0)