Skip to content

Commit b41108e

Browse files
committed
wip: upgrade proxy/http
1 parent 534aa3a commit b41108e

File tree

8 files changed

+60
-48
lines changed

8 files changed

+60
-48
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2278,6 +2278,7 @@ dependencies = [
22782278
"httparse",
22792279
"hyper",
22802280
"hyper-balance",
2281+
"hyper-util",
22812282
"linkerd-detect",
22822283
"linkerd-duplex",
22832284
"linkerd-error",

linkerd/http/box/src/body.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl BoxBody {
3232
where
3333
B: Body + Send + 'static,
3434
B::Data: Send + 'static,
35-
B::Error: Into<Error>,
35+
B::Error: std::error::Error + Send + Sync + 'static,
3636
{
3737
Self {
3838
inner: Box::pin(Inner(inner)),
@@ -108,7 +108,7 @@ impl<B> Body for Inner<B>
108108
where
109109
B: Body,
110110
B::Data: Send + 'static,
111-
B::Error: Into<Error>,
111+
B::Error: std::error::Error + Send + Sync + 'static,
112112
{
113113
type Data = Data;
114114
type Error = Error;
@@ -140,7 +140,7 @@ impl<B> Inner<B>
140140
where
141141
B: Body,
142142
B::Data: Send + 'static,
143-
B::Error: Into<Error>,
143+
B::Error: std::error::Error + Send + Sync + 'static,
144144
{
145145
fn map_frame(frame: Result<Frame<B::Data>, B::Error>) -> Result<Frame<Data>, Error> {
146146
match frame {

linkerd/proxy/http/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ hyper = { workspace = true, features = [
2626
"http2",
2727
"server",
2828
] }
29+
hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "service"] }
2930
hyper-balance = { path = "../../../hyper-balance" }
3031
parking_lot = "0.12"
3132
pin-project = "1"

linkerd/proxy/http/src/client.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ where
6060
T: Clone + Send + Sync + 'static,
6161
X: ExtractParam<Params, T>,
6262
C: MakeConnection<(crate::Version, T)> + Clone + Unpin + Send + Sync + 'static,
63-
C::Connection: Unpin + Send,
63+
C::Connection: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
6464
C::Metadata: Send,
6565
C::Future: Unpin + Send + 'static,
66-
B: crate::Body + Send + 'static,
66+
B: crate::Body + Unpin + Send + 'static,
6767
B::Data: Send,
6868
B::Error: Into<Error> + Send + Sync,
6969
{
@@ -120,12 +120,12 @@ impl<C, T, B> Service<http::Request<B>> for Client<C, T, B>
120120
where
121121
T: Clone + Send + Sync + 'static,
122122
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
123-
C::Connection: Unpin + Send,
123+
C::Connection: hyper::rt::Read + hyper::rt::Write + Unpin + Send,
124124
C::Future: Unpin + Send + 'static,
125125
C::Error: Into<Error>,
126-
B: crate::Body + Send + 'static,
126+
B: crate::Body + Unpin + Send + 'static,
127127
B::Data: Send,
128-
B::Error: Into<Error> + Send + Sync,
128+
B::Error: std::error::Error + Send + Sync + 'static,
129129
{
130130
type Response = http::Response<BoxBody>;
131131
type Error = Error;

linkerd/proxy/http/src/h1.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ pub struct PoolSettings {
3333
pub struct Client<C, T, B> {
3434
connect: C,
3535
target: T,
36-
absolute_form: Option<hyper::Client<HyperConnect<C, T>, B>>,
37-
origin_form: Option<hyper::Client<HyperConnect<C, T>, B>>,
36+
absolute_form: Option<hyper_util::client::legacy::Client<HyperConnect<C, T>, B>>,
37+
origin_form: Option<hyper_util::client::legacy::Client<HyperConnect<C, T>, B>>,
3838
pool: PoolSettings,
3939
}
4040

@@ -68,9 +68,9 @@ impl<C, T, B> Client<C, T, B>
6868
where
6969
T: Clone + Send + Sync + 'static,
7070
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
71-
C::Connection: Unpin + Send,
71+
C::Connection: hyper::rt::Read + hyper::rt::Write + Unpin + Send,
7272
C::Future: Unpin + Send + 'static,
73-
B: crate::Body + Send + 'static,
73+
B: crate::Body + Unpin + Send + 'static,
7474
B::Data: Send,
7575
B::Error: Into<Error> + Send + Sync,
7676
{
@@ -94,10 +94,9 @@ where
9494
// ish, so we just build a one-off client for the connection.
9595
// There's no real reason to hold the client for re-use.
9696
debug!(use_absolute_form, is_missing_host, "Using one-off client");
97-
hyper::Client::builder()
97+
hyper_util::client::legacy::Client::builder(TracingExecutor)
9898
.pool_max_idle_per_host(0)
9999
.set_host(use_absolute_form)
100-
.executor(TracingExecutor)
101100
.build(HyperConnect::new(
102101
self.connect.clone(),
103102
self.target.clone(),
@@ -120,11 +119,10 @@ where
120119
if client.is_none() {
121120
debug!(use_absolute_form, "Caching new client");
122121
*client = Some(
123-
hyper::Client::builder()
122+
hyper_util::client::legacy::Client::builder(TracingExecutor)
124123
.pool_max_idle_per_host(self.pool.max_idle)
125124
.pool_idle_timeout(self.pool.idle_timeout)
126125
.set_host(use_absolute_form)
127-
.executor(TracingExecutor)
128126
.build(HyperConnect::new(
129127
self.connect.clone(),
130128
self.target.clone(),

linkerd/proxy/http/src/h2.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{Body, TracingExecutor};
22
use futures::prelude::*;
33
use linkerd_error::{Error, Result};
4+
use linkerd_http_box::BoxBody;
45
use linkerd_stack::{MakeConnection, Service};
56
use std::{
67
marker::PhantomData,
@@ -52,10 +53,10 @@ type ConnectFuture<B> = Pin<Box<dyn Future<Output = Result<Connection<B>>> + Sen
5253
impl<C, B, T> Service<T> for Connect<C, B>
5354
where
5455
C: MakeConnection<(crate::Version, T)>,
55-
C::Connection: Send + Unpin + 'static,
56+
C::Connection: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
5657
C::Metadata: Send,
5758
C::Future: Send + 'static,
58-
B: Body + Send + 'static,
59+
B: Body + Unpin + Send + 'static,
5960
B::Data: Send,
6061
B::Error: Into<Error> + Send + Sync,
6162
{
@@ -147,7 +148,7 @@ where
147148
B::Data: Send,
148149
B::Error: Into<Error> + Send + Sync,
149150
{
150-
type Response = http::Response<hyper::Body>;
151+
type Response = http::Response<BoxBody>;
151152
type Error = hyper::Error;
152153
type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Response, Self::Error>>>>;
153154

@@ -171,6 +172,9 @@ where
171172
*req.version_mut() = http::Version::HTTP_11;
172173
}
173174

174-
self.tx.send_request(req).boxed()
175+
self.tx
176+
.send_request(req)
177+
.map_ok(|rsp| rsp.map(BoxBody::new))
178+
.boxed()
175179
}
176180
}

linkerd/proxy/http/src/orig_proto.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::{h1, h2, Body};
22
use futures::prelude::*;
33
use http::header::{HeaderValue, TRANSFER_ENCODING};
4+
use http_body::Frame;
45
use linkerd_error::{Error, Result};
56
use linkerd_http_box::BoxBody;
67
use linkerd_stack::{layer, MakeConnection, Service};
@@ -54,11 +55,11 @@ impl<C, T, B> Service<http::Request<B>> for Upgrade<C, T, B>
5455
where
5556
T: Clone + Send + Sync + 'static,
5657
C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static,
57-
C::Connection: Unpin + Send,
58+
C::Connection: hyper::rt::Read + hyper::rt::Write + Unpin + Send,
5859
C::Future: Unpin + Send + 'static,
59-
B: crate::Body + Send + 'static,
60+
B: crate::Body + Unpin + Send + 'static,
6061
B::Data: Send,
61-
B::Error: Into<Error> + Send + Sync,
62+
B::Error: std::error::Error + Send + Sync + 'static,
6263
{
6364
type Response = http::Response<BoxBody>;
6465
type Error = Error;
@@ -125,7 +126,8 @@ where
125126
.unwrap_or(orig_version);
126127
trace!(?version, "Downgrading response");
127128
*rsp.version_mut() = version;
128-
rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner }))
129+
rsp.map(|inner| UpgradeResponseBody { inner })
130+
.map(BoxBody::new)
129131
}),
130132
)
131133
}
@@ -211,23 +213,13 @@ where
211213
self.inner.is_end_stream()
212214
}
213215

214-
fn poll_data(
215-
self: Pin<&mut Self>,
216-
cx: &mut Context<'_>,
217-
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
218-
self.project()
219-
.inner
220-
.poll_data(cx)
221-
.map_err(downgrade_h2_error)
222-
}
223-
224-
fn poll_trailers(
216+
fn poll_frame(
225217
self: Pin<&mut Self>,
226218
cx: &mut Context<'_>,
227-
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
219+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
228220
self.project()
229221
.inner
230-
.poll_trailers(cx)
222+
.poll_frame(cx)
231223
.map_err(downgrade_h2_error)
232224
}
233225

linkerd/proxy/http/src/server.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use linkerd_io::{self as io, PeerAddr};
66
use linkerd_stack::{layer, ExtractParam, NewService};
77
use std::{
88
future::Future,
9+
marker::PhantomData,
910
pin::Pin,
1011
task::{Context, Poll},
1112
};
@@ -32,12 +33,13 @@ pub struct NewServeHttp<X, N> {
3233

3334
/// Serves HTTP connections with an inner service.
3435
#[derive(Clone, Debug)]
35-
pub struct ServeHttp<N> {
36+
pub struct ServeHttp<N, ReqB = BoxBody> {
3637
version: Version,
3738
http1: hyper::server::conn::http1::Builder,
3839
http2: hyper::server::conn::http2::Builder<TracingExecutor>,
3940
inner: N,
4041
drain: drain::Watch,
42+
marker: PhantomData<ReqB>,
4143
}
4244

4345
// === impl NewServeHttp ===
@@ -118,21 +120,33 @@ where
118120
drain,
119121
http1: hyper::server::conn::http1::Builder::new(),
120122
http2,
123+
marker: PhantomData,
121124
}
122125
}
123126
}
124127

125128
// === impl ServeHttp ===
126129

127-
impl<I, N, S> Service<I> for ServeHttp<N>
130+
impl<I, N, S, ReqB> Service<I> for ServeHttp<N, ReqB>
128131
where
129-
I: io::AsyncRead + io::AsyncWrite + PeerAddr + Send + Unpin + 'static,
132+
I: hyper::rt::Read + hyper::rt::Write + PeerAddr + Send + Unpin + 'static,
130133
N: NewService<ClientHandle, Service = S> + Send + 'static,
131-
S: Service<http::Request<BoxBody>, Response = http::Response<BoxBody>, Error = Error>
134+
S: Service<
135+
http::Request<linkerd_http_upgrade::glue::UpgradeBody<hyper::body::Incoming>>,
136+
Response = http::Response<BoxBody>,
137+
Error = Error,
138+
> + Service<
139+
http::Request<hyper::body::Incoming>,
140+
Response = http::Response<BoxBody>,
141+
Error = Error,
142+
> + Clone
132143
+ Unpin
133144
+ Send
134145
+ 'static,
135-
S::Future: Send + 'static,
146+
<S as Service<
147+
http::Request<linkerd_http_upgrade::glue::UpgradeBody<hyper::body::Incoming>>,
148+
>>::Future: Send + 'static,
149+
<S as Service<http::Request<hyper::body::Incoming>>>::Future: Send + 'static,
136150
{
137151
type Response = ();
138152
type Error = Error;
@@ -157,16 +171,17 @@ where
157171

158172
Box::pin(
159173
async move {
174+
use hyper_util::service::TowerToHyperService;
160175
let (svc, closed) = res?;
161176
debug!(?version, "Handling as HTTP");
177+
162178
match version {
163179
Version::Http1 => {
164180
// Enable support for HTTP upgrades (CONNECT and websockets).
165-
let svc = linkerd_http_upgrade::upgrade::Service::new(
166-
BoxRequest::new(svc),
167-
drain.clone(),
168-
);
169-
let mut conn = http1.serve_connection(io, svc).with_upgrades();
181+
let svc = linkerd_http_upgrade::upgrade::Service::new(svc, drain.clone());
182+
let mut conn = http1
183+
.serve_connection(io, TowerToHyperService::new(svc))
184+
.with_upgrades();
170185

171186
tokio::select! {
172187
res = &mut conn => {
@@ -187,7 +202,8 @@ where
187202
}
188203

189204
Version::H2 => {
190-
let mut conn = http2.serve_connection(io, BoxRequest::new(svc));
205+
let svc = TowerToHyperService::new(svc);
206+
let mut conn = http2.serve_connection(io, svc);
191207

192208
tokio::select! {
193209
res = &mut conn => {

0 commit comments

Comments
 (0)