diff --git a/linkerd/http/retry/src/compat.rs b/linkerd/http/retry/src/compat.rs index 79698d0797..a4e452d6be 100644 --- a/linkerd/http/retry/src/compat.rs +++ b/linkerd/http/retry/src/compat.rs @@ -1,6 +1,11 @@ //! Compatibility utilities for upgrading to http-body 1.0. -use http_body::Body; +use http_body::{Body, SizeHint}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; pub(crate) use self::frame::Frame; @@ -42,10 +47,25 @@ impl ForwardCompatibleBody { } /// Returns `true` when the end of stream has been reached. - #[cfg(test)] pub(crate) fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } + + /// Returns the bounds on the remaining length of the stream. + pub(crate) fn size_hint(&self) -> SizeHint { + self.inner.size_hint() + } +} + +impl ForwardCompatibleBody { + pub(crate) fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, B::Error>>> { + let mut fut = self.get_mut().frame(); + let pinned = Pin::new(&mut fut); + pinned.poll(cx) + } } /// Future that resolves to the next frame from a `Body`. diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index 8a931ddf65..03804feab0 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -68,7 +68,7 @@ struct SharedState { struct BodyState { replay: Replay, trailers: Option, - rest: B, + rest: crate::compat::ForwardCompatibleBody, is_completed: bool, /// Maximum number of bytes to buffer. @@ -104,13 +104,19 @@ impl ReplayBody { state: Some(BodyState { replay: Default::default(), trailers: None, - rest: body, + rest: crate::compat::ForwardCompatibleBody::new(body), is_completed: false, max_bytes: max_bytes + 1, }), - // The initial `ReplayBody` has nothing to replay + // The initial `ReplayBody` has no data to replay. replay_body: false, - replay_trailers: false, + // NOTE(kate): When polling the inner body in terms of frames, we will not yield + // `Ready(None)` from `Body::poll_data()` until we have reached the end of the + // underlying stream. Once we have migrated to `http-body` v1, this field will be + // initialized `false` thanks to the use of `Body::poll_frame()`, but for now we must + // initialize this to true; `poll_trailers()` will be called after the trailers have + // been observed previously, even for the initial body. + replay_trailers: true, }) } @@ -204,16 +210,33 @@ where // Poll the inner body for more data. If the body has ended, remember // that so that future clones will not try polling it again (as // described above). - let data = { + let data: B::Data = { + use futures::{future::Either, ready}; + // Poll the inner body for the next frame. tracing::trace!("Polling initial body"); - match futures::ready!(Pin::new(&mut state.rest).poll_data(cx)) { - Some(Ok(data)) => data, - Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + let poll = Pin::new(&mut state.rest).poll_frame(cx).map_err(Into::into); + let frame = match ready!(poll) { + // The body yielded a new frame. + Some(Ok(frame)) => frame, + // The body yielded an error. + Some(Err(error)) => return Poll::Ready(Some(Err(error))), + // The body has reached the end of the stream. None => { tracing::trace!("Initial body completed"); state.is_completed = true; return Poll::Ready(None); } + }; + // Now, inspect the frame: was it a chunk of data, or a trailers frame? + match Self::split_frame(frame) { + Some(Either::Left(data)) => data, + Some(Either::Right(trailers)) => { + tracing::trace!("Initial body completed"); + state.trailers = Some(trailers); + state.is_completed = true; + return Poll::Ready(None); + } + None => return Poll::Ready(None), } }; @@ -234,7 +257,7 @@ where /// NOT be polled until the previous body has been dropped. fn poll_trailers( self: Pin<&mut Self>, - cx: &mut Context<'_>, + _cx: &mut Context<'_>, ) -> Poll, Self::Error>> { let this = self.get_mut(); let state = Self::acquire_state(&mut this.state, &this.shared.body); @@ -251,40 +274,40 @@ where } } - // If the inner body has previously ended, don't poll it again. - if !state.rest.is_end_stream() { - return Pin::new(&mut state.rest) - .poll_trailers(cx) - .map_ok(|tlrs| { - // Record a copy of the inner body's trailers in the shared state. - if state.trailers.is_none() { - state.trailers.clone_from(&tlrs); - } - tlrs - }) - .map_err(Into::into); - } - Poll::Ready(Ok(None)) } + #[tracing::instrument( + skip_all, + level = "trace", + fields( + state.is_some = %self.state.is_some(), + replay_trailers = %self.replay_trailers, + replay_body = %self.replay_body, + is_completed = ?self.state.as_ref().map(|s| s.is_completed), + ) + )] fn is_end_stream(&self) -> bool { // If the initial body was empty as soon as it was wrapped, then we are finished. if self.shared.was_empty { + tracing::trace!("Initial body was empty, stream has ended"); return true; } let Some(state) = self.state.as_ref() else { // This body is not currently the "active" replay being polled. + tracing::trace!("Inactive replay body is not complete"); return false; }; // if this body has data or trailers remaining to play back, it // is not EOS - !self.replay_body && !self.replay_trailers + let eos = !self.replay_body && !self.replay_trailers // if we have replayed everything, the initial body may // still have data remaining, so ask it - && state.rest.is_end_stream() + && state.rest.is_end_stream(); + tracing::trace!(%eos, "Checked replay body end-of-stream"); + eos } #[inline] @@ -334,6 +357,32 @@ impl Drop for ReplayBody { } } +impl ReplayBody { + /// Splits a `Frame` into a chunk of data or a header map. + /// + /// Frames do not expose their inner enums, and instead expose `into_data()` and + /// `into_trailers()` methods. This function breaks the frame into either `Some(Left(data))` + /// if it is given a DATA frame, and `Some(Right(trailers))` if it is given a TRAILERS frame. + /// + /// This returns `None` if an unknown frame is provided, that is neither. + /// + /// This is an internal helper to facilitate pattern matching in `read_body(..)`, above. + fn split_frame( + frame: crate::compat::Frame, + ) -> Option> { + use {crate::compat::Frame, futures::future::Either}; + match frame.into_data().map_err(Frame::into_trailers) { + Ok(data) => Some(Either::Left(data)), + Err(Ok(trailers)) => Some(Either::Right(trailers)), + Err(Err(_unknown)) => { + // It's possible that some sort of unknown frame could be encountered. + tracing::warn!("An unknown body frame has been buffered"); + None + } + } + } +} + // === impl BodyState === impl BodyState { diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index 90c0140188..67640ac23f 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -327,6 +327,7 @@ fn empty_body_is_always_eos() { async fn eos_only_when_fully_replayed() { // Test that each clone of a body is not EOS until the data has been // fully replayed. + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let initial = ReplayBody::try_new(TestBody::one_data_frame(), 64 * 1024) .expect("body must not be too large"); let replay = initial.clone(); @@ -344,6 +345,8 @@ async fn eos_only_when_fully_replayed() { .expect("yields a frame") .into_data() .expect("yields a data frame"); + // TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers. + assert!(initial.frame().await.is_none()); assert!(initial.is_end_stream()); assert!(!replay.is_end_stream()); drop(initial); @@ -388,6 +391,7 @@ async fn eos_only_when_fully_replayed() { async fn eos_only_when_fully_replayed_with_trailers() { // Test that each clone of a body is not EOS until the data has been // fully replayed. + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let initial = ReplayBody::try_new(TestBody::one_data_frame().with_trailers(), 64 * 1024) .expect("body must not be too large"); let replay = initial.clone(); @@ -561,6 +565,7 @@ async fn caps_across_replays() { #[test] fn body_too_big() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let max_size = 8; let mk_body = |sz: usize| -> BoxBody { let s = (0..sz).map(|_| "x").collect::(); @@ -597,6 +602,7 @@ fn body_too_big() { #[allow(clippy::redundant_clone)] #[test] fn size_hint_is_correct_for_empty_body() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let initial = ReplayBody::try_new(BoxBody::empty(), 64 * 1024).expect("empty body can't be too large"); let size = initial.size_hint(); @@ -617,6 +623,7 @@ async fn size_hint_is_correct_across_replays() { debug_assert!(SIZE as usize <= CAPACITY); // Create the initial body, and a replay. + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let mut initial = ReplayBody::try_new(BoxBody::from_static(BODY), CAPACITY) .expect("empty body can't be too large"); let mut replay = initial.clone(); @@ -629,6 +636,12 @@ async fn size_hint_is_correct_across_replays() { // Read the body, check the size hint again. assert_eq!(chunk(&mut initial).await.as_deref(), Some(BODY)); + let initial = { + // TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers. + let mut body = crate::compat::ForwardCompatibleBody::new(initial); + assert!(body.frame().await.is_none()); + body.into_inner() + }; debug_assert!(initial.is_end_stream()); // TODO(kate): this currently misreports the *remaining* size of the body. // let size = initial.size_hint();