diff --git a/linkerd/app/integration/src/client.rs b/linkerd/app/integration/src/client.rs index 146f7ba1e7..ed5c5763f3 100644 --- a/linkerd/app/integration/src/client.rs +++ b/linkerd/app/integration/src/client.rs @@ -1,5 +1,5 @@ use super::*; -use linkerd_app_core::proxy::http::trace; +use linkerd_app_core::proxy::http::TracingExecutor; use parking_lot::Mutex; use std::io; use tokio::net::TcpStream; @@ -252,7 +252,7 @@ fn run( let work = async move { let client = hyper::Client::builder() .http2_only(http2_only) - .executor(trace::Executor::new()) + .executor(TracingExecutor) .build::(conn); tracing::trace!("client task started"); let mut rx = rx; diff --git a/linkerd/app/integration/src/controller.rs b/linkerd/app/integration/src/controller.rs index 831f8541c1..590c15166a 100644 --- a/linkerd/app/integration/src/controller.rs +++ b/linkerd/app/integration/src/controller.rs @@ -2,7 +2,7 @@ use super::*; pub use linkerd2_proxy_api::destination as pb; use linkerd2_proxy_api::net; -use linkerd_app_core::proxy::http::trace; +use linkerd_app_core::proxy::http::TracingExecutor; use parking_lot::Mutex; use std::collections::VecDeque; use std::net::IpAddr; @@ -372,7 +372,7 @@ where let _ = listening_tx.send(()); } - let mut http = hyper::server::conn::Http::new().with_executor(trace::Executor::new()); + let mut http = hyper::server::conn::Http::new().with_executor(TracingExecutor); http.http2_only(true); loop { let (sock, addr) = listener.accept().await?; diff --git a/linkerd/app/integration/src/server.rs b/linkerd/app/integration/src/server.rs index 70017727ab..cca6fae2c1 100644 --- a/linkerd/app/integration/src/server.rs +++ b/linkerd/app/integration/src/server.rs @@ -1,5 +1,5 @@ +use super::app_core::svc::http::TracingExecutor; use super::*; -use linkerd_app_core::proxy::http::trace; use std::{ io, sync::atomic::{AtomicUsize, Ordering}, @@ -194,8 +194,7 @@ impl Server { async move { tracing::info!("support server running"); let mut new_svc = NewSvc(Arc::new(self.routes)); - let mut http = - hyper::server::conn::Http::new().with_executor(trace::Executor::new()); + let mut http = hyper::server::conn::Http::new().with_executor(TracingExecutor); match self.version { Run::Http1 => http.http1_only(true), Run::Http2 => http.http2_only(true), diff --git a/linkerd/proxy/http/src/trace.rs b/linkerd/proxy/http/src/executor.rs similarity index 65% rename from linkerd/proxy/http/src/trace.rs rename to linkerd/proxy/http/src/executor.rs index 82595349c8..b97fc4e65c 100644 --- a/linkerd/proxy/http/src/trace.rs +++ b/linkerd/proxy/http/src/executor.rs @@ -2,15 +2,9 @@ use std::future::Future; use tracing::instrument::Instrument; #[derive(Clone, Debug, Default)] -pub struct Executor(()); +pub struct TracingExecutor; -impl Executor { - pub fn new() -> Self { - Self(()) - } -} - -impl hyper::rt::Executor for Executor +impl hyper::rt::Executor for TracingExecutor where F: Future + Send + 'static, F::Output: Send + 'static, diff --git a/linkerd/proxy/http/src/h2.rs b/linkerd/proxy/http/src/h2.rs index 778c37609a..2efae686ca 100644 --- a/linkerd/proxy/http/src/h2.rs +++ b/linkerd/proxy/http/src/h2.rs @@ -1,4 +1,4 @@ -use crate::trace; +use crate::executor::TracingExecutor; use futures::prelude::*; pub use h2::{Error as H2Error, Reason}; use hyper::{ @@ -95,10 +95,10 @@ where let (io, _meta) = connect.err_into::().await?; let mut builder = conn::Builder::new(); builder + .executor(TracingExecutor) .http2_only(true) .http2_initial_stream_window_size(initial_stream_window_size) - .http2_initial_connection_window_size(initial_connection_window_size) - .executor(trace::Executor::new()); + .http2_initial_connection_window_size(initial_connection_window_size); // Configure HTTP/2 PING frames if let Some(timeout) = keepalive_timeout { diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index 845020fb2a..fc36898782 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -9,6 +9,7 @@ pub mod classify; pub mod client; pub mod client_handle; pub mod detect; +mod executor; mod glue; pub mod h1; pub mod h2; @@ -21,7 +22,6 @@ mod retain; mod server; pub mod strip_header; pub mod timeout; -pub mod trace; pub mod upgrade; pub mod version; @@ -33,6 +33,7 @@ pub use self::{ }, client_handle::{ClientHandle, SetClientHandle}, detect::DetectHttp, + executor::TracingExecutor, header_from_target::NewHeaderFromTarget, normalize_uri::{MarkAbsoluteForm, NewNormalizeUri}, override_authority::{AuthorityOverride, NewOverrideAuthority}, diff --git a/linkerd/proxy/http/src/server.rs b/linkerd/proxy/http/src/server.rs index 225ac35fbc..7ffd2756c0 100644 --- a/linkerd/proxy/http/src/server.rs +++ b/linkerd/proxy/http/src/server.rs @@ -1,6 +1,6 @@ use crate::{ - client_handle::SetClientHandle, h2::Settings as H2Settings, trace, upgrade, BoxBody, - BoxRequest, ClientHandle, Version, + client_handle::SetClientHandle, h2::Settings as H2Settings, upgrade, BoxBody, BoxRequest, + ClientHandle, TracingExecutor, Version, }; use linkerd_error::Error; use linkerd_io::{self as io, PeerAddr}; @@ -13,8 +13,6 @@ use std::{ use tower::Service; use tracing::{debug, Instrument}; -type Server = hyper::server::conn::Http; - /// Configures HTTP server behavior. #[derive(Clone, Debug)] pub struct Params { @@ -34,7 +32,7 @@ pub struct NewServeHttp { #[derive(Clone, Debug)] pub struct ServeHttp { version: Version, - server: Server, + server: hyper::server::conn::Http, inner: N, drain: drain::Watch, } @@ -62,7 +60,7 @@ where fn new_service(&self, target: T) -> Self::Service { let Params { version, h2, drain } = self.params.extract_param(&target); - let mut srv = hyper::server::conn::Http::new().with_executor(trace::Executor::new()); + let mut srv = hyper::server::conn::Http::new().with_executor(TracingExecutor); srv.http2_initial_stream_window_size(h2.initial_stream_window_size) .http2_initial_connection_window_size(h2.initial_connection_window_size); // Configure HTTP/2 PING frames diff --git a/linkerd/proxy/tap/src/accept.rs b/linkerd/proxy/tap/src/accept.rs index fb3e941722..57f9411958 100644 --- a/linkerd/proxy/tap/src/accept.rs +++ b/linkerd/proxy/tap/src/accept.rs @@ -5,7 +5,7 @@ use linkerd_conditional::Conditional; use linkerd_error::Error; use linkerd_io as io; use linkerd_meshtls as meshtls; -use linkerd_proxy_http::trace; +use linkerd_proxy_http::TracingExecutor; use linkerd_tls as tls; use std::{ collections::HashSet, @@ -46,7 +46,7 @@ impl AcceptPermittedClients { let svc = TapServer::new(tap); Box::pin(async move { hyper::server::conn::Http::new() - .with_executor(trace::Executor::new()) + .with_executor(TracingExecutor) .http2_only(true) .serve_connection(io, svc) .await