feat(http/retry): model ReplayBody<B> with Frame<T> (#3598)
pr #3559 (dd4fbcd) refactored our trailer peeking body middleware to
model its buffering in terms of the `Frame<T>` type used in
`http-body`'s 1.0 release.

this branch performs a similar change for the other piece of body
middleware that supports linkerd's retry facilities: `ReplayBody<B>`. the
inner body `B` is now wrapped in the `ForwardCompatibleBody<B>` adapter,
and we now poll it in terms of frames.

NB: polling the underlying body in terms of frames has a subtle knock-on
effect regarding when we observe the trailers, in the liminal period
between this refactor and the subsequent upgrade to hyper 1.0, whilst we
must still implement the existing 0.4 interface for `Body` that includes
`poll_trailers()`.

see the comment above `replay_trailers` for more on this, describing why
we now initialize this to `true`. relatedly, this is why we no longer
delegate down to `B::poll_trailers` ourselves. it will have already been
called by our adapter.
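
as a rough illustration of that knock-on effect, here is a synchronous sketch (not the actual `ReplayBody` code). `Frame`, `FramedBody`, `next_data()`, and `trailers()` are hypothetical names invented for this example; the real body is polled asynchronously and can fail.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a `Frame<T>`: either a data chunk or trailers.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Data(Vec<u8>),
    Trailers(Vec<(String, String)>),
}

/// Hypothetical, synchronous model of a body that is read frame by frame
/// while still exposing separate "data" and "trailers" accessors.
pub struct FramedBody {
    frames: VecDeque<Frame>,
    trailers: Option<Vec<(String, String)>>,
    pub is_completed: bool,
}

impl FramedBody {
    pub fn new(frames: Vec<Frame>) -> Self {
        Self {
            frames: frames.into(),
            trailers: None,
            is_completed: false,
        }
    }

    /// Models `poll_data()`: yields data chunks, and stashes the trailers if
    /// a trailers frame shows up while looking for more data.
    pub fn next_data(&mut self) -> Option<Vec<u8>> {
        match self.frames.pop_front() {
            Some(Frame::Data(data)) => Some(data),
            Some(Frame::Trailers(trailers)) => {
                self.trailers = Some(trailers);
                self.is_completed = true;
                None
            }
            None => {
                self.is_completed = true;
                None
            }
        }
    }

    /// Models `poll_trailers()`: never reads from the underlying frames, it
    /// only hands back whatever `next_data()` already observed.
    pub fn trailers(&self) -> Option<Vec<(String, String)>> {
        self.trailers.clone()
    }
}
```

the point is only that, once data is read frame by frame, the trailers have already been captured by the time the data stream reports `None`.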

`ReplayBody::is_end_stream()` now behaves identically when initially
polling a body compared to subsequent replays. this is fine, as
`is_end_stream()` is a hint that facilitates optimizations
(hyperium/http-body#143). we do still report the end properly, we just
won't be quite as prescient on the initial playthrough.
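
a minimal model of the end-of-stream check may help here. `EosFlags` and its fields are hypothetical names that loosely mirror the flags consulted in the `is_end_stream()` hunk of this diff; the "inactive replay" case is left out, and the sketch assumes that polling the trailers is what clears `replay_trailers`, which this diff does not show.

```rust
/// Hypothetical, simplified snapshot of the flags consulted for end-of-stream.
#[derive(Debug, Clone, Copy)]
pub struct EosFlags {
    /// The initial body was already empty when it was wrapped.
    pub was_empty: bool,
    /// Buffered data remains to be played back.
    pub replay_body: bool,
    /// Trailers remain to be played back.
    pub replay_trailers: bool,
    /// The inner body reports that it has ended.
    pub rest_is_end_stream: bool,
}

/// Reports end-of-stream only once nothing is left to replay and the inner
/// body has ended; an initially empty body is always at end-of-stream.
pub fn is_end_stream(flags: EosFlags) -> bool {
    if flags.was_empty {
        return true;
    }
    !flags.replay_body && !flags.replay_trailers && flags.rest_is_end_stream
}
```

with `replay_trailers` starting out `true`, the initial body only reports the end of the stream after its trailers have been polled, the same as a replay would.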

in the same manner as the existing `frame()` method mimics
`http_body_util::BodyExt::frame()`, this branch introduces
a new `ForwardCompatibleBody::poll_frame()` method.

this allows us to poll the compatibility layer for a `Frame<T>`.
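
the shape of this can be sketched with the standard library alone. everything below is a hypothetical stand-in rather than linkerd's or `http-body`'s API: `LegacyBody` is an infallible imitation of a 0.4-style body, `Compat` imitates the adapter, and `TestBody`, `NoopWaker`, and `collect_frames()` exist only to drive the example.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

pub type Trailers = Vec<(String, String)>;

#[derive(Debug, PartialEq)]
pub enum Frame<T> {
    Data(T),
    Trailers(Trailers),
}

/// Hypothetical, infallible stand-in for a 0.4-style body interface.
pub trait LegacyBody {
    type Data;
    fn poll_data(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Data>>;
    fn poll_trailers(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Trailers>>;
}

/// Hypothetical adapter that reads a `LegacyBody` frame by frame.
pub struct Compat<B> {
    inner: B,
    data_done: bool,
    trailers_done: bool,
}

/// Future returned by `Compat::frame()`; resolves to the next frame, if any.
pub struct FrameFuture<'a, B>(&'a mut Compat<B>);

impl<B: LegacyBody + Unpin> Future for FrameFuture<'_, B> {
    type Output = Option<Frame<B::Data>>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let compat = &mut *self.get_mut().0;
        // Yield data chunks until the inner body runs out of them.
        if !compat.data_done {
            match ready!(Pin::new(&mut compat.inner).poll_data(cx)) {
                Some(data) => return Poll::Ready(Some(Frame::Data(data))),
                None => compat.data_done = true,
            }
        }
        // Then poll the trailers exactly once.
        if !compat.trailers_done {
            let trailers = ready!(Pin::new(&mut compat.inner).poll_trailers(cx));
            compat.trailers_done = true;
            return Poll::Ready(trailers.map(Frame::Trailers));
        }
        Poll::Ready(None)
    }
}

impl<B: LegacyBody + Unpin> Compat<B> {
    pub fn new(inner: B) -> Self {
        Self { inner, data_done: false, trailers_done: false }
    }

    pub fn frame(&mut self) -> FrameFuture<'_, B> {
        FrameFuture(self)
    }

    /// Builds a `frame()` future and polls it once.
    pub fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Frame<B::Data>>> {
        let mut fut = self.get_mut().frame();
        Pin::new(&mut fut).poll(cx)
    }
}

pub struct TestBody {
    pub chunks: VecDeque<&'static str>,
    pub trailers: Option<Trailers>,
}

impl LegacyBody for TestBody {
    type Data = &'static str;
    fn poll_data(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Data>> {
        Poll::Ready(self.get_mut().chunks.pop_front())
    }
    fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Trailers>> {
        Poll::Ready(self.get_mut().trailers.take())
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

pub fn collect_frames<B: LegacyBody + Unpin>(body: B) -> Vec<Frame<B::Data>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut compat = Compat::new(body);
    let mut frames = Vec::new();
    while let Poll::Ready(Some(frame)) = Pin::new(&mut compat).poll_frame(&mut cx) {
        frames.push(frame);
    }
    frames
}
```

in this sketch all progress is recorded on the adapter rather than in the future, so dropping the short-lived future after a `Pending` result loses nothing.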

see:
- linkerd/linkerd2#8733.
- #3559

---

* nit(http/retry): install tracing subscriber in tests

some tests do not set up a tracing subscriber, because they do not use
the shared `Test::new()` helper function used elsewhere in this test
suite.

to provide a trace of the test's execution in the event of a failure,
initialize a tracing subscriber in some additional unit tests.

Signed-off-by: katelyn martin <kate@buoyant.io>

* feat(http/retry): `ForwardCompatibleBody<B>` exposes hints

this commit removes the `cfg(test)` gate on the method exposing
`B::is_end_stream()`, and introduces another method also exposing the
`size_hint()` method.

we will want these in order to implement these methods for
`ReplayBody<B>`.

Signed-off-by: katelyn martin <kate@buoyant.io>

* refactor(http/retry): `ForwardCompatibleBody::poll_frame()`

in the same manner as the existing `frame()` method mimics
`http_body_util::BodyExt::frame()`, this commit introduces
a new `ForwardCompatibleBody::poll_frame()` method.

this allows us to poll the compatibility layer for a `Frame<T>`.

Signed-off-by: katelyn martin <kate@buoyant.io>

* feat(http/retry): `ReplayBody<B>` polls for frames

pr #3559 (dd4fbcd) refactored our trailer peeking body middleware to
model its buffering in terms of the `Frame<T>` type used in
`http-body`'s 1.0 release.

this commit performs a similar change for the other piece of body
middleware that supports linkerd's retry facilities: `ReplayBody<B>`. the
inner body `B` is now wrapped in the `ForwardCompatibleBody<B>` adapter,
and we now poll it in terms of frames.

NB: polling the underlying body in terms of frames has a subtle knock-on
effect regarding when we observe the trailers, in the liminal period
between this refactor and the subsequent upgrade to hyper 1.0, whilst we
must still implement the existing 0.4 interface for `Body` that includes
`poll_trailers()`.

see the comment above `replay_trailers` for more on this, describing why
we now initialize this to `true`. relatedly, this is why we no longer
delegate down to `B::poll_trailers` ourselves. it will have already been
called by our adapter.

`ReplayBody::is_end_stream()` now behaves identically when initially
polling a body compared to subsequent replays. this is fine, as
`is_end_stream()` is a hint that facilitates optimizations
(hyperium/http-body#143). we do still report the end properly, we just
won't be quite as prescient on the initial playthrough.

see:
- linkerd/linkerd2#8733.
- #3559

Signed-off-by: katelyn martin <kate@buoyant.io>

* feat(http/retry): `is_end_stream()` traces

this commit introduces some trace-level diagnostics tracking how the
replay body has determined whether or not it has reached the end of the
stream.

Signed-off-by: katelyn martin <kate@buoyant.io>

* nit(http/retry): capitalize trace event messages

Signed-off-by: katelyn martin <kate@buoyant.io>

---------

Signed-off-by: katelyn martin <kate@buoyant.io>
cratelyn authored Feb 6, 2025
1 parent c4e0fd2 commit 4b53081
Showing 3 changed files with 109 additions and 27 deletions.
24 changes: 22 additions & 2 deletions linkerd/http/retry/src/compat.rs
@@ -1,6 +1,11 @@
//! Compatibility utilities for upgrading to http-body 1.0.
use http_body::Body;
use http_body::{Body, SizeHint};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

pub(crate) use self::frame::Frame;

@@ -42,10 +47,25 @@ impl<B: Body> ForwardCompatibleBody<B> {
}

/// Returns `true` when the end of stream has been reached.
#[cfg(test)]
pub(crate) fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

/// Returns the bounds on the remaining length of the stream.
pub(crate) fn size_hint(&self) -> SizeHint {
self.inner.size_hint()
}
}

impl<B: Body + Unpin> ForwardCompatibleBody<B> {
pub(crate) fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<B::Data>, B::Error>>> {
let mut fut = self.get_mut().frame();
let pinned = Pin::new(&mut fut);
pinned.poll(cx)
}
}

/// Future that resolves to the next frame from a `Body`.
99 changes: 74 additions & 25 deletions linkerd/http/retry/src/replay.rs
@@ -68,7 +68,7 @@ struct SharedState<B> {
struct BodyState<B> {
replay: Replay,
trailers: Option<HeaderMap>,
rest: B,
rest: crate::compat::ForwardCompatibleBody<B>,
is_completed: bool,

/// Maximum number of bytes to buffer.
@@ -104,13 +104,19 @@ impl<B: Body> ReplayBody<B> {
state: Some(BodyState {
replay: Default::default(),
trailers: None,
rest: body,
rest: crate::compat::ForwardCompatibleBody::new(body),
is_completed: false,
max_bytes: max_bytes + 1,
}),
// The initial `ReplayBody` has nothing to replay
// The initial `ReplayBody` has no data to replay.
replay_body: false,
replay_trailers: false,
// NOTE(kate): When polling the inner body in terms of frames, we will not yield
// `Ready(None)` from `Body::poll_data()` until we have reached the end of the
// underlying stream. Once we have migrated to `http-body` v1, this field will be
// initialized `false` thanks to the use of `Body::poll_frame()`, but for now we must
// initialize this to true; `poll_trailers()` will be called after the trailers have
// been observed previously, even for the initial body.
replay_trailers: true,
})
}

@@ -204,16 +210,33 @@ where
// Poll the inner body for more data. If the body has ended, remember
// that so that future clones will not try polling it again (as
// described above).
let data = {
let data: B::Data = {
use futures::{future::Either, ready};
// Poll the inner body for the next frame.
tracing::trace!("Polling initial body");
match futures::ready!(Pin::new(&mut state.rest).poll_data(cx)) {
Some(Ok(data)) => data,
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
let poll = Pin::new(&mut state.rest).poll_frame(cx).map_err(Into::into);
let frame = match ready!(poll) {
// The body yielded a new frame.
Some(Ok(frame)) => frame,
// The body yielded an error.
Some(Err(error)) => return Poll::Ready(Some(Err(error))),
// The body has reached the end of the stream.
None => {
tracing::trace!("Initial body completed");
state.is_completed = true;
return Poll::Ready(None);
}
};
// Now, inspect the frame: was it a chunk of data, or a trailers frame?
match Self::split_frame(frame) {
Some(Either::Left(data)) => data,
Some(Either::Right(trailers)) => {
tracing::trace!("Initial body completed");
state.trailers = Some(trailers);
state.is_completed = true;
return Poll::Ready(None);
}
None => return Poll::Ready(None),
}
};

Expand All @@ -234,7 +257,7 @@ where
/// NOT be polled until the previous body has been dropped.
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
let this = self.get_mut();
let state = Self::acquire_state(&mut this.state, &this.shared.body);
@@ -251,40 +274,40 @@ where
}
}

// If the inner body has previously ended, don't poll it again.
if !state.rest.is_end_stream() {
return Pin::new(&mut state.rest)
.poll_trailers(cx)
.map_ok(|tlrs| {
// Record a copy of the inner body's trailers in the shared state.
if state.trailers.is_none() {
state.trailers.clone_from(&tlrs);
}
tlrs
})
.map_err(Into::into);
}

Poll::Ready(Ok(None))
}

#[tracing::instrument(
skip_all,
level = "trace",
fields(
state.is_some = %self.state.is_some(),
replay_trailers = %self.replay_trailers,
replay_body = %self.replay_body,
is_completed = ?self.state.as_ref().map(|s| s.is_completed),
)
)]
fn is_end_stream(&self) -> bool {
// If the initial body was empty as soon as it was wrapped, then we are finished.
if self.shared.was_empty {
tracing::trace!("Initial body was empty, stream has ended");
return true;
}

let Some(state) = self.state.as_ref() else {
// This body is not currently the "active" replay being polled.
tracing::trace!("Inactive replay body is not complete");
return false;
};

// if this body has data or trailers remaining to play back, it
// is not EOS
!self.replay_body && !self.replay_trailers
let eos = !self.replay_body && !self.replay_trailers
// if we have replayed everything, the initial body may
// still have data remaining, so ask it
&& state.rest.is_end_stream()
&& state.rest.is_end_stream();
tracing::trace!(%eos, "Checked replay body end-of-stream");
eos
}

#[inline]
@@ -334,6 +357,32 @@ impl<B> Drop for ReplayBody<B> {
}
}

impl<B: Body> ReplayBody<B> {
/// Splits a `Frame<T>` into a chunk of data or a header map.
///
/// Frames do not expose their inner enums, and instead expose `into_data()` and
/// `into_trailers()` methods. This function breaks the frame into either `Some(Left(data))`
/// if it is given a DATA frame, and `Some(Right(trailers))` if it is given a TRAILERS frame.
///
/// This returns `None` if an unknown frame is provided, that is neither.
///
/// This is an internal helper to facilitate pattern matching in `read_body(..)`, above.
fn split_frame(
frame: crate::compat::Frame<B::Data>,
) -> Option<futures::future::Either<B::Data, HeaderMap>> {
use {crate::compat::Frame, futures::future::Either};
match frame.into_data().map_err(Frame::into_trailers) {
Ok(data) => Some(Either::Left(data)),
Err(Ok(trailers)) => Some(Either::Right(trailers)),
Err(Err(_unknown)) => {
// It's possible that some sort of unknown frame could be encountered.
tracing::warn!("An unknown body frame has been buffered");
None
}
}
}
}

// === impl BodyState ===

impl<B> BodyState<B> {
13 changes: 13 additions & 0 deletions linkerd/http/retry/src/replay/tests.rs
@@ -327,6 +327,7 @@ fn empty_body_is_always_eos() {
async fn eos_only_when_fully_replayed() {
// Test that each clone of a body is not EOS until the data has been
// fully replayed.
let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace");
let initial = ReplayBody::try_new(TestBody::one_data_frame(), 64 * 1024)
.expect("body must not be too large");
let replay = initial.clone();
@@ -344,6 +345,8 @@ async fn eos_only_when_fully_replayed() {
.expect("yields a frame")
.into_data()
.expect("yields a data frame");
// TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers.
assert!(initial.frame().await.is_none());
assert!(initial.is_end_stream());
assert!(!replay.is_end_stream());
drop(initial);
@@ -388,6 +391,7 @@ async fn eos_only_when_fully_replayed() {
async fn eos_only_when_fully_replayed_with_trailers() {
// Test that each clone of a body is not EOS until the data has been
// fully replayed.
let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace");
let initial = ReplayBody::try_new(TestBody::one_data_frame().with_trailers(), 64 * 1024)
.expect("body must not be too large");
let replay = initial.clone();
@@ -561,6 +565,7 @@ async fn caps_across_replays() {

#[test]
fn body_too_big() {
let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace");
let max_size = 8;
let mk_body = |sz: usize| -> BoxBody {
let s = (0..sz).map(|_| "x").collect::<String>();
@@ -597,6 +602,7 @@ fn body_too_big() {
#[allow(clippy::redundant_clone)]
#[test]
fn size_hint_is_correct_for_empty_body() {
let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace");
let initial =
ReplayBody::try_new(BoxBody::empty(), 64 * 1024).expect("empty body can't be too large");
let size = initial.size_hint();
@@ -617,6 +623,7 @@ async fn size_hint_is_correct_across_replays() {
debug_assert!(SIZE as usize <= CAPACITY);

// Create the initial body, and a replay.
let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace");
let mut initial = ReplayBody::try_new(BoxBody::from_static(BODY), CAPACITY)
.expect("empty body can't be too large");
let mut replay = initial.clone();
@@ -629,6 +636,12 @@ async fn size_hint_is_correct_across_replays() {

// Read the body, check the size hint again.
assert_eq!(chunk(&mut initial).await.as_deref(), Some(BODY));
let initial = {
// TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers.
let mut body = crate::compat::ForwardCompatibleBody::new(initial);
assert!(body.frame().await.is_none());
body.into_inner()
};
debug_assert!(initial.is_end_stream());
// TODO(kate): this currently misreports the *remaining* size of the body.
// let size = initial.size_hint();