Skip to content

Commit f0f2fe1

Browse files
committed
Migrate to hyper 1.0
1 parent 89b971f commit f0f2fe1

File tree

15 files changed

+675
-143
lines changed

15 files changed

+675
-143
lines changed

Cargo.lock

Lines changed: 277 additions & 73 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ cookie = "^0.16.0"
1515
anyhow = "^1.0.27"
1616
futures = "^0.3.1"
1717
futures-timer = "^3.0"
18-
headers = "^0.3.2"
19-
hyper = { version = "0.14", features = ["full"] }
18+
headers = "^0.4"
19+
hyper = { version = "1.0", features = ["full"] }
20+
hyper-util = { version = "0.1.1", features = ["tokio", "server", "server-auto"] }
21+
http-body = "1.0.0"
22+
http-body-util = "0.1.0"
2023
itertools = "^0.10.0"
2124
lazy_static = "^1.4.0"
2225
mime = "^0.3.13"
2326
num_cpus = "^1.13.0"
27+
pin-project-lite = "0.2.7"
2428
rand = { version="^0.8", features = ["small_rng"]}
2529
serde = "^1.0.98"
2630
serde_derive = "^1.0.98"
2731
serde_urlencoded = "^0.7"
32+
sync_wrapper = "0.1.1"
2833
url = "^2.2.1"
29-
tokio = { version = "1.5.0", features = ["full"] }
34+
tokio = { version = "1.25.0", features = ["full"] }
3035
tower = { version = "^0.4.12", features = ["full"] }
31-
tower-http = { version = "^0.2.5", features=["trace"] }
36+
tower-http = { version = "^0.5", features=["trace"] }
3237
tracing = "0.1"
3338
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
3439
uri_path = { path = "uri_path" }

src/handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::http::{Request, Result};
22
use async_trait::async_trait;
3+
34
use std::future::Future;
45

56
#[async_trait]

