Skip to content

Commit

Permalink
Migrate to hyper 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinastone committed Dec 15, 2023
1 parent 89b971f commit 1332c81
Show file tree
Hide file tree
Showing 15 changed files with 676 additions and 143 deletions.
350 changes: 277 additions & 73 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 10 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,29 @@ cookie = "^0.16.0"
anyhow = "^1.0.27"
futures = "^0.3.1"
futures-timer = "^3.0"
headers = "^0.3.2"
hyper = { version = "0.14", features = ["full"] }
headers = "^0.4"
hyper = { version = "1.0", features = ["full"] }
hyper-util = { version = "0.1.1", features = ["tokio", "server", "server-auto"] }
http-body = "1.0.0"
http-body-util = "0.1.0"
itertools = "^0.10.0"
lazy_static = "^1.4.0"
mime = "^0.3.13"
num_cpus = "^1.13.0"
pin-project-lite = "0.2.7"
rand = { version="^0.8", features = ["small_rng"]}
serde = "^1.0.98"
serde_derive = "^1.0.98"
serde_urlencoded = "^0.7"
sync_wrapper = "0.1.1"
url = "^2.2.1"
tokio = { version = "1.5.0", features = ["full"] }
tokio = { version = "1.25.0", features = ["full"] }
tower = { version = "^0.4.12", features = ["full"] }
tower-http = { version = "^0.2.5", features=["trace"] }
tower-http = { version = "^0.5", features=["trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uri_path = { path = "uri_path" }
# tower-hyper-http-body-compat = { version = "0.1.4", features = ["http1", "server"] }

[[bin]]
name = "httpbox"
Expand Down
1 change: 1 addition & 0 deletions src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::http::{Request, Result};
use async_trait::async_trait;

use std::future::Future;

#[async_trait]
Expand Down
207 changes: 207 additions & 0 deletions src/http/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
use futures::prelude::*;
use http_body_util::BodyExt;
use hyper::body::Bytes;
use hyper::body::{Body as HttpBody, Frame, Incoming, SizeHint};
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{error::Error as StdError, fmt};
use sync_wrapper::SyncWrapper;

type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Errors that can happen when using axum.
#[derive(Debug)]
pub struct Error {
inner: BoxError,
}

impl Error {
/// Create a new `Error` from a boxable error.
pub fn new(error: impl Into<BoxError>) -> Self {
Self {
inner: error.into(),
}
}

/// Convert an `Error` back into the underlying boxed trait object.
pub fn into_inner(self) -> BoxError {
self.inner
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}

impl StdError for Error {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
Some(&*self.inner)
}
}

fn boxed<B>(body: B) -> BoxBody
where
B: HttpBody<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
try_downcast(body)
.unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
}

pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where
T: 'static,
K: Send + 'static,
{
let mut k = Some(k);
if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
Ok(k.take().unwrap())
} else {
Err(k.unwrap())
}
}

#[derive(Debug)]
pub struct Body(BoxBody);

impl Body {
/// Create a new `Body` that wraps another [`HttpBody`].
pub fn new<B>(body: B) -> Self
where
B: HttpBody<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
}

/// Create an empty body.
pub fn empty() -> Self {
Self::new(http_body_util::Empty::new())
}

pub fn incoming(incoming: Incoming) -> Self {
Self::new(incoming)
}

/// Create a new `Body` from a [`Stream`].
///
/// [`Stream`]: futures::stream::Stream
pub fn from_stream<S>(stream: S) -> Self
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
Self::new(StreamBody {
stream: SyncWrapper::new(stream),
})
}
}

impl Default for Body {
fn default() -> Self {
Self::empty()
}
}

macro_rules! body_from_impl {
($ty:ty) => {
impl From<$ty> for Body {
fn from(buf: $ty) -> Self {
Self::new(http_body_util::Full::from(buf))
}
}
};
}

