Skip to content

Commit daf57fe

Browse files
committed
chore(http/upgrade): upgrade to hyper 1.x
NOTE: there is a comment noting that the upgrade middleware does not expect to be cloneable. it is unfortunately, however, at odds with the new bounds expected of extensions. so, `Http11Upgrade` is now Clone'able, but a comment is left in place noting this weakened invariant. it's worth investigating how upgrades have changed since, in more detail, but for the current moment we are interested in being especially conservative about changing behavior, and focusing on api changes like `Body::poll_frame(..)`. Signed-off-by: katelyn martin <kate@buoyant.io>
1 parent f9a9629 commit daf57fe

File tree

4 files changed

+23
-26
lines changed

4 files changed

+23
-26
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1930,6 +1930,7 @@ dependencies = [
19301930
"http 1.2.0",
19311931
"http-body 1.0.1",
19321932
"hyper",
1933+
"hyper-util",
19331934
"linkerd-duplex",
19341935
"linkerd-error",
19351936
"linkerd-http-box",

linkerd/http/upgrade/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ futures = { version = "0.3", default-features = false }
1616
http = { workspace = true }
1717
http-body = { workspace = true }
1818
hyper = { workspace = true, default-features = false, features = ["client"] }
19+
hyper-util = { workspace = true, default-features = false, features = [
20+
"client",
21+
"client-legacy",
22+
] }
1923
pin-project = "1"
2024
tokio = { version = "1", default-features = false }
2125
tower = { version = "0.4", default-features = false }

linkerd/http/upgrade/src/glue.rs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::upgrade::Http11Upgrade;
22
use futures::{ready, TryFuture};
3-
use http_body::Body;
4-
use hyper::client::connect as hyper_connect;
3+
use http_body::{Body, Frame};
4+
use hyper_util::client::legacy::connect as hyper_connect;
55
use linkerd_error::{Error, Result};
66
use linkerd_http_box::BoxBody;
77
use linkerd_io::{self as io, AsyncRead, AsyncWrite};
@@ -63,38 +63,21 @@ where
6363
self.body.is_end_stream()
6464
}
6565

66-
fn poll_data(
66+
fn poll_frame(
6767
self: Pin<&mut Self>,
6868
cx: &mut Context<'_>,
69-
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
70-
// Poll the next chunk from the body.
69+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
70+
// Poll the next frame from the body.
7171
let this = self.project();
7272
let body = this.body;
73-
let data = ready!(body.poll_data(cx));
73+
let frame = ready!(body.poll_frame(cx));
7474

7575
// Log errors.
76-
if let Some(Err(e)) = &data {
76+
if let Some(Err(e)) = &frame {
7777
debug!("http body error: {}", e);
7878
}
7979

80-
Poll::Ready(data)
81-
}
82-
83-
fn poll_trailers(
84-
self: Pin<&mut Self>,
85-
cx: &mut Context<'_>,
86-
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
87-
// Poll the trailers from the body.
88-
let this = self.project();
89-
let body = this.body;
90-
let trailers = ready!(body.poll_trailers(cx));
91-
92-
// Log errors.
93-
if let Err(e) = &trailers {
94-
debug!("http trailers error: {}", e);
95-
}
96-
97-
Poll::Ready(trailers)
80+
Poll::Ready(frame)
9881
}
9982

10083
#[inline]

linkerd/http/upgrade/src/upgrade.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@ use try_lock::TryLock;
2222
/// inserted into the `Request::extensions()`. If the HTTP1 client service
2323
/// also detects an upgrade, the two `OnUpgrade` futures will be joined
2424
/// together with the glue in this type.
25+
//
2526
// Note: this relies on their only having been 2 Inner clones, so don't
2627
// implement `Clone` for this type.
28+
// XXX(kate): to satisfy new trait bounds when upgrading to hyper 1.x, this type must now be
29+
// Clone'able.
30+
#[derive(Clone)]
2731
pub struct Http11Upgrade {
2832
half: Half,
2933
inner: Arc<Inner>,
@@ -50,7 +54,9 @@ struct Inner {
5054
upgrade_drain_signal: Option<drain::Watch>,
5155
}
5256

53-
#[derive(Debug)]
57+
// XXX(kate): to satisfy new trait bounds when upgrading to hyper 1.x, this type must now be
58+
// Clone'able.
59+
#[derive(Clone, Debug)]
5460
enum Half {
5561
Server,
5662
Client,
@@ -139,6 +145,9 @@ impl Drop for Inner {
139145
let both_upgrades = async move {
140146
let (server_conn, client_conn) = tokio::try_join!(server_upgrade, client_upgrade)?;
141147
trace!("HTTP upgrade successful");
148+
use hyper_util::rt::TokioIo;
149+
let client_conn = TokioIo::new(client_conn);
150+
let server_conn = TokioIo::new(server_conn);
142151
if let Err(e) = Duplex::new(client_conn, server_conn).await {
143152
info!("tcp duplex error: {}", e)
144153
}

0 commit comments

Comments
 (0)