Skip to content

Commit

Permalink
wip: http3 support
Browse files Browse the repository at this point in the history
  • Loading branch information
SergioBenitez committed Feb 21, 2024
1 parent a866134 commit b2378ab
Show file tree
Hide file tree
Showing 15 changed files with 406 additions and 110 deletions.
14 changes: 12 additions & 2 deletions core/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ all-features = true
[features]
default = ["http2", "tokio-macros"]
http2 = ["hyper/http2", "hyper-util/http2"]
http3 = ["s2n-quic", "s2n-quic-h3", "tls"]
secrets = ["cookie/private", "cookie/key-expansion"]
json = ["serde_json"]
msgpack = ["rmp-serde"]
Expand Down Expand Up @@ -76,8 +77,7 @@ futures = { version = "0.3.30", default-features = false, features = ["std"] }
state = "0.6"

[dependencies.hyper-util]
git = "https://github.com/SergioBenitez/hyper-util.git"
branch = "fix-readversion"
version = "0.1.3"
default-features = false
features = ["http1", "server", "tokio"]

Expand All @@ -99,6 +99,16 @@ version = "0.6.0-dev"
path = "../http"
features = ["serde"]

[dependencies.s2n-quic]
version = "1.32"
default-features = false
features = ["provider-address-token-default", "provider-tls-rustls"]
optional = true

[dependencies.s2n-quic-h3]
git = "https://github.com/SergioBenitez/s2n-quic-h3.git"
optional = true

[target.'cfg(unix)'.dependencies]
libc = "0.2.149"

Expand Down
36 changes: 31 additions & 5 deletions core/lib/src/data/data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::task::{Context, Poll};
use std::path::Path;
use std::io::{self, Cursor};

use futures::ready;
use futures::{ready, FutureExt};
use futures::stream::Stream;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, ReadBuf, Take};
Expand Down Expand Up @@ -65,10 +65,15 @@ pub type BaseReader<'r> = Take<Chain<Cursor<Vec<u8>>, RawReader<'r>>>;
/// Direct reader to the underlying data stream. Not limited in any manner.
pub type RawReader<'r> = StreamReader<RawStream<'r>, Bytes>;

#[cfg(feature = "http3")]
use s2n_quic_h3::{self as quic, h3};

/// Raw underlying data stream.
pub enum RawStream<'r> {
Empty,
Body(&'r mut HyperBody),
Body(HyperBody),
#[cfg(feature = "http3")]
H3Body(h3::server::RequestStream<quic::RecvStream, Bytes>),
Multipart(multer::Field<'r>),
}

Expand Down Expand Up @@ -343,7 +348,17 @@ impl Stream for RawStream<'_> {
.poll_frame(cx)
.map_ok(|frame| frame.into_data().unwrap_or_else(|_| Bytes::new()))
.map_err(io::Error::other)
}
},
#[cfg(feature = "http3")]
RawStream::H3Body(stream) => {
use bytes::Buf;

match ready!(stream.poll_recv_data(cx)) {
Ok(Some(mut buf)) => Poll::Ready(Some(Ok(buf.copy_to_bytes(buf.remaining())))),
Ok(None) => Poll::Ready(None),
Err(e) => Poll::Ready(Some(Err(io::Error::other(e)))),
}
},
RawStream::Multipart(s) => Pin::new(s).poll_next(cx).map_err(io::Error::other),
RawStream::Empty => Poll::Ready(None),
}
Expand All @@ -356,6 +371,8 @@ impl Stream for RawStream<'_> {
let (lower, upper) = (hint.lower(), hint.upper());
(lower as usize, upper.map(|x| x as usize))
},
#[cfg(feature = "http3")]
RawStream::H3Body(_) => (0, Some(0)),
RawStream::Multipart(mp) => mp.size_hint(),
RawStream::Empty => (0, Some(0)),
}
Expand All @@ -367,17 +384,26 @@ impl std::fmt::Display for RawStream<'_> {
match self {
RawStream::Empty => f.write_str("empty stream"),
RawStream::Body(_) => f.write_str("request body"),
#[cfg(feature = "http3")]
RawStream::H3Body(_) => f.write_str("http3 quic stream"),
RawStream::Multipart(_) => f.write_str("multipart form field"),
}
}
}

impl<'r> From<&'r mut HyperBody> for RawStream<'r> {
fn from(value: &'r mut HyperBody) -> Self {
impl<'r> From<HyperBody> for RawStream<'r> {
fn from(value: HyperBody) -> Self {
Self::Body(value)
}
}

#[cfg(feature = "http3")]
impl<'r> From<h3::server::RequestStream<quic::RecvStream, Bytes>> for RawStream<'r> {
fn from(value: h3::server::RequestStream<quic::RecvStream, Bytes>) -> Self {
Self::H3Body(value)
}
}

impl<'r> From<multer::Field<'r>> for RawStream<'r> {
fn from(value: multer::Field<'r>) -> Self {
Self::Multipart(value)
Expand Down
2 changes: 2 additions & 0 deletions core/lib/src/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ pub use self::capped::{N, Capped};
pub use self::io_stream::{IoHandler, IoStream};
pub use ubyte::{ByteUnit, ToByteUnit};
pub use self::transform::{Transform, TransformBuf};

pub(crate) use self::data_stream::RawStream;
20 changes: 7 additions & 13 deletions core/lib/src/erased.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::task::{Poll, Context};

use futures::future::BoxFuture;
use http::request::Parts;
use hyper::body::Incoming;
use tokio::io::{AsyncRead, ReadBuf};

use crate::data::{Data, IoHandler};
Expand All @@ -16,9 +15,9 @@ use crate::{Request, Response, Rocket, Orbit};
// TODO: Write safety proofs.

macro_rules! static_assert_covariance {
($T:tt) => (
($($T:tt)*) => (
const _: () = {
fn _assert_covariance<'x: 'y, 'y>(x: &'y $T<'x>) -> &'y $T<'y> { x }
fn _assert_covariance<'x: 'y, 'y>(x: &'y $($T)*<'x>) -> &'y $($T)*<'y> { x }
};
)
}
Expand All @@ -40,7 +39,6 @@ pub struct ErasedResponse {
// XXX: SAFETY: This (dependent) field must come first due to drop order!
response: Response<'static>,
_request: Arc<ErasedRequest>,
_incoming: Box<Incoming>,
}

impl Drop for ErasedResponse {
Expand Down Expand Up @@ -81,8 +79,7 @@ impl ErasedRequest {

pub async fn into_response<T: Send + Sync + 'static>(
self,
incoming: Incoming,
data_builder: impl for<'r> FnOnce(&'r mut Incoming) -> Data<'r>,
data: Data<'static>,
preprocess: impl for<'r, 'x> FnOnce(
&'r Rocket<Orbit>,
&'r mut Request<'x>,
Expand All @@ -95,13 +92,11 @@ impl ErasedRequest {
Data<'r>
) -> BoxFuture<'r, Response<'r>>,
) -> ErasedResponse {
let mut incoming = Box::new(incoming);
let mut data: Data<'_> = {
let incoming: &mut Incoming = &mut *incoming;
let incoming: &'static mut Incoming = unsafe { transmute(incoming) };
data_builder(incoming)
};
// FIXME: UNSAFE. This is incorrect. The following fail:
// static_assert_covariance!(Data);
// static_assert_covariance!(crate::data::RawStream);

let mut data: Data<'_> = data;
let mut parent = Arc::new(self);
let token: T = {
let parent: &mut ErasedRequest = Arc::get_mut(&mut parent).unwrap();
Expand All @@ -122,7 +117,6 @@ impl ErasedRequest {

ErasedResponse {
_request: parent,
_incoming: incoming,
response: response,
}
}
Expand Down
43 changes: 32 additions & 11 deletions core/lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::error::Error as StdError;
use yansi::Paint;
use figment::Profile;

use crate::{Rocket, Orbit};
use crate::{Ignite, Orbit, Rocket};

/// An error that occurs during launch.
///
Expand Down Expand Up @@ -89,6 +89,11 @@ pub enum ErrorKind {
SentinelAborts(Vec<crate::sentinel::Sentry>),
/// The configuration profile is not debug but no secret key is configured.
InsecureSecretKey(Profile),
/// Liftoff failed. Contains the Rocket instance that failed to shutdown.
Liftoff(
Result<Rocket<Ignite>, Arc<Rocket<Orbit>>>,
Box<dyn StdError + Send + Sync + 'static>
),
/// Shutdown failed. Contains the Rocket instance that failed to shutdown.
Shutdown(Arc<Rocket<Orbit>>),
}
Expand Down Expand Up @@ -121,6 +126,12 @@ impl Error {
Error { handled: AtomicBool::new(false), kind }
}

pub(crate) fn io_other<E>(e: E) -> Error
where E: Into<Box<dyn StdError + Send + Sync>>
{
Error::from(io::Error::other(e))
}

#[inline(always)]
fn was_handled(&self) -> bool {
self.handled.load(Ordering::Acquire)
Expand Down Expand Up @@ -225,6 +236,11 @@ impl Error {

"aborting due to sentinel-triggered abort(s)"
}
ErrorKind::Liftoff(_, error) => {
error!("Rocket liftoff faield due to panicking liftoff fairing(s).");
error_!("{error}");
"aborting due to failed liftoff"
}
ErrorKind::Shutdown(_) => {
error!("Rocket failed to shutdown gracefully.");
"aborting due to failed shutdown"
Expand All @@ -246,6 +262,7 @@ impl fmt::Display for ErrorKind {
ErrorKind::InsecureSecretKey(_) => "insecure secret key config".fmt(f),
ErrorKind::Config(_) => "failed to extract configuration".fmt(f),
ErrorKind::SentinelAborts(_) => "sentinel(s) aborted".fmt(f),
ErrorKind::Liftoff(_, _) => "liftoff failed".fmt(f),
ErrorKind::Shutdown(_) => "shutdown failed".fmt(f),
}
}
Expand Down Expand Up @@ -300,33 +317,37 @@ pub(crate) fn log_server_error(error: &Box<dyn StdError + Send + Sync>) {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let error = &self.0;
if let Some(e) = error.downcast_ref::<hyper::Error>() {
write!(f, "request processing failed: {e}")?;
write!(f, "request failed: {e}")?;
} else if let Some(e) = error.downcast_ref::<io::Error>() {
write!(f, "connection I/O error: ")?;
write!(f, "connection error: ")?;

match e.kind() {
io::ErrorKind::NotConnected => write!(f, "remote disconnected")?,
io::ErrorKind::UnexpectedEof => write!(f, "remote sent early eof")?,
io::ErrorKind::ConnectionReset
| io::ErrorKind::ConnectionAborted
| io::ErrorKind::BrokenPipe => write!(f, "terminated by remote")?,
| io::ErrorKind::ConnectionAborted => write!(f, "terminated by remote")?,
_ => write!(f, "{e}")?,
}
} else {
write!(f, "http server error: {error}")?;
}

if let Some(e) = error.source() {
write!(f, " ({})", ServerError(e))?;
}

Ok(())
}
}

let mut error: &(dyn StdError + 'static) = &**error;
if error.downcast_ref::<hyper::Error>().is_some() {
warn!("{}", ServerError(&**error))
warn!("{}", ServerError(error));
while let Some(source) = error.source() {
error = source;
warn_!("{}", ServerError(error));
}
} else {
error!("{}", ServerError(&**error))
error!("{}", ServerError(error));
while let Some(source) = error.source() {
error = source;
error_!("{}", ServerError(error));
}
}
}
33 changes: 22 additions & 11 deletions core/lib/src/listener/cancellable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pin_project! {
}
}

#[derive(Debug)]
enum State {
/// I/O has not been cancelled. Proceed as normal.
Active,
Expand Down Expand Up @@ -97,11 +98,11 @@ pub trait CancellableExt: Sized {
impl<L: Listener> CancellableExt for L { }

fn time_out() -> io::Error {
io::Error::new(io::ErrorKind::TimedOut, "Shutdown grace timed out")
io::Error::new(io::ErrorKind::TimedOut, "shutdown grace period elapsed")
}

fn gone() -> io::Error {
io::Error::new(io::ErrorKind::BrokenPipe, "IO driver has terminated")
io::Error::new(io::ErrorKind::BrokenPipe, "I/O driver terminated")
}

impl<L, F> CancellableListener<F, Bounced<L>>
Expand Down Expand Up @@ -162,7 +163,7 @@ impl<L, F> Listener for CancellableListener<F, L>
}

impl<F: Future, I: AsyncWrite> CancellableIo<F, I> {
fn inner(&self) -> Option<&I> {
pub fn inner(&self) -> Option<&I> {
self.io.as_ref()
}

Expand All @@ -171,7 +172,7 @@ impl<F: Future, I: AsyncWrite> CancellableIo<F, I> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
do_io: impl FnOnce(Pin<&mut I>, &mut Context<'_>) -> Poll<io::Result<T>>,
) -> Poll<io::Result<T>> {
) -> Poll<io::Result<Option<T>>> {
let mut me = self.as_mut().project();
let io = match me.io.as_pin_mut() {
Some(io) => io,
Expand All @@ -184,14 +185,14 @@ impl<F: Future, I: AsyncWrite> CancellableIo<F, I> {
if me.trigger.as_mut().poll(cx).is_ready() {
*me.state = State::Grace(Box::pin(sleep(*me.grace)));
} else {
return do_io(io, cx);
return do_io(io, cx).map_ok(Some);
}
}
State::Grace(timer) => {
if timer.as_mut().poll(cx).is_ready() {
*me.state = State::Mercy(Box::pin(sleep(*me.mercy)));
} else {
return do_io(io, cx);
return do_io(io, cx).map_ok(Some);
}
}
State::Mercy(timer) => {
Expand All @@ -218,7 +219,9 @@ impl<F: Future, I: AsyncRead + AsyncWrite> AsyncRead for CancellableIo<F, I> {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.as_mut().poll_trigger_then(cx, |io, cx| io.poll_read(cx, buf))
self.as_mut()
.poll_trigger_then(cx, |io, cx| io.poll_read(cx, buf))
.map_ok(|ok| ok.unwrap_or_default())
}
}

Expand All @@ -228,29 +231,37 @@ impl<F: Future, I: AsyncWrite> AsyncWrite for CancellableIo<F, I> {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.as_mut().poll_trigger_then(cx, |io, cx| io.poll_write(cx, buf))
self.as_mut()
.poll_trigger_then(cx, |io, cx| io.poll_write(cx, buf))
.map_ok(|ok| ok.unwrap_or_default())
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<io::Result<()>> {
self.as_mut().poll_trigger_then(cx, |io, cx| io.poll_flush(cx))
self.as_mut()
.poll_trigger_then(cx, |io, cx| io.poll_flush(cx))
.map_ok(|ok| ok.unwrap_or_default())
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<io::Result<()>> {
self.as_mut().poll_trigger_then(cx, |io, cx| io.poll_shutdown(cx))
self.as_mut()
.poll_trigger_then(cx, |io, cx| io.poll_shutdown(cx))
.map_ok(|ok| ok.unwrap_or_default())
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.as_mut().poll_trigger_then(cx, |io, cx| io.poll_write_vectored(cx, bufs))
self.as_mut()
.poll_trigger_then(cx, |io, cx| io.poll_write_vectored(cx, bufs))
.map_ok(|ok| ok.unwrap_or_default())
}

fn is_write_vectored(&self) -> bool {
Expand Down
Loading

0 comments on commit b2378ab

Please sign in to comment.