body_from_impl!(&'static [u8]);
body_from_impl!(std::borrow::Cow<'static, [u8]>);
body_from_impl!(Vec<u8>);

body_from_impl!(&'static str);
body_from_impl!(std::borrow::Cow<'static, str>);
body_from_impl!(String);

body_from_impl!(Bytes);

impl HttpBody for Body {
type Data = Bytes;
type Error = Error;

#[inline]
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut self.0).poll_frame(cx)
}

#[inline]
fn size_hint(&self) -> SizeHint {
self.0.size_hint()
}

#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}
}

impl Stream for Body {
type Item = Result<Bytes, Error>;

#[inline]
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
match futures::ready!(self.as_mut().poll_frame(cx)?) {
Some(frame) => match frame.into_data() {
Ok(data) => return Poll::Ready(Some(Ok(data))),
Err(_frame) => {}
},
None => return Poll::Ready(None),
}
}
}
}

pin_project! {
struct StreamBody<S> {
#[pin]
stream: SyncWrapper<S>,
}
}

impl<S> HttpBody for StreamBody<S>
where
S: TryStream,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
type Data = Bytes;
type Error = Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let stream = self.project().stream.get_pin_mut();
match futures::ready!(stream.try_poll_next(cx)) {
Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
None => Poll::Ready(None),
}
}
}

#[test]
fn test_try_downcast() {
assert_eq!(try_downcast::<i32, _>(5_u32), Err(5_u32));
assert_eq!(try_downcast::<i32, _>(5_i32), Ok(5_i32));
}
19 changes: 19 additions & 0 deletions src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,22 @@ impl From<hyper::http::Error> for Error {
Error::HyperError(error)
}
}

// impl std::error::Error for Error {
// fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
// match self {
// Self::HyperError(e) => Some(&e),
// Self::Failure(resp) => None,
// }
// }
// }

// impl std::fmt::Display for Error {
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// match self {
// Self::HyperError(e) => e.fmt(f)?,
// Self::Failure(resp) => {}
// };
// Ok(())
// }
// }
4 changes: 3 additions & 1 deletion src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
pub use hyper::body::{Bytes, Incoming};

Check warning on line 1 in src/http/mod.rs

View workflow job for this annotation

GitHub Actions / build (beta)

unused import: `Incoming`

Check warning on line 1 in src/http/mod.rs

View workflow job for this annotation

GitHub Actions / build (beta)

unused import: `Incoming`
pub use hyper::http::{StatusCode, Uri};
pub use hyper::{body::Bytes, Body};

mod body;
mod error;
mod request;
mod response;
mod stream;

pub use self::body::Body;
pub use self::error::Error;
pub use self::request::*;
pub use self::response::*;
Expand Down
14 changes: 9 additions & 5 deletions src/http/request.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::Body;
use crate::headers::{Header, HeaderMapExt};
// use hyper::body::Incoming;
use hyper::http::Request as HTTPRequest;
use hyper::Body;
use std::net::SocketAddr;
use uri_path::PathMatch;

Expand Down Expand Up @@ -36,10 +37,6 @@ impl Request {
de::deserialize(self.params.clone()).ok()
}

pub fn body(&mut self) -> Body {
std::mem::replace(self.req.body_mut(), Body::empty())
}

pub fn typed_header<H: Header>(&self) -> Option<H> {
self.req.headers().typed_get::<H>()
}
Expand All @@ -63,3 +60,10 @@ impl core::ops::Deref for Request {
&self.req
}
}

impl core::ops::DerefMut for Request {
// Required method
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.req
}
}
3 changes: 2 additions & 1 deletion src/http/response.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{Body, Error, Result, StatusCode, Uri};
use super::Body;
use super::{Error, Result, StatusCode, Uri};
use crate::headers::{ContentType, Header, HeaderMapExt, Location};
use hyper::http::Response as HTTPResponse;

Expand Down
6 changes: 3 additions & 3 deletions src/http/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::http::Body;
use futures::prelude::*;
use hyper::body::Bytes;
use hyper::Body;
use std::convert::Infallible;

pub(crate) fn ok_stream<T, S: Stream<Item = T>>(
Expand All @@ -13,7 +13,7 @@ pub(crate) fn body_from_stream<S: Stream + Send + Sync + 'static + Unpin>(
stream: S,
) -> Body
where
Bytes: From<<S as Stream>::Item>,
S::Item: Into<Bytes>,
{
Body::wrap_stream(ok_stream(stream).into_stream())
Body::from_stream(ok_stream(stream))
}
Loading

0 comments on commit 1332c81

Please sign in to comment.