src/http/body.rs

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
use futures::prelude::*;
2+
use http_body_util::BodyExt;
3+
use hyper::body::Bytes;
4+
use hyper::body::{Body as HttpBody, Frame, Incoming, SizeHint};
5+
use pin_project_lite::pin_project;
6+
use std::pin::Pin;
7+
use std::task::{Context, Poll};
8+
use std::{error::Error as StdError, fmt};
9+
use sync_wrapper::SyncWrapper;
10+
11+
type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
12+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
13+
14+
/// Errors that can happen when using axum.
15+
#[derive(Debug)]
16+
pub struct Error {
17+
inner: BoxError,
18+
}
19+
20+
impl Error {
21+
/// Create a new `Error` from a boxable error.
22+
pub fn new(error: impl Into<BoxError>) -> Self {
23+
Self {
24+
inner: error.into(),
25+
}
26+
}
27+
28+
/// Convert an `Error` back into the underlying boxed trait object.
29+
pub fn into_inner(self) -> BoxError {
30+
self.inner
31+
}
32+
}
33+
34+
impl fmt::Display for Error {
35+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36+
self.inner.fmt(f)
37+
}
38+
}
39+
40+
impl StdError for Error {
41+
fn source(&self) -> Option<&(dyn StdError + 'static)> {
42+
Some(&*self.inner)
43+
}
44+
}
45+
46+
fn boxed<B>(body: B) -> BoxBody
47+
where
48+
B: HttpBody<Data = Bytes> + Send + 'static,
49+
B::Error: Into<BoxError>,
50+
{
51+
try_downcast(body)
52+
.unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
53+
}
54+
55+
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
56+
where
57+
T: 'static,
58+
K: Send + 'static,
59+
{
60+
let mut k = Some(k);
61+
if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
62+
Ok(k.take().unwrap())
63+
} else {
64+
Err(k.unwrap())
65+
}
66+
}
67+
68+
#[derive(Debug)]
69+
pub struct Body(BoxBody);
70+
71+
impl Body {
72+
/// Create a new `Body` that wraps another [`HttpBody`].
73+
pub fn new<B>(body: B) -> Self
74+
where
75+
B: HttpBody<Data = Bytes> + Send + 'static,
76+
B::Error: Into<BoxError>,
77+
{
78+
try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
79+
}
80+
81+
/// Create an empty body.
82+
pub fn empty() -> Self {
83+
Self::new(http_body_util::Empty::new())
84+
}
85+
86+
pub fn incoming(incoming: Incoming) -> Self {
87+
Self::new(incoming)
88+
}
89+
90+
/// Create a new `Body` from a [`Stream`].
91+
///
92+
/// [`Stream`]: futures::stream::Stream
93+
pub fn from_stream<S>(stream: S) -> Self
94+
where
95+
S: TryStream + Send + 'static,
96+
S::Ok: Into<Bytes>,
97+
S::Error: Into<BoxError>,
98+
{
99+
Self::new(StreamBody {
100+
stream: SyncWrapper::new(stream),
101+
})
102+
}
103+
}
104+
105+
impl Default for Body {
106+
fn default() -> Self {
107+
Self::empty()
108+
}
109+
}
110+
111+
macro_rules! body_from_impl {
112+
($ty:ty) => {
113+
impl From<$ty> for Body {
114+
fn from(buf: $ty) -> Self {
115+
Self::new(http_body_util::Full::from(buf))
116+
}
117+
}
118+
};
119+
}
120+
121+
body_from_impl!(&'static [u8]);
122+
body_from_impl!(std::borrow::Cow<'static, [u8]>);
123+
body_from_impl!(Vec<u8>);
124+
125+
body_from_impl!(&'static str);
126+
body_from_impl!(std::borrow::Cow<'static, str>);
127+
body_from_impl!(String);
128+
129+
body_from_impl!(Bytes);
130+
131+
impl HttpBody for Body {
132+
type Data = Bytes;
133+
type Error = Error;
134+
135+
#[inline]
136+
fn poll_frame(
137+
mut self: Pin<&mut Self>,
138+
cx: &mut Context<'_>,
139+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
140+
Pin::new(&mut self.0).poll_frame(cx)
141+
}
142+
143+
#[inline]
144+
fn size_hint(&self) -> SizeHint {
145+
self.0.size_hint()
146+
}
147+
148+
#[inline]
149+
fn is_end_stream(&self) -> bool {
150+
self.0.is_end_stream()
151+
}
152+
}
153+
154+
impl Stream for Body {
155+
type Item = Result<Bytes, Error>;
156+
157+
#[inline]
158+
fn poll_next(
159+
mut self: Pin<&mut Self>,
160+
cx: &mut Context<'_>,
161+
) -> Poll<Option<Self::Item>> {
162+
loop {
163+
match futures::ready!(self.as_mut().poll_frame(cx)?) {
164+
Some(frame) => match frame.into_data() {
165+
Ok(data) => return Poll::Ready(Some(Ok(data))),
166+
Err(_frame) => {}
167+
},
168+
None => return Poll::Ready(None),
169+
}
170+
}
171+
}
172+
}
173+
174+
pin_project! {
175+
struct StreamBody<S> {
176+
#[pin]
177+
stream: SyncWrapper<S>,
178+
}
179+
}
180+
181+
impl<S> HttpBody for StreamBody<S>
182+
where
183+
S: TryStream,
184+
S::Ok: Into<Bytes>,
185+
S::Error: Into<BoxError>,
186+
{
187+
type Data = Bytes;
188+
type Error = Error;
189+
190+
fn poll_frame(
191+
self: Pin<&mut Self>,
192+
cx: &mut Context<'_>,
193+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
194+
let stream = self.project().stream.get_pin_mut();
195+
match futures::ready!(stream.try_poll_next(cx)) {
196+
Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
197+
Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
198+
None => Poll::Ready(None),
199+
}
200+
}
201+
}
202+
203+
#[test]
204+
fn test_try_downcast() {
205+
assert_eq!(try_downcast::<i32, _>(5_u32), Err(5_u32));
206+
assert_eq!(try_downcast::<i32, _>(5_i32), Ok(5_i32));
207+
}

src/http/error.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,22 @@ impl From<hyper::http::Error> for Error {
2929
Error::HyperError(error)
3030
}
3131
}
32+
33+
// impl std::error::Error for Error {
34+
// fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
35+
// match self {
36+
// Self::HyperError(e) => Some(&e),
37+
// Self::Failure(resp) => None,
38+
// }
39+
// }
40+
// }
41+
42+
// impl std::fmt::Display for Error {
43+
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44+
// match self {
45+
// Self::HyperError(e) => e.fmt(f)?,
46+
// Self::Failure(resp) => {}
47+
// };
48+
// Ok(())
49+
// }
50+
// }

src/http/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
pub use hyper::body::{Bytes, Incoming};
12
pub use hyper::http::{StatusCode, Uri};
2-
pub use hyper::{body::Bytes, Body};
33

4+
mod body;
45
mod error;
56
mod request;
67
mod response;
78
mod stream;
89

10+
pub use self::body::Body;
911
pub use self::error::Error;
1012
pub use self::request::*;
1113
pub use self::response::*;

src/http/request.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
use super::Body;
12
use crate::headers::{Header, HeaderMapExt};
3+
// use hyper::body::Incoming;
24
use hyper::http::Request as HTTPRequest;
3-
use hyper::Body;
45
use std::net::SocketAddr;
56
use uri_path::PathMatch;
67

@@ -36,10 +37,6 @@ impl Request {
3637
de::deserialize(self.params.clone()).ok()
3738
}
3839

39-
pub fn body(&mut self) -> Body {
40-
std::mem::replace(self.req.body_mut(), Body::empty())
41-
}
42-
4340
pub fn typed_header<H: Header>(&self) -> Option<H> {
4441
self.req.headers().typed_get::<H>()
4542
}
@@ -63,3 +60,10 @@ impl core::ops::Deref for Request {
6360
&self.req
6461
}
6562
}
63+
64+
impl core::ops::DerefMut for Request {
65+
// Required method
66+
fn deref_mut(&mut self) -> &mut Self::Target {
67+
&mut self.req
68+
}
69+
}

src/http/response.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use super::{Body, Error, Result, StatusCode, Uri};
1+
use super::Body;
2+
use super::{Error, Result, StatusCode, Uri};
23
use crate::headers::{ContentType, Header, HeaderMapExt, Location};
34
use hyper::http::Response as HTTPResponse;
45

src/http/stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
use crate::http::Body;
12
use futures::prelude::*;
23
use hyper::body::Bytes;
3-
use hyper::Body;
44
use std::convert::Infallible;
55

66
pub(crate) fn ok_stream<T, S: Stream<Item = T>>(
@@ -13,7 +13,7 @@ pub(crate) fn body_from_stream<S: Stream + Send + Sync + 'static + Unpin>(
1313
stream: S,
1414
) -> Body
1515
where
16-
Bytes: From<<S as Stream>::Item>,
16+
S::Item: Into<Bytes>,
1717
{
18-
Body::wrap_stream(ok_stream(stream).into_stream())
18+
Body::from_stream(ok_stream(stream))
1919
}

0 commit comments

Comments
 (0)