Skip to content

Commit

Permalink
backport Collect
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 1, 2023
1 parent 6f722a1 commit 86e7beb
Show file tree
Hide file tree
Showing 2 changed files with 235 additions and 0 deletions.
223 changes: 223 additions & 0 deletions src/collect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
task::{Context, Poll},
};

use super::Body;

use bytes::{Buf, Bytes};
use http::HeaderMap;
use pin_project_lite::pin_project;

pin_project! {
/// Future that resolves into a [`Collected`].
pub struct Collect<T>
where
T: Body,
{
#[pin]
body: T,
collected: Option<Collected<T::Data>>,
is_data_done: bool,
}
}

impl<T: Body> Collect<T> {
pub(crate) fn new(body: T) -> Self {
Self {
body,
collected: Some(Collected::default()),
is_data_done: false,
}
}
}

impl<T: Body> Future for Collect<T> {
type Output = Result<Collected<T::Data>, T::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut me = self.project();

loop {
if !*me.is_data_done {
match me.body.as_mut().poll_data(cx) {
Poll::Ready(Some(Ok(data))) => {
me.collected.as_mut().unwrap().push_data(data);
},
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(Err(err));
},
Poll::Ready(None) => {
*me.is_data_done = true;
},
Poll::Pending => return Poll::Pending,
}
} else {
match me.body.as_mut().poll_trailers(cx) {
Poll::Ready(Ok(Some(trailers))) => {
me.collected.as_mut().unwrap().push_trailers(trailers);
break;
},
Poll::Ready(Err(err)) => {
return Poll::Ready(Err(err));
},
Poll::Ready(Ok(None)) => break,
Poll::Pending => return Poll::Pending,
}
}
}

Poll::Ready(Ok(me.collected.take().expect("polled after complete")))
}
}


/// A collected body produced by [`Body::collect`] which collects all the DATA frames
/// and trailers.
#[derive(Debug)]
pub struct Collected<B> {
bufs: BufList<B>,
trailers: Option<HeaderMap>,
}

impl<B: Buf> Collected<B> {
/// If there is a trailers frame buffered, returns a reference to it.
///
/// Returns `None` if the body contained no trailers.
pub fn trailers(&self) -> Option<&HeaderMap> {
self.trailers.as_ref()
}

/// Aggregate this buffered into a [`Buf`].
pub fn aggregate(self) -> impl Buf {
self.bufs
}

/// Convert this body into a [`Bytes`].
pub fn to_bytes(mut self) -> Bytes {
self.bufs.copy_to_bytes(self.bufs.remaining())
}

fn push_data(&mut self, data: B) {
// Only push this frame if it has some data in it, to avoid crashing on
// `BufList::push`.
if data.has_remaining() {
self.bufs.push(data);
}
}

fn push_trailers(&mut self, trailers: HeaderMap) {
if let Some(current) = &mut self.trailers {
current.extend(trailers);
} else {
self.trailers = Some(trailers);
}
}
}

impl<B> Default for Collected<B> {
fn default() -> Self {
Self {
bufs: BufList::default(),
trailers: None,
}
}
}

impl<B> Unpin for Collected<B> {}

#[derive(Debug)]
struct BufList<T> {
bufs: VecDeque<T>,
}

impl<T: Buf> BufList<T> {
#[inline]
pub(crate) fn push(&mut self, buf: T) {
debug_assert!(buf.has_remaining());
self.bufs.push_back(buf);
}

/*
#[inline]
pub(crate) fn pop(&mut self) -> Option<T> {
self.bufs.pop_front()
}
*/
}

impl<T: Buf> Buf for BufList<T> {
#[inline]
fn remaining(&self) -> usize {
self.bufs.iter().map(|buf| buf.remaining()).sum()
}

#[inline]
fn chunk(&self) -> &[u8] {
self.bufs.front().map(Buf::chunk).unwrap_or_default()
}

#[inline]
fn advance(&mut self, mut cnt: usize) {
while cnt > 0 {
{
let front = &mut self.bufs[0];
let rem = front.remaining();
if rem > cnt {
front.advance(cnt);
return;
} else {
front.advance(rem);
cnt -= rem;
}
}
self.bufs.pop_front();
}
}

#[inline]
fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize {
if dst.is_empty() {
return 0;
}
let mut vecs = 0;
for buf in &self.bufs {
vecs += buf.chunks_vectored(&mut dst[vecs..]);
if vecs == dst.len() {
break;
}
}
vecs
}

#[inline]
fn copy_to_bytes(&mut self, len: usize) -> Bytes {
use bytes::{BufMut, BytesMut};
// Our inner buffer may have an optimized version of copy_to_bytes, and if the whole
// request can be fulfilled by the front buffer, we can take advantage.
match self.bufs.front_mut() {
Some(front) if front.remaining() == len => {
let b = front.copy_to_bytes(len);
self.bufs.pop_front();
b
}
Some(front) if front.remaining() > len => front.copy_to_bytes(len),
_ => {
assert!(len <= self.remaining(), "`len` greater than remaining");
let mut bm = BytesMut::with_capacity(len);
bm.put(self.take(len));
bm.freeze()
}
}
}
}

impl<T> Default for BufList<T> {
fn default() -> Self {
BufList {
bufs: VecDeque::new(),
}
}
}
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//!
//! [`Body`]: trait.Body.html
mod collect;
mod empty;
mod full;
mod limited;
Expand All @@ -21,6 +22,7 @@ mod size_hint;

pub mod combinators;

pub use self::collect::Collected;
pub use self::empty::Empty;
pub use self::full::Full;
pub use self::limited::{LengthLimitError, Limited};
Expand Down Expand Up @@ -118,6 +120,16 @@ pub trait Body {
MapErr::new(self, f)
}


/// Turn this body into [`Collected`] body which will collect all the DATA frames
/// and trailers.
fn collect(self) -> crate::collect::Collect<Self>
where
Self: Sized,
{
collect::Collect::new(self)
}

/// Turn this body into a boxed trait object.
fn boxed(self) -> BoxBody<Self::Data, Self::Error>
where
Expand Down

0 comments on commit 86e7beb

Please sign in to comment.