
Commit

Merge remote-tracking branch 'upstream/master' into server-ffi
bossmc committed Oct 22, 2024
2 parents ca2aaf6 + c68d424 commit 89b23fc
Showing 18 changed files with 267 additions and 32 deletions.
1 change: 1 addition & 0 deletions .github/workflows/CI.yml
@@ -118,6 +118,7 @@ jobs:
- name: Pin some dependencies
run: |
cargo update -p tokio --precise 1.38.1
cargo update -p tokio-util --precise 0.7.11
- name: Check
run: cargo check --features full
22 changes: 20 additions & 2 deletions CHANGELOG.md
@@ -1,3 +1,21 @@
## v1.5.0 (2024-10-15)


#### Bug Fixes

* **http1:**
* improve performance of parsing sequentially partial messages (#3764) ([3900a23](https://github.com/hyperium/hyper/commit/3900a2381b96a7e7f608a5e031b3e90ddcdfcd74))
* send 'connection: close' when connection is ending (#3725) ([c86a6bcb](https://github.com/hyperium/hyper/commit/c86a6bcb4acb0f92e731ea2e4c1e4a839248a600), closes [#3720](https://github.com/hyperium/hyper/issues/3720))
* make `date_header` effective (#3718) ([7de02373](https://github.com/hyperium/hyper/commit/7de02373f5e4ce392587a4d9d7710c6faf9c6165))
* **http2:** strip content-length header in response to CONNECT requests (#3748) ([67a4a498](https://github.com/hyperium/hyper/commit/67a4a498d8bbdce4e604bc578da4693fb048f83d))


#### Features

* **client:** Add HTTP/2 builder options `header_table_size()` and `max_concurrent_streams()` ([4c84e8c1](https://github.com/hyperium/hyper/commit/4c84e8c1c26a1464221de96b9f39816ce7251a5f))
* **rt:** add `ReadBufCursor` methods `remaining()` and `put_slice()` (#3700) ([5a13041e](https://github.com/hyperium/hyper/commit/5a13041ed7033c9dab6e2adafd08b6af20cd33fb))


### v1.4.1 (2024-07-09)


@@ -212,7 +230,7 @@ Be sure to check out the [upgrading guide](https://hyper.rs/guides/1/upgrading).

#### Breaking Changes

* Any IO transport type provided must not implement `hyper::rt::{Read, Write}` instead of
* Any IO transport type provided must now implement `hyper::rt::{Read, Write}` instead of
`tokio::io` traits. You can grab a helper type from `hyper-util` to wrap Tokio types, or implement the traits yourself,
if it's a custom type.
([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2))
@@ -1600,7 +1618,7 @@ Be sure to check out the [upgrading guide](https://hyper.rs/guides/1/upgrading).

* **client:**
* check for dead connections in Pool ([44af2738](https://github.com/hyperium/hyper/commit/44af273853f82b81591b813d13627e143a14a6b7), closes [#1429](https://github.com/hyperium/hyper/issues/1429))
* error on unsupport 101 responses, ignore other 1xx codes ([22774222](https://github.com/hyperium/hyper/commit/227742221fa7830a14c18becbbc6137d97b57729))
* error on unsupported 101 responses, ignore other 1xx codes ([22774222](https://github.com/hyperium/hyper/commit/227742221fa7830a14c18becbbc6137d97b57729))
* **server:**
* send 400 responses on parse errors before closing connection ([7cb72d20](https://github.com/hyperium/hyper/commit/7cb72d2019bffbc667b9ad2d8cbc19c1a513fcf7))
* error if Response code is 1xx ([44c34ce9](https://github.com/hyperium/hyper/commit/44c34ce9adc888916bd67656cc54c35f7908f536))
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "hyper"
version = "1.4.1"
version = "1.5.0"
description = "A fast and correct HTTP library."
readme = "README.md"
homepage = "https://hyper.rs"
2 changes: 1 addition & 1 deletion benches/pipeline.rs
@@ -76,7 +76,7 @@ fn hello_world_16(b: &mut test::Bencher) {
tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
.unwrap();
let mut buf = Vec::new();
tcp.read_to_end(&mut buf).unwrap()
tcp.read_to_end(&mut buf).unwrap() - "connection: close\r\n".len()
} * PIPELINED_REQUESTS;

let mut tcp = TcpStream::connect(addr).unwrap();
2 changes: 1 addition & 1 deletion benches/server.rs
@@ -72,7 +72,7 @@ macro_rules! bench_server {
tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
.unwrap();
let mut buf = Vec::new();
tcp.read_to_end(&mut buf).unwrap()
tcp.read_to_end(&mut buf).unwrap() - "connection: close\r\n".len()
};

let mut tcp = TcpStream::connect(addr).unwrap();
81 changes: 81 additions & 0 deletions examples/hello-http2.rs
@@ -0,0 +1,81 @@
#![deny(warnings)]

use std::convert::Infallible;
use std::net::SocketAddr;

use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http2;
use hyper::service::service_fn;
use hyper::{Request, Response};
use tokio::net::TcpListener;

// This would normally come from the `hyper-util` crate, but we can't depend
// on that here because it would be a cyclical dependency.
#[path = "../benches/support/mod.rs"]
mod support;
use support::TokioIo;

// An async function that consumes a request, does nothing with it and returns a
// response.
async fn hello(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
}

#[derive(Clone)]
// An Executor that uses the tokio runtime.
pub struct TokioExecutor;

// Implement the `hyper::rt::Executor` trait for `TokioExecutor` so that it can be used to spawn
// tasks in the hyper runtime.
// An Executor allows us to manage execution of tasks which can help us improve the efficiency and
// scalability of the server.
impl<F> hyper::rt::Executor<F> for TokioExecutor
where
    F: std::future::Future + Send + 'static,
    F::Output: Send + 'static,
{
    fn execute(&self, fut: F) {
        tokio::task::spawn(fut);
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    pretty_env_logger::init();

    // This address is localhost
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    // Bind to the port and listen for incoming TCP connections
    let listener = TcpListener::bind(addr).await?;

    loop {
        // When an incoming TCP connection is received grab a TCP stream for
        // client-server communication.
        //
        // Note, this is a .await point, this loop will loop forever but is not a busy loop. The
        // .await point allows the Tokio runtime to pull the task off of the thread until the task
        // has work to do. In this case, a connection arrives on the port we are listening on and
        // the task is woken up, at which point the task is then put back on a thread, and is
        // driven forward by the runtime, eventually yielding a TCP stream.
        let (stream, _) = listener.accept().await?;
        // Use an adapter to access something implementing `tokio::io` traits as if they implement
        // `hyper::rt` IO traits.
        let io = TokioIo::new(stream);

        // Spin up a new task in Tokio so we can continue to listen for new TCP connection on the
        // current task without waiting for the processing of the HTTP/2 connection we just received
        // to finish
        tokio::task::spawn(async move {
            // Handle the connection from the client using HTTP/2 with an executor and pass any
            // HTTP requests received on that connection to the `hello` function
            if let Err(err) = http2::Builder::new(TokioExecutor)
                .serve_connection(io, service_fn(hello))
                .await
            {
                eprintln!("Error serving connection: {}", err);
            }
        });
    }
}
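A note on trying the example above: assuming it is registered in `Cargo.toml` the same way as the existing examples (with `required-features = ["full"]`), it can presumably be run with `cargo run --example hello-http2 --features full`, and the resulting plaintext HTTP/2 (h2c) server exercised with a prior-knowledge client such as `curl --http2-prior-knowledge http://127.0.0.1:3000/`.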
2 changes: 2 additions & 0 deletions src/client/conn/http1.rs
@@ -48,6 +48,8 @@ pub struct Parts<T> {
///
/// In most cases, this should just be spawned into an executor, so that it
/// can process incoming and outgoing messages, notice hangups, and the like.
///
/// Instances of this type are typically created via the [`handshake`] function
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
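As an illustrative sketch of the pattern the added doc line points at (this is not code from this commit, and it assumes the `hyper-util` crate's `TokioIo` adapter plus a Tokio runtime), the `Connection` returned by `hyper::client::conn::http1::handshake` is typically spawned onto an executor while requests flow through the `SendRequest` half:

```rust
use http_body_util::Empty;
use hyper::body::Bytes;
use hyper::client::conn::http1;
use hyper::Request;
use hyper_util::rt::TokioIo; // helper crate, as the surrounding docs suggest
use tokio::net::TcpStream;

async fn fetch_root() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let stream = TcpStream::connect("127.0.0.1:3000").await?;
    let io = TokioIo::new(stream);

    // `handshake` is the typical way to obtain a `Connection` instance.
    let (mut sender, conn) = http1::handshake(io).await?;

    // Spawn the connection so it can process IO, notice hangups, and the like.
    tokio::task::spawn(async move {
        if let Err(err) = conn.await {
            eprintln!("connection error: {}", err);
        }
    });

    let req = Request::builder()
        .uri("/")
        .header("host", "127.0.0.1")
        .body(Empty::<Bytes>::new())?;

    let resp = sender.send_request(req).await?;
    println!("status: {}", resp.status());
    Ok(())
}
```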
50 changes: 44 additions & 6 deletions src/client/conn/http2.rs
@@ -37,6 +37,8 @@ impl<B> Clone for SendRequest<B> {
///
/// In most cases, this should just be spawned into an executor, so that it
/// can process incoming and outgoing messages, notice hangups, and the like.
///
/// Instances of this type are typically created via the [`handshake`] function
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B, E>
where
@@ -337,13 +339,9 @@ where

/// Sets the maximum frame size to use for HTTP2.
///
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
/// Default is currently 16KB, but can change.
pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.max_frame_size = sz;
}
self.h2_builder.max_frame_size = sz.into();
self
}

@@ -355,6 +353,46 @@ where
self
}

/// Sets the header table size.
///
/// This setting informs the peer of the maximum size of the header compression
/// table used to encode header blocks, in octets. The encoder may select any value
/// equal to or less than the header table size specified by the sender.
///
/// The default value of crate `h2` is 4,096.
pub fn header_table_size(&mut self, size: impl Into<Option<u32>>) -> &mut Self {
self.h2_builder.header_table_size = size.into();
self
}

/// Sets the maximum number of concurrent streams.
///
/// The maximum concurrent streams setting only controls the maximum number
/// of streams that can be initiated by the remote peer. In other words,
/// when this setting is set to 100, this does not limit the number of
/// concurrent streams that can be created by the caller.
///
/// It is recommended that this value be no smaller than 100, so as to not
/// unnecessarily limit parallelism. However, any value is legal, including
/// 0. If `max` is set to 0, then the remote will not be permitted to
/// initiate streams.
///
/// Note that streams in the reserved state, i.e., push promises that have
/// been reserved but the stream has not started, do not count against this
/// setting.
///
/// Also note that if the remote *does* exceed the value set here, it is not
/// a protocol level error. Instead, the `h2` library will immediately reset
/// the stream.
///
/// See [Section 5.1.2] in the HTTP/2 spec for more details.
///
/// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
self.h2_builder.max_concurrent_streams = max.into();
self
}

/// Sets an interval for HTTP2 Ping frames should be sent to keep a
/// connection alive.
///
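As an illustrative sketch of how the two new options above might be used from the client builder (not code from this commit; it assumes `hyper-util` for the `TokioExecutor`/`TokioIo` adapters, and the address and values are arbitrary):

```rust
use http_body_util::Empty;
use hyper::body::Bytes;
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use tokio::net::TcpStream;

async fn connect_h2() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let stream = TcpStream::connect("127.0.0.1:3000").await?;
    let io = TokioIo::new(stream);

    let mut builder = http2::Builder::new(TokioExecutor::new());
    builder
        // Advertise a 64 KiB HPACK header table to the peer.
        .header_table_size(64u32 * 1024)
        // Let the server open at most 200 concurrent streams toward us.
        .max_concurrent_streams(200u32);

    let (_sender, conn) = builder.handshake::<_, Empty<Bytes>>(io).await?;

    // The connection future still has to be driven; spawn it like any other.
    tokio::task::spawn(async move {
        if let Err(err) = conn.await {
            eprintln!("connection error: {}", err);
        }
    });

    // Requests would then go through `_sender.send_request(...)`.
    Ok(())
}
```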
2 changes: 1 addition & 1 deletion src/client/dispatch.rs
@@ -18,7 +18,7 @@ pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;

/// An error when calling `try_send_request`.
///
/// There is a possibility of an error occuring on a connection in-between the
/// There is a possibility of an error occurring on a connection in-between the
/// time that a request is queued and when it is actually written to the IO
/// transport. If that happens, it is safe to return the request back to the
/// caller, as it was never fully sent.
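An illustrative sketch of the recovery pattern this doc comment describes (not code from this commit; the `TrySendError` accessors named below, `take_message()` and `into_error()`, are assumptions about hyper's API and should be checked against the docs):

```rust
use http_body_util::Empty;
use hyper::body::{Bytes, Incoming};
use hyper::client::conn::http1::SendRequest;
use hyper::{Request, Response};

async fn send_or_recover(
    sender: &mut SendRequest<Empty<Bytes>>,
    req: Request<Empty<Bytes>>,
) -> hyper::Result<Response<Incoming>> {
    match sender.try_send_request(req).await {
        Ok(resp) => Ok(resp),
        Err(mut err) => {
            // The request comes back only if it was never actually written
            // to the IO transport (accessor names assumed, see lead-in).
            if let Some(_request) = err.take_message() {
                // Safe to re-queue `_request` on a fresh connection here.
            }
            Err(err.into_error())
        }
    }
}
```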
6 changes: 3 additions & 3 deletions src/ext/mod.rs
@@ -191,9 +191,9 @@ impl OriginalHeaderOrder {
self.entry_order.push((name, idx));
}

// No doc test is possible here because (a) `RUSTDOCFLAGS='--cfg hyper_unstable_ffi'`
// is needed to enable this feature and (b) because this is a private interface and doctests
// can only see public symbols.
// No doc test is run here because `RUSTFLAGS='--cfg hyper_unstable_ffi'`
// is needed to compile. Once ffi is stabilized `no_run` should be removed
// here.
/// This returns an iterator that provides header names and indexes
/// in the original order received.
///
25 changes: 17 additions & 8 deletions src/proto/h1/conn.rs
@@ -21,7 +21,7 @@ use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext
use crate::body::DecodedLength;
#[cfg(feature = "server")]
use crate::common::time::Time;
use crate::headers::connection_keep_alive;
use crate::headers;
use crate::proto::{BodyLength, MessageHead};
#[cfg(feature = "server")]
use crate::rt::Sleep;
@@ -657,7 +657,7 @@ where
let outgoing_is_keep_alive = head
.headers
.get(CONNECTION)
.map_or(false, connection_keep_alive);
.map_or(false, headers::connection_keep_alive);

if !outgoing_is_keep_alive {
match head.version {
@@ -680,12 +680,21 @@
// If we know the remote speaks an older version, we try to fix up any messages
// to work with our older peer.
fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
if let Version::HTTP_10 = self.state.version {
// Fixes response or connection when keep-alive header is not present
self.fix_keep_alive(head);
// If the remote only knows HTTP/1.0, we should force ourselves
// to do only speak HTTP/1.0 as well.
head.version = Version::HTTP_10;
match self.state.version {
Version::HTTP_10 => {
// Fixes response or connection when keep-alive header is not present
self.fix_keep_alive(head);
// If the remote only knows HTTP/1.0, we should force ourselves
// to do only speak HTTP/1.0 as well.
head.version = Version::HTTP_10;
}
Version::HTTP_11 => {
if let KA::Disabled = self.state.keep_alive.status() {
head.headers
.insert(CONNECTION, HeaderValue::from_static("close"));
}
}
_ => (),
}
// If the remote speaks HTTP/1.1, then it *should* be fine with
// both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
10 changes: 9 additions & 1 deletion src/proto/h1/io.rs
@@ -32,6 +32,7 @@ const MAX_BUF_LIST_BUFFERS: usize = 16;
pub(crate) struct Buffered<T, B> {
flush_pipeline: bool,
io: T,
partial_len: Option<usize>,
read_blocked: bool,
read_buf: BytesMut,
read_buf_strategy: ReadStrategy,
@@ -65,6 +66,7 @@ where
Buffered {
flush_pipeline: false,
io,
partial_len: None,
read_blocked: false,
read_buf: BytesMut::with_capacity(0),
read_buf_strategy: ReadStrategy::default(),
@@ -176,6 +178,7 @@ where
loop {
match super::role::parse_headers::<S>(
&mut self.read_buf,
self.partial_len,
ParseContext {
cached_headers: parse_ctx.cached_headers,
req_method: parse_ctx.req_method,
@@ -191,14 +194,19 @@
)? {
Some(msg) => {
debug!("parsed {} headers", msg.head.headers.len());
self.partial_len = None;
return Poll::Ready(Ok(msg));
}
None => {
let max = self.read_buf_strategy.max();
if self.read_buf.len() >= max {
let curr_len = self.read_buf.len();
if curr_len >= max {
debug!("max_buf_size ({}) reached, closing", max);
return Poll::Ready(Err(crate::Error::new_too_large()));
}
if curr_len > 0 {
self.partial_len = Some(curr_len);
}
}
}
if